@Test public void testThreadUpdateCoreSize() throws Exception { for (int i = 0; i < 10; i++) { int size = 5; if(i>1){ size = 14; } Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("EchoGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("Echo")) //com.netflix.hystrix.HystrixThreadPool 根据key来做线程跟李,为null时默认是withGroupKey //com.netflix.hystrix.HystrixThreadPool.Factory //com.netflix.hystrix.HystrixThreadPool.Factory.getInstance(HystrixThreadPoolKey, Setter)EEEEE .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("EchoThreadPool"+i)) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)) .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(size)); // 一个实例只能被调用一次 final ThreadEchoCommand command1 = new ThreadEchoCommand("xianlinbox",setter); System.out.println("getThreadPoolKey:"+command1.getThreadPoolKey().name()); new Thread(new Runnable() { @Override public void run() { System.out.println(command1.execute()); } }).start(); } Thread.sleep(10000); }
com.netflix.hystrix.AbstractCommand<R> 主要看这个类
线程池隔离
public interface HystrixThreadPool { /** * Implementation of {@link ThreadPoolExecutor}. * * @return ThreadPoolExecutor */ public ExecutorService getExecutor(); public Scheduler getScheduler(); public Scheduler getScheduler(Func0shouldInterruptThread); /** * Mark when a thread begins executing a command. */ public void markThreadExecution(); /** * Mark when a thread completes executing a command. */ public void markThreadCompletion(); /** * Mark when a command gets rejected from the threadpool */ public void markThreadRejection(); /** * Whether the queue will allow adding an item to it. * * This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app * restart to adjust when commands should be rejected from queuing up. * * @return boolean whether there is space on the queue */ public boolean isQueueSpaceAvailable(); /** * @ExcludeFromJavadoc */ /* package */static class Factory { /* * Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object * we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same */ /* package */final static ConcurrentHashMap
threadPools = new ConcurrentHashMap (); /** * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}. * * This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}. * * @return {@link HystrixThreadPool} instance */ /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
/** * @ExcludeFromJavadoc * @ThreadSafe */ /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool { private final HystrixThreadPoolProperties properties; private final BlockingQueuequeue; private final ThreadPoolExecutor threadPool; private final HystrixThreadPoolMetrics metrics; private final int queueSize; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.queue = concurrencyStrategy.getBlockingQueue(queueSize); this.metrics = HystrixThreadPoolMetrics.getInstance( threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), properties); this.threadPool = metrics.getThreadPool(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); }
信号量隔离
com.netflix.hystrix.AbstractCommand.getExecutionSemaphore()
/** * Get the TryableSemaphore this HystrixCommand should use for execution if not running in a separate thread. * * @return TryableSemaphore */ protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; } }
/** * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs. * * @return TryableSemaphore */ protected TryableSemaphore getFallbackSemaphore() { if (fallbackSemaphoreOverride == null) { TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return fallbackSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return fallbackSemaphoreOverride; } }
com.netflix.hystrix.HystrixCommand.execute() public R execute() { try { return queue().get(); } catch (Exception e) { throw decomposeException(e); } } rx.observables.BlockingObservable.toFuture()public Futurequeue() { /* * --- Schedulers.immediate() * * We use the 'immediate' schedule since Future.get() is blocking so we don't want to bother doing the callback to the Future on a separate thread * as we don't need to separate the Hystrix thread from user threads since they are already providing it via the Future.get() call. * * We pass 'false' to tell the Observable we will block on it so it doesn't schedule an async timeout. * * This optimizes for using the calling thread to do the timeout rather than scheduling another thread. * * In a tight-loop of executing commands this optimization saves a few microseconds per execution. * It also just makes no sense to use a separate thread to timeout the command when the calling thread * is going to sit waiting on it. */ final Observable o = toObservable(); final Future f = o.toBlocking().toFuture();com.netflix.hystrix.AbstractCommand.toObservable() public Observable toObservable() { /* this is a stateful object so can only be used once */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(this); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); final AbstractCommand _cmd = this; /* try from cache first */ if (requestCacheEnabled) { HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache ) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(_cmd, false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(_cmd, true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); handleCommandEnd(_cmd, false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); handleCommandEnd(_cmd, true); //user code did run } } }; final Func0 > applyHystrixSemantics = new Func0 >() { @Override public Observable call() { return applyHystrixSemantics(_cmd); } }; com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand )private Observable applyHystrixSemantics(final AbstractCommand _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ //判断熔断是否打开 if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1 markExceptionThrown = new Action1 () { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }com.netflix.hystrix.AbstractCommand.getExecutionSemaphore() protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; } } com.netflix.hystrix.AbstractCommand.executeCommandAndObserve(AbstractCommand ) private Observable executeCommandAndObserve(final AbstractCommand _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); final Action1 markEmits = new Action1 () { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } } }; final Action0 markCompleted = new Action0() { @Override public void call() { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); } }; final Func1 > handleFallback = new Func1 >() { @Override public Observable call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1 > setRequestContext = new Action1 >() { @Override public void call(Notification rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator (_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation(AbstractCommand ) private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd) { if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0 >() { @Override public Observable call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0 () { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT); } })); } else { return Observable.defer(new Func0 >() { @Override public Observable call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }