Spring Cloud Hystrix:【Spring Cloud学习】-3.Spring Cloud Hystrix 服务降级
1.Hystrix 如何触发熔断?
hystrix 熔断的注解 @HystrixCommand,是通过 HystrixCommandAspect 切面来处理的。
切入点定义如下
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
它在 methodsAnnotatedWithHystrixCommand 上触发调用
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
// 获取目标方法
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// 判断方法上是否存在 @HystrixCommand 注解
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 如果是异步,则创建GenericObservableCommand, 否则创建GenericCommand
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
// 是否响应式,默认为非响应式
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
CommandExecutor.execute 实现如下
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
// 同步
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
// 异步
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
// 响应式
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
由于是同步调用,所以到 HystrixCommand.execute,这里通过 queue() 返回一个 future 对象
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queue() 这个方法,返回了一个Future对象,这个future的实际处理委派给 f 实现,f是匿名内部类,当调用queue().get()方法时,最终调用 delegate.get 方法
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
// 创建一个委派对象
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
* (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
* issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
* The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
* than that interruption request cannot be taken back.
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
delegate 对象由 toObservable() 创建,toObservable() 中调用了 applyHystrixSemantics() 方法
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
在 applyHystrixSemantics 中先通过 circuitBreaker.allowRequest() 判断是否允许当前请求,如果允许执行后续逻辑;否则 调用 handleShortCircuitViaFallback 执行 fallback 方法。
handleShortCircuitViaFallback 的调用路劲为:handleShortCircuitViaFallback() -> getFallbackOrThrowException() -> getFallbackObservable() -> HystrixCommand.getFallbackObservable() -> getFallback() -> GenericCommand.getFallback()
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
// 判断是否允许当前请求
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 获取信号量,获取到执行 executeCommandAndObserve 方法
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 拒绝,执行fallback方法
return handleSemaphoreRejectionViaFallback();
}
} else {
// 不允许执行,直接调用fallback
return handleShortCircuitViaFallback();
}
}
通过以上多次调用后,最终到 GenericCommand.run()
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}