【源码分析-Spring-Cloud】-3.Spring Cloud Hystrix 实现原理


Spring Cloud Hystrix:【Spring Cloud学习】-3.Spring Cloud Hystrix 服务降级

1.Hystrix 如何触发熔断?

hystrix 熔断的注解 @HystrixCommand,是通过 HystrixCommandAspect 切面来处理的。

切入点定义如下

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

它在 methodsAnnotatedWithHystrixCommand 上触发调用

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    // 获取目标方法
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    // 判断方法上是否存在 @HystrixCommand 注解
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // 如果是异步,则创建GenericObservableCommand, 否则创建GenericCommand
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
        metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        // 是否响应式,默认为非响应式
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

CommandExecutor.execute 实现如下

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {        
        case SYNCHRONOUS: {
            // 同步
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            // 异步
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            // 响应式
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

由于是同步调用,所以到 HystrixCommand.execute,这里通过 queue() 返回一个 future 对象

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

queue() 这个方法,返回了一个Future对象,这个future的实际处理委派给 f 实现,f是匿名内部类,当调用queue().get()方法时,最终调用 delegate.get 方法

public Future<R> queue() {
    /*
      * The Future returned by Observable.toBlocking().toFuture() does not implement the
      * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
      * thus, to comply with the contract of Future, we must wrap around it.
      */
    // 创建一个委派对象
    final Future<R> delegate = toObservable().toBlocking().toFuture();

    final Future<R> f = new Future<R>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (delegate.isCancelled()) {
                return false;
            }

            if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                /*
                  * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                  * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                  * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                  * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                  * than that interruption request cannot be taken back.
                  */
                interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }

            final boolean res = delegate.cancel(interruptOnFutureCancel.get());

            if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                final Thread t = executionThread.get();
                if (t != null && !t.equals(Thread.currentThread())) {
                    t.interrupt();
                }
            }

            return res;
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }

    };

    /* special handling of error states that throw immediately */
    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
            Throwable t = decomposeException(e);
            if (t instanceof HystrixBadRequestException) {
                return f;
            } else if (t instanceof HystrixRuntimeException) {
                HystrixRuntimeException hre = (HystrixRuntimeException) t;
                switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                }
            } else {
                throw Exceptions.sneakyThrow(t);
            }
        }
    }

    return f;
}

delegate 对象由 toObservable() 创建,toObservable() 中调用了 applyHystrixSemantics() 方法

Observable<R> hystrixObservable =
        Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);

在 applyHystrixSemantics 中先通过 circuitBreaker.allowRequest() 判断是否允许当前请求,如果允许执行后续逻辑;否则 调用 handleShortCircuitViaFallback 执行 fallback 方法。

handleShortCircuitViaFallback 的调用路劲为:handleShortCircuitViaFallback() -> getFallbackOrThrowException() -> getFallbackObservable() -> HystrixCommand.getFallbackObservable() -> getFallback() -> GenericCommand.getFallback()

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    // 判断是否允许当前请求
    if (circuitBreaker.allowRequest()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        // 获取信号量,获取到执行 executeCommandAndObserve 方法
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                    .doOnError(markExceptionThrown)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // 拒绝,执行fallback方法
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 不允许执行,直接调用fallback
        return handleShortCircuitViaFallback();
    }
}

通过以上多次调用后,最终到 GenericCommand.run()

@Override
protected Object run() throws Exception {
 &nbsp; &nbsp;LOGGER.debug("execute command: {}", getCommandKey().name());
 &nbsp; &nbsp;return process(new Action() {
 &nbsp; &nbsp; &nbsp; &nbsp;@Override
 &nbsp; &nbsp; &nbsp; &nbsp;Object execute() {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return getCommandAction().execute(getExecutionType());
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  });
}

文章作者: Soulballad
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Soulballad !
评论
 上一篇
【Spring Cloud学习】-4.Spring Cloud Feign 服务调用 【Spring Cloud学习】-4.Spring Cloud Feign 服务调用
1.简介1.1 概述 Feign is a declarative web service client. It makes writing web service clients easier. To use Feign create a
下一篇 
【Spring Cloud学习】-3.Spring Cloud Hystrix 服务降级 【Spring Cloud学习】-3.Spring Cloud Hystrix 服务降级
1.简介1.1 概述 In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a librar
  目录