博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hystrix 源码分析--线程隔离
阅读量:7063 次
发布时间:2019-06-28

本文共 20575 字,大约阅读时间需要 68 分钟。

hot3.png

hystrix 源码分析--线程隔离 博客分类: java
@Test	public void testThreadUpdateCoreSize() throws Exception {		for (int i = 0; i < 10; i++) {			int size = 5;			if(i>1){				size = 14;			}			Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("EchoGroup"))					.andCommandKey(HystrixCommandKey.Factory.asKey("Echo"))					//com.netflix.hystrix.HystrixThreadPool 根据key来做线程跟李,为null时默认是withGroupKey					//com.netflix.hystrix.HystrixThreadPool.Factory					//com.netflix.hystrix.HystrixThreadPool.Factory.getInstance(HystrixThreadPoolKey, Setter)EEEEE					.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("EchoThreadPool"+i))					.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()							.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))					.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(size));			// 一个实例只能被调用一次			final ThreadEchoCommand command1 = new ThreadEchoCommand("xianlinbox",setter);			System.out.println("getThreadPoolKey:"+command1.getThreadPoolKey().name());						new Thread(new Runnable() {				@Override				public void run() {					System.out.println(command1.execute());				}			}).start();		}		Thread.sleep(10000);	}

 

   com.netflix.hystrix.AbstractCommand<R> 主要看这个类

 

    线程池隔离  

public interface HystrixThreadPool {    /**     * Implementation of {@link ThreadPoolExecutor}.     *      * @return ThreadPoolExecutor     */    public ExecutorService getExecutor();    public Scheduler getScheduler();    public Scheduler getScheduler(Func0
shouldInterruptThread); /** * Mark when a thread begins executing a command. */ public void markThreadExecution(); /** * Mark when a thread completes executing a command. */ public void markThreadCompletion(); /** * Mark when a command gets rejected from the threadpool */ public void markThreadRejection(); /** * Whether the queue will allow adding an item to it. *

* This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app * restart to adjust when commands should be rejected from queuing up. * * @return boolean whether there is space on the queue */ public boolean isQueueSpaceAvailable(); /** * @ExcludeFromJavadoc */ /* package */static class Factory { /* * Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object * we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same */ /* package */final static ConcurrentHashMap

threadPools = new ConcurrentHashMap
(); /** * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}. *

* This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}. * * @return {@link HystrixThreadPool} instance */ /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }

   

  

/**     * @ExcludeFromJavadoc     * @ThreadSafe     */    /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {        private final HystrixThreadPoolProperties properties;        private final BlockingQueue
queue; private final ThreadPoolExecutor threadPool; private final HystrixThreadPoolMetrics metrics; private final int queueSize; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.queue = concurrencyStrategy.getBlockingQueue(queueSize); this.metrics = HystrixThreadPoolMetrics.getInstance( threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), properties); this.threadPool = metrics.getThreadPool(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); }

 

    信号量隔离

 com.netflix.hystrix.AbstractCommand.getExecutionSemaphore()   

/**     * Get the TryableSemaphore this HystrixCommand should use for execution if not running in a separate thread.     *      * @return TryableSemaphore     */    protected TryableSemaphore getExecutionSemaphore() {        if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) {            if (executionSemaphoreOverride == null) {                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());                if (_s == null) {                    // we didn't find one cache so setup                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));                    // assign whatever got set (this or another thread)                    return executionSemaphorePerCircuit.get(commandKey.name());                } else {                    return _s;                }            } else {                return executionSemaphoreOverride;            }        } else {            // return NoOp implementation since we're not using SEMAPHORE isolation            return TryableSemaphoreNoOp.DEFAULT;        }    }

   

/**     * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs.     *      * @return TryableSemaphore     */    protected TryableSemaphore getFallbackSemaphore() {        if (fallbackSemaphoreOverride == null) {            TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());            if (_s == null) {                // we didn't find one cache so setup                fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));                // assign whatever got set (this or another thread)                return fallbackSemaphorePerCircuit.get(commandKey.name());            } else {                return _s;            }        } else {            return fallbackSemaphoreOverride;        }    }

   

   

com.netflix.hystrix.HystrixCommand.execute()  public R execute() {        try {            return queue().get();        } catch (Exception e) {            throw decomposeException(e);        }    }	rx.observables.BlockingObservable.toFuture()public Future
queue() { /* * --- Schedulers.immediate() * * We use the 'immediate' schedule since Future.get() is blocking so we don't want to bother doing the callback to the Future on a separate thread * as we don't need to separate the Hystrix thread from user threads since they are already providing it via the Future.get() call. * * We pass 'false' to tell the Observable we will block on it so it doesn't schedule an async timeout. * * This optimizes for using the calling thread to do the timeout rather than scheduling another thread. * * In a tight-loop of executing commands this optimization saves a few microseconds per execution. * It also just makes no sense to use a separate thread to timeout the command when the calling thread * is going to sit waiting on it. */ final Observable
o = toObservable(); final Future
f = o.toBlocking().toFuture();com.netflix.hystrix.AbstractCommand.toObservable() public Observable
toObservable() { /* this is a stateful object so can only be used once */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(this); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); final AbstractCommand
_cmd = this; /* try from cache first */ if (requestCacheEnabled) { HystrixCommandResponseFromCache
fromCache = (HystrixCommandResponseFromCache
) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(_cmd, false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(_cmd, true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); handleCommandEnd(_cmd, false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); handleCommandEnd(_cmd, true); //user code did run } } }; final Func0
> applyHystrixSemantics = new Func0
>() { @Override public Observable
call() { return applyHystrixSemantics(_cmd); } }; com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand
)private Observable
applyHystrixSemantics(final AbstractCommand
_cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ //判断熔断是否打开 if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1
markExceptionThrown = new Action1
() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }com.netflix.hystrix.AbstractCommand.getExecutionSemaphore() protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; } } com.netflix.hystrix.AbstractCommand.executeCommandAndObserve(AbstractCommand
) private Observable
executeCommandAndObserve(final AbstractCommand
_cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); final Action1
markEmits = new Action1
() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } } }; final Action0 markCompleted = new Action0() { @Override public void call() { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); } }; final Func1
> handleFallback = new Func1
>() { @Override public Observable
call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1
> setRequestContext = new Action1
>() { @Override public void call(Notification
rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable
execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator
(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation(AbstractCommand
) private Observable
executeCommandWithSpecifiedIsolation(final AbstractCommand
_cmd) { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0
>() { @Override public Observable
call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0
() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT); } })); } else { return Observable.defer(new Func0
>() { @Override public Observable
call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }

 

转载于:https://my.oschina.net/xiaominmin/blog/1598978

你可能感兴趣的文章
左神算法进阶班4_1画出楼的轮廓
查看>>
力扣算法题—072编辑距离
查看>>
MySQL(数据库)
查看>>
gulp在webstorm里运行,告别cmd控制台!
查看>>
BIG biang教你误删oracle 怎么办,
查看>>
1.1 面试问题整理
查看>>
来美国一年半了,命里有时终须有,命里无时莫强求(2)
查看>>
css盒模型 以及块级元素的margin折叠问题 以及一些注意的问题
查看>>
POJ 1661 Help Jimmy(DP/最短路)
查看>>
[网络流24题] 最小路径覆盖问题
查看>>
微软职位内部推荐-Sr DEV
查看>>
jdk 与jre
查看>>
深度优化LNMP之Nginx (转)
查看>>
DP接口中AUX
查看>>
【转】在Eclipse中使用JUnit4进行单元测试(初级篇)
查看>>
【斜优DP】bzoj4518-Sdoi2016征途
查看>>
iOS开发网络篇—文件的上传
查看>>
Linode服务器部署docker环境
查看>>
在servlet中注入spring环境
查看>>
Android源代码编译——下载
查看>>