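This queue differs from the DCQ (Dirty Card Queue) in who processes it and what it is for. The DCQ is processed by Refine threads and is used during the heap's day-to-day operation to track modified memory regions and make garbage collection more efficient. The SATB mark queue is processed with the help of mutator threads and records the state of object references as of the start of concurrent marking, which keeps marking complete and consistent. The two also hold different kinds of data and play different roles in GC. The SATB mark queue holds object references, specifically the values they had before the heap was modified; the Dirty Card Queue holds regions of heap memory (cards), which are marked dirty when they are modified. The SATB mark queue does its work during the concurrent marking phase, helping to produce a consistent snapshot of the heap; the Dirty Card Queue may be used throughout the garbage collection process to mark and process heap regions that have changed since the last collection.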
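The difference in what the two queues hold can be pictured with a toy model. This is only a sketch under loud assumptions: `ToyBarriers`, the 512-byte card size, and the array standing in for the heap are all hypothetical, and the real barriers in HotSpot are considerably more selective about what they enqueue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model only: names and sizes are illustrative, not HotSpot's real data structures. */
class ToyBarriers {
    static final int CARD_SHIFT = 9;                           // assumption: 512 "addresses" per card
    final Deque<Object> satbQueue = new ArrayDeque<>();        // holds object references (old values)
    final Deque<Integer> dirtyCardQueue = new ArrayDeque<>();  // holds card indices (heap regions)
    final Object[] heap = new Object[4096];                    // each slot stands for one address
    boolean concurrentMarkActive = false;

    /** Stores newValue at address with a pre-write (SATB) and a post-write (card) barrier. */
    void writeField(int address, Object newValue) {
        Object old = heap[address];
        if (concurrentMarkActive && old != null) {
            satbQueue.add(old);                // remember the reference that is about to be overwritten
        }
        heap[address] = newValue;
        int card = address >>> CARD_SHIFT;
        if (!dirtyCardQueue.contains(card)) {
            dirtyCardQueue.add(card);          // remember which region of the heap changed
        }
    }
}
```

In this model the SATB queue only fills while `concurrentMarkActive` is true, whereas the dirty card queue fills on every write, which mirrors the contrast drawn in the paragraph above.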
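As the name suggests, a Safe Point is a point in the code where a thread can safely stop. When a GC is needed, the JVM can make all threads stop at a Safe Point; once every thread has stopped at one, it can analyze memory references to determine which objects are alive and which are not.
All of the operations above need on-the-spot information, such as register contents and heap usage. Threads must be paused while these operations run and stay paused until they finish, otherwise there would be concurrency problems. This is why Safe Points are needed.
We can therefore think of Safe Points as special positions in the executing code at which a thread may pause. At a Safe Point, information about the current thread that is not available at other positions is saved and can be read by other threads; it includes the thread's context, the internal pointers of objects, and so on.
Stop the World is the period during which all threads have entered a Safe Point and stay there, waiting while the JVM scans and analyzes memory and then collects garbage.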
Why Safe Points are needed
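As mentioned above, a Safe Point is a special position in the code where a thread can pause, and all threads must reach a Safe Point before a GC can analyze memory and collect garbage. From this process we can see that a Safe Point acts as a barrier that brings every thread to a stop. If threads kept running, the JVM could not analyze object references, and so could not collect garbage either.
In addition, another important Java thread feature, interruption, is also implemented on top of Safe Points. When code calls Thread.interrupt(), the target thread only learns whether it has been interrupted once it runs to a Safe Point. Safe Points therefore also serve to hold information used for communication between threads.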
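The barrier role of Safe Points can be imitated in plain Java. The sketch below is not how the JVM implements them; `ToySafepoint`, `poll()` and `stopTheWorld()` are hypothetical names, and the workers cooperate explicitly by calling `poll()` where a real JVM would insert its own checks.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy cooperative "safe point": workers stop inside poll() while an operation runs. */
class ToySafepoint {
    private final int workers;
    private boolean requested = false;
    private int stopped = 0;

    ToySafepoint(int workers) { this.workers = workers; }

    /** Called by worker threads at points where their state is consistent. */
    synchronized void poll() throws InterruptedException {
        if (!requested) return;
        stopped++;
        notifyAll();                      // tell the coordinator one more thread has stopped
        while (requested) wait();         // stay here until the pause is over
        stopped--;
    }

    /** Runs op only after every worker is parked inside poll(). */
    synchronized void stopTheWorld(Runnable op) throws InterruptedException {
        requested = true;
        while (stopped < workers) wait();
        try {
            op.run();
        } finally {
            requested = false;
            notifyAll();                  // let the workers continue
        }
    }

    /** Returns true if the workers' counters did not change during the pause. */
    static boolean demo() {
        final int n = 3;
        ToySafepoint sp = new ToySafepoint(n);
        long[] counters = new long[n];
        AtomicBoolean running = new AtomicBoolean(true);
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                try {
                    while (running.get()) {
                        counters[id]++;   // "mutator" work
                        sp.poll();        // safe point check
                    }
                } catch (InterruptedException ignored) { }
            });
            threads[i].start();
        }
        boolean[] stable = new boolean[1];
        try {
            sp.stopTheWorld(() -> {
                long[] before = counters.clone();
                try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                stable[0] = Arrays.equals(before, counters);
            });
            running.set(false);
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stable[0];
    }
}
```

Because the operation only runs once every worker is parked, it sees a stable view of the counters, which is the property a GC relies on when it analyzes references.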
Get the object for the given bean instance, either the bean instance itself or its created object in case of a FactoryBean.
Now we have the bean instance, which may be a normal bean or a FactoryBean. If it’s a FactoryBean, we use it to create a bean instance.
    protected Object getObjectFromFactoryBean(FactoryBean<?> factory, String beanName, boolean shouldPostProcess) {
        if (factory.isSingleton() && this.containsSingleton(beanName)) {
            synchronized (this.getSingletonMutex()) {
                Object object = this.factoryBeanObjectCache.get(beanName);
                if (object == null) {
                    object = this.doGetObjectFromFactoryBean(factory, beanName);
                    Object alreadyThere = this.factoryBeanObjectCache.get(beanName);
                    if (alreadyThere != null) {
                        object = alreadyThere;
                    } else {
                        if (shouldPostProcess) {
                            if (this.isSingletonCurrentlyInCreation(beanName)) {
                                return object;
                            }
                            this.beforeSingletonCreation(beanName);
                            try {
                                object = this.postProcessObjectFromFactoryBean(object, beanName);
                            } catch (Throwable var14) {
                                throw new BeanCreationException(beanName, "Post-processing of FactoryBean's singleton object failed", var14);
                            } finally {
                                this.afterSingletonCreation(beanName);
                            }
                        }
                        if (this.containsSingleton(beanName)) {
                            this.factoryBeanObjectCache.put(beanName, object);
                        }
                    }
                }
                return object;
            }
        } else {
            Object object = this.doGetObjectFromFactoryBean(factory, beanName);
            if (shouldPostProcess) {
                try {
                    object = this.postProcessObjectFromFactoryBean(object, beanName);
                } catch (Throwable var17) {
                    throw new BeanCreationException(beanName, "Post-processing of FactoryBean's object failed", var17);
                }
            }
            return object;
        }
    }
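The core idea of the method above, returning a normal bean as-is but asking a FactoryBean for its product and caching singleton products, can be shown without Spring. This is a minimal sketch: `ToyFactoryBean` and `ToyContainer` are hypothetical stand-ins, and post-processing, locking and circular-reference handling are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a factory-style bean; not Spring's real interface. */
interface ToyFactoryBean<T> {
    T getObject();
    default boolean isSingleton() { return true; }
}

class ToyContainer {
    private final Map<String, Object> beans = new HashMap<>();
    private final Map<String, Object> factoryBeanObjectCache = new HashMap<>();

    void register(String name, Object instance) { beans.put(name, instance); }

    /** Returns the bean itself, or the object created by it when it is a factory. */
    Object getBean(String name) {
        Object instance = beans.get(name);
        if (!(instance instanceof ToyFactoryBean)) {
            return instance;                           // a normal bean: return it unchanged
        }
        ToyFactoryBean<?> factory = (ToyFactoryBean<?>) instance;
        if (factory.isSingleton()) {
            // Singleton products are created once and then served from the cache.
            return factoryBeanObjectCache.computeIfAbsent(name, k -> factory.getObject());
        }
        return factory.getObject();                    // prototype-style: a new product each time
    }
}
```

A singleton factory registered under some name yields the same product on every `getBean` call, while a factory whose `isSingleton()` returns false yields a fresh one each time.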
    try {
        // Give BeanPostProcessors a chance to return a proxy instead of the target bean instance.
        Object bean = resolveBeforeInstantiation(beanName, mbdToUse);
        if (bean != null) {
            // If a proxy bean instance was produced here, it is returned directly
            return bean;
        }
    } catch (Throwable ex) {
        // throw exception
    }
    try {
        // Create the bean instance
        Object beanInstance = doCreateBean(beanName, mbdToUse, args);
        // ...
    }
    protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args) throws BeanCreationException {
        BeanWrapper instanceWrapper = null;
        if (mbd.isSingleton()) {
            instanceWrapper = (BeanWrapper) this.factoryBeanInstanceCache.remove(beanName);
        }
        if (instanceWrapper == null) {
            instanceWrapper = this.createBeanInstance(beanName, mbd, args);
        }
        Object bean = instanceWrapper.getWrappedInstance();
        Class<?> beanType = instanceWrapper.getWrappedClass();
        if (beanType != NullBean.class) {
            mbd.resolvedTargetType = beanType;
        }
        synchronized (mbd.postProcessingLock) {
            if (!mbd.postProcessed) {
                try {
                    this.applyMergedBeanDefinitionPostProcessors(mbd, beanType, beanName);
                } catch (Throwable var17) {
                    throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Post-processing of merged bean definition failed", var17);
                }
                mbd.markAsPostProcessed();
            }
        }
        boolean earlySingletonExposure = mbd.isSingleton() && this.allowCircularReferences && this.isSingletonCurrentlyInCreation(beanName);
        if (earlySingletonExposure) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Eagerly caching bean '" + beanName + "' to allow for resolving potential circular references");
            }
            this.addSingletonFactory(beanName, () -> {
                return this.getEarlyBeanReference(beanName, mbd, bean);
            });
        }
        Object exposedObject = bean;
        try {
            this.populateBean(beanName, mbd, instanceWrapper);
            exposedObject = this.initializeBean(beanName, exposedObject, mbd);
        } catch (Throwable var18) {
            if (var18 instanceof BeanCreationException bce) {
                if (beanName.equals(bce.getBeanName())) {
                    throw bce;
                }
            }
            throw new BeanCreationException(mbd.getResourceDescription(), beanName, var18.getMessage(), var18);
        }
        if (earlySingletonExposure) {
            Object earlySingletonReference = this.getSingleton(beanName, false);
            if (earlySingletonReference != null) {
                if (exposedObject == bean) {
                    exposedObject = earlySingletonReference;
                } else if (!this.allowRawInjectionDespiteWrapping && this.hasDependentBean(beanName)) {
                    String[] dependentBeans = this.getDependentBeans(beanName);
                    Set<String> actualDependentBeans = new LinkedHashSet(dependentBeans.length);
                    String[] var12 = dependentBeans;
                    int var13 = dependentBeans.length;
                    for (int var14 = 0; var14 < var13; ++var14) {
                        String dependentBean = var12[var14];
                        if (!this.removeSingletonIfCreatedForTypeCheckOnly(dependentBean)) {
                            actualDependentBeans.add(dependentBean);
                        }
                    }
                    if (!actualDependentBeans.isEmpty()) {
                        throw new BeanCurrentlyInCreationException(beanName, "Bean with name '" + beanName + "' has been injected into other beans [" + StringUtils.collectionToCommaDelimitedString(actualDependentBeans) + "] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.");
                    }
                }
            }
        }
        try {
            this.registerDisposableBeanIfNecessary(beanName, bean, mbd);
            return exposedObject;
        } catch (BeanDefinitionValidationException var16) {
            throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Invalid destruction signature", var16);
        }
    }
Give any InstantiationAwareBeanPostProcessors the opportunity to modify the state of the bean before properties are set. This can be used, for example, to support styles of field injection.
    public interface Scope {

        /**
         * Return the object with the given name from the underlying scope
         */
        Object get(String name, ObjectFactory<?> objectFactory);

        /**
         * Remove the object with the given name from the underlying scope.
         */
        Object remove(String name);
    }
    public interface BeanFactoryPostProcessor {

        /**
         * Modify the application context's internal bean factory after its standard
         * initialization. All bean definitions will have been loaded, but no beans
         * will have been instantiated yet. This allows for overriding or adding
         * properties even to eager-initializing beans.
         */
        void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
    }
    public interface BeanPostProcessor {

        /**
         * Apply this BeanPostProcessor to the given new bean instance before any bean
         * initialization callbacks (like InitializingBean's afterPropertiesSet
         * or a custom init-method).
         * The returned bean instance may be a wrapper around the original.
         */
        @Nullable
        default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }

        /**
         * Apply this BeanPostProcessor to the given new bean instance after any bean
         * initialization callbacks (like InitializingBean's afterPropertiesSet
         * or a custom init-method).
         * The returned bean instance may be a wrapper around the original.
         */
        @Nullable
        default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }
    }
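The remark that "the returned bean instance may be a wrapper around the original" is easier to see in a small example. The sketch below does not use Spring: `ToyBeanPostProcessor`, `ToyInitializer`, `Greeter` and `UpperCasingProcessor` are hypothetical, and the real container's handling of null results and exceptions is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of the two callbacks shown above; not Spring's interface. */
interface ToyBeanPostProcessor {
    default Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; }
    default Object postProcessAfterInitialization(Object bean, String beanName) { return bean; }
}

interface Greeter {
    String greet(String who);
}

/** Wraps every Greeter so that its output is upper-cased. */
class UpperCasingProcessor implements ToyBeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof Greeter) {
            Greeter target = (Greeter) bean;
            Greeter wrapper = who -> target.greet(who).toUpperCase();
            return wrapper;                       // callers now receive the wrapper, not the target
        }
        return bean;
    }
}

class ToyInitializer {
    private final List<ToyBeanPostProcessor> processors = new ArrayList<>();

    void addProcessor(ToyBeanPostProcessor p) { processors.add(p); }

    /** Runs before-callbacks, the init callback, then after-callbacks, in that order. */
    Object initializeBean(Object bean, String beanName, Runnable initCallback) {
        Object current = bean;
        for (ToyBeanPostProcessor p : processors) {
            current = p.postProcessBeforeInitialization(current, beanName);
        }
        initCallback.run();                       // stands in for afterPropertiesSet / init-method
        for (ToyBeanPostProcessor p : processors) {
            current = p.postProcessAfterInitialization(current, beanName);
        }
        return current;
    }
}
```

Whatever the last processor returns is what the rest of the application sees, which is how proxy-style wrapping can be layered on top of an ordinary bean.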
    public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) {
        Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
        Assert.notNull(environment, "Environment must not be null");
        this.registry = registry;
        this.conditionEvaluator = new ConditionEvaluator(registry, environment, null);
        // The post-processors for annotation-based configuration are registered here
        AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
    }
    @Override
    public void register(Class<?>... componentClasses) {
        Assert.notEmpty(componentClasses, "At least one component class must be specified");
        StartupStep registerComponentClass = this.getApplicationStartup().start("spring.context.component-classes.register")
                .tag("classes", () -> Arrays.toString(componentClasses));
        // Register the configuration classes using the reader created above
        this.reader.register(componentClasses);
        registerComponentClass.end();
    }
    public void parse(Set<BeanDefinitionHolder> configCandidates) {
        for (BeanDefinitionHolder holder : configCandidates) {
            BeanDefinition bd = holder.getBeanDefinition();
            try {
                if (bd instanceof AnnotatedBeanDefinition annotatedBeanDef) {
                    parse(annotatedBeanDef, holder.getBeanName());
                }
                else if (bd instanceof AbstractBeanDefinition abstractBeanDef && abstractBeanDef.hasBeanClass()) {
                    parse(abstractBeanDef.getBeanClass(), holder.getBeanName());
                }
                else {
                    parse(bd.getBeanClassName(), holder.getBeanName());
                }
            }
            catch (BeanDefinitionStoreException ex) {
                throw ex;
            }
            catch (Throwable ex) {
                throw new BeanDefinitionStoreException(
                        "Failed to parse configuration class [" + bd.getBeanClassName() + "]", ex);
            }
        }
        this.deferredImportSelectorHandler.process();
    }
    @Nullable
    protected final SourceClass doProcessConfigurationClass(
            ConfigurationClass configClass, SourceClass sourceClass, Predicate<String> filter)
            throws IOException {

        if (configClass.getMetadata().isAnnotated(Component.class.getName())) {
            // Recursively process any member (nested) classes first
            processMemberClasses(configClass, sourceClass, filter);
        }

        // Process any @PropertySource annotations
        for (AnnotationAttributes propertySource : AnnotationConfigUtils.attributesForRepeatable(
                sourceClass.getMetadata(), org.springframework.context.annotation.PropertySource.class,
                PropertySources.class, true)) {
            if (this.propertySourceRegistry != null) {
                this.propertySourceRegistry.processPropertySource(propertySource);
            }
            else {
                logger.info("Ignoring @PropertySource annotation on [" + sourceClass.getMetadata().getClassName() +
                        "]. Reason: Environment must implement ConfigurableEnvironment");
            }
        }

        // Search for locally declared @ComponentScan annotations first.
        Set<AnnotationAttributes> componentScans = AnnotationConfigUtils.attributesForRepeatable(
                sourceClass.getMetadata(), ComponentScan.class, ComponentScans.class,
                MergedAnnotation::isDirectlyPresent);

        // Fall back to searching for @ComponentScan meta-annotations (which indirectly
        // includes locally declared composed annotations).
        if (componentScans.isEmpty()) {
            componentScans = AnnotationConfigUtils.attributesForRepeatable(sourceClass.getMetadata(),
                    ComponentScan.class, ComponentScans.class, MergedAnnotation::isMetaPresent);
        }

        if (!componentScans.isEmpty()) {
            List<Condition> registerBeanConditions = collectRegisterBeanConditions(configClass);
            if (!registerBeanConditions.isEmpty()) {
                throw new ApplicationContextException(
                        "Component scan could not be used with conditions in REGISTER_BEAN phase: " +
                        registerBeanConditions);
            }
            for (AnnotationAttributes componentScan : componentScans) {
                // The config class is annotated with @ComponentScan -> perform the scan immediately
                Set<BeanDefinitionHolder> scannedBeanDefinitions =
                        this.componentScanParser.parse(componentScan, sourceClass.getMetadata().getClassName());
                // Check the set of scanned definitions for any further config classes and parse recursively if needed
                for (BeanDefinitionHolder holder : scannedBeanDefinitions) {
                    BeanDefinition bdCand = holder.getBeanDefinition().getOriginatingBeanDefinition();
                    if (bdCand == null) {
                        bdCand = holder.getBeanDefinition();
                    }
                    if (ConfigurationClassUtils.checkConfigurationClassCandidate(bdCand, this.metadataReaderFactory)) {
                        parse(bdCand.getBeanClassName(), holder.getBeanName());
                    }
                }
            }
        }

        // Process any @Import annotations
        processImports(configClass, sourceClass, getImports(sourceClass), filter, true);

        // Process any @ImportResource annotations
        AnnotationAttributes importResource =
                AnnotationConfigUtils.attributesFor(sourceClass.getMetadata(), ImportResource.class);
        if (importResource != null) {
            String[] resources = importResource.getStringArray("locations");
            Class<? extends BeanDefinitionReader> readerClass = importResource.getClass("reader");
            for (String resource : resources) {
                String resolvedResource = this.environment.resolveRequiredPlaceholders(resource);
                configClass.addImportedResource(resolvedResource, readerClass);
            }
        }

        // Process individual @Bean methods
        Set<MethodMetadata> beanMethods = retrieveBeanMethodMetadata(sourceClass);
        for (MethodMetadata methodMetadata : beanMethods) {
            configClass.addBeanMethod(new BeanMethod(methodMetadata, configClass));
        }

        // Process default methods on interfaces
        processInterfaces(configClass, sourceClass);

        // Process superclass, if any
        if (sourceClass.getMetadata().hasSuperClass()) {
            String superclass = sourceClass.getMetadata().getSuperClassName();
            if (superclass != null && !superclass.startsWith("java")) {
                boolean superclassKnown = this.knownSuperclasses.containsKey(superclass);
                this.knownSuperclasses.add(superclass, configClass);
                if (!superclassKnown) {
                    // Superclass found, return its annotation metadata and recurse
                    return sourceClass.getSuperClass();
                }
            }
        }

        // No superclass -> processing is complete
        return null;
    }
    private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
            Collection<SourceClass> importCandidates, Predicate<String> filter,
            boolean checkForCircularImports) {

        if (importCandidates.isEmpty()) {
            return;
        }

        if (checkForCircularImports && isChainedImportOnStack(configClass)) {
            this.problemReporter.error(new CircularImportProblem(configClass, this.importStack));
        }
        else {
            this.importStack.push(configClass);
            try {
                for (SourceClass candidate : importCandidates) {
                    if (candidate.isAssignable(ImportSelector.class)) {
                        // Candidate class is an ImportSelector -> delegate to it to determine imports
                        Class<?> candidateClass = candidate.loadClass();
                        ImportSelector selector = ParserStrategyUtils.instantiateClass(candidateClass, ImportSelector.class,
                                this.environment, this.resourceLoader, this.registry);
                        Predicate<String> selectorFilter = selector.getExclusionFilter();
                        if (selectorFilter != null) {
                            filter = filter.or(selectorFilter);
                        }
                        if (selector instanceof DeferredImportSelector deferredImportSelector) {
                            this.deferredImportSelectorHandler.handle(configClass, deferredImportSelector);
                        }
                        else {
                            String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
                            Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames, filter);
                            processImports(configClass, currentSourceClass, importSourceClasses, filter, false);
                        }
                    }
                    else if (candidate.isAssignable(ImportBeanDefinitionRegistrar.class)) {
                        // Candidate class is an ImportBeanDefinitionRegistrar ->
                        // delegate to it to register additional bean definitions
                        Class<?> candidateClass = candidate.loadClass();
                        ImportBeanDefinitionRegistrar registrar =
                                ParserStrategyUtils.instantiateClass(candidateClass, ImportBeanDefinitionRegistrar.class,
                                        this.environment, this.resourceLoader, this.registry);
                        configClass.addImportBeanDefinitionRegistrar(registrar, currentSourceClass.getMetadata());
                    }
                    else {
                        // Candidate class not an ImportSelector or ImportBeanDefinitionRegistrar ->
                        // process it as an @Configuration class
                        this.importStack.registerImport(
                                currentSourceClass.getMetadata(), candidate.getMetadata().getClassName());
                        processConfigurationClass(candidate.asConfigClass(configClass), filter);
                    }
                }
            }
            catch (BeanDefinitionStoreException ex) {
                throw ex;
            }
            catch (Throwable ex) {
                throw new BeanDefinitionStoreException(
                        "Failed to process import candidates for configuration class [" +
                        configClass.getMetadata().getClassName() + "]: " + ex.getMessage(), ex);
            }
            finally {
                this.importStack.pop();
            }
        }
    }
    void processGroupImports() {
        for (DeferredImportSelectorGrouping grouping : this.groupings.values()) {
            Predicate<String> filter = grouping.getCandidateFilter();
            grouping.getImports().forEach(entry -> {
                ConfigurationClass configurationClass = this.configurationClasses.get(entry.getMetadata());
                try {
                    processImports(configurationClass, asSourceClass(configurationClass, filter),
                            Collections.singleton(asSourceClass(entry.getImportClassName(), filter)),
                            filter, false);
                }
                catch (BeanDefinitionStoreException ex) {
                    throw ex;
                }
                catch (Throwable ex) {
                    throw new BeanDefinitionStoreException(
                            "Failed to process import candidates for configuration class [" +
                            configurationClass.getMetadata().getClassName() + "]", ex);
                }
            });
        }
    }
    public void loadBeanDefinitions(Set<ConfigurationClass> configurationModel) {
        TrackedConditionEvaluator trackedConditionEvaluator = new TrackedConditionEvaluator();
        for (ConfigurationClass configClass : configurationModel) {
            loadBeanDefinitionsForConfigurationClass(configClass, trackedConditionEvaluator);
        }
    }

    /**
     * Read a particular {@link ConfigurationClass}, registering bean definitions
     * for the class itself and all of its {@link Bean} methods.
     */
    private void loadBeanDefinitionsForConfigurationClass(
            ConfigurationClass configClass, TrackedConditionEvaluator trackedConditionEvaluator) {

        if (trackedConditionEvaluator.shouldSkip(configClass)) {
            String beanName = configClass.getBeanName();
            if (StringUtils.hasLength(beanName) && this.registry.containsBeanDefinition(beanName)) {
                this.registry.removeBeanDefinition(beanName);
            }
            this.importRegistry.removeImportingClass(configClass.getMetadata().getClassName());
            return;
        }

        if (configClass.isImported()) {
            registerBeanDefinitionForImportedConfigurationClass(configClass);
        }
        for (BeanMethod beanMethod : configClass.getBeanMethods()) {
            loadBeanDefinitionsForBeanMethod(beanMethod);
        }

        loadBeanDefinitionsFromImportedResources(configClass.getImportedResources());
        loadBeanDefinitionsFromRegistrars(configClass.getImportBeanDefinitionRegistrars());
    }
    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;          // in a condition wait

    /**
     * Head of the wait queue, lazily initialized.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue. After initialization, modified only via casTail.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

        // methods for atomic operations
        final boolean casPrev(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, PREV, c, v);
        }
        final boolean casNext(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, NEXT, c, v);
        }
        final int getAndUnsetStatus(int v) {     // for signalling
            return U.getAndBitwiseAndInt(this, STATUS, ~v);
        }
        final void setPrevRelaxed(Node p) {      // for off-queue assignment
            U.putReference(this, PREV, p);
        }
        final void setStatusRelaxed(int s) {     // for off-queue assignment
            U.putInt(this, STATUS, s);
        }
        final void clearStatus() {               // for reducing unneeded signals
            U.putIntOpaque(this, STATUS, 0);
        }

        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
    }
BigMap
Sync#lock()
    final void lock() {
        if (!initialTryLock())
            acquire(1);
    }
[Fair|Unfair]Sync#initialTryLock()
    /**
     * Acquires only if reentrant or queue is empty.
     */
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }
    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        for (Node p = tail, h = head; p != h && p != null; p = p.prev)
            if (p.status >= 0)
                return true;
        return false;
    }
AQS#acquire(int arg)
    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }
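This method calls the tryAcquire() method that AQS leaves for subclasses to implement, making one attempt to acquire the lock in a custom way. If that attempt fails, it calls AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time).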
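To see the division of labour between the subclass hook and the queueing logic, here is a minimal non-reentrant mutex built on the public `AbstractQueuedSynchronizer` API. `SimpleMutex` and its `demo()` method are illustrative names, not part of the JDK; the subclass only decides whether one attempt succeeds, and `acquire`/`release` handle queueing and parking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** A minimal, non-reentrant, non-fair mutex: state 0 = unlocked, 1 = locked. */
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One custom attempt; on failure AQS takes care of enqueueing and blocking.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;                       // fully released: AQS may wake the next waiter
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }
    void unlock() { sync.release(1); }

    /** Four threads increment a plain counter under the mutex; returns the final count. */
    static int demo() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }
}
```

If mutual exclusion holds, `SimpleMutex.demo()` returns 40000, since no increment of the unsynchronized counter is lost.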
FairSync#tryAcquire(int acquires)
    /**
     * Acquires only if thread is first waiter or empty
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && !hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread()
     *   && hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread. Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire). For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        Thread first = null; Node h, s;
        if ((h = head) != null && ((s = h.next) == null ||
                                   (first = s.waiter) == null ||
                                   s.prev == null))
            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
        return first != null && first != Thread.currentThread();
    }
[!NOTE] GPT explanation
Detailed Explanation of hasQueuedPredecessors():
This method provides a way to determine if the calling thread should wait in line or attempt to acquire the lock directly, based on whether there are other threads ahead of it in the queue.
1. Checking the Queue:
The method starts by initializing Thread first to null and declaring Node h and Node s.
It then checks if the head of the queue (h) is not null. If the head exists, it proceeds to check the next node (s = h.next).
2. Evaluating Conditions:
If the head’s next node (s) is null, or s.waiter (the thread in the s node) is null, or s.prev (the link back to the head) is null, it implies a possibility of inconsistency in the queue or that the queue might be transitioning states (e.g., nodes being added or removed).
In such cases, it uses getFirstQueuedThread() to reliably get the first thread in the queue and reassess the situation. This call is more robust but potentially less efficient, hence used as a fallback.
3. Return Logic:
Finally, the method returns true if first (the first queued thread determined either directly or through the fallback) is not null and is not the current thread. This means there is at least one thread that has been waiting longer than the current thread.
If first is null or it is the current thread, it returns false, indicating either the queue is empty or the current thread is at the head of the queue.
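The return logic described above can be observed from outside with a small fair synchronizer. This is a sketch using only public and protected `AbstractQueuedSynchronizer` methods; `FairSyncDemo` and its nested `FairSync` are hypothetical names modeled on the fair `tryAcquire` shown earlier, with reentrancy added.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class FairSyncDemo {
    /** A fair, reentrant, exclusive synchronizer in the style of the Javadoc example. */
    static final class FairSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Only barge in when nobody has been waiting longer than we have.
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                setState(c + acquires);        // reentrant acquire
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }
    }

    /** Returns hasQueuedPredecessors() as seen by the owner: {before a waiter queues, after}. */
    static boolean[] demo() {
        FairSync sync = new FairSync();
        sync.acquire(1);                               // this thread now owns the lock
        boolean before = sync.hasQueuedPredecessors(); // queue not yet initialized
        Thread waiter = new Thread(() -> {
            sync.acquire(1);                           // must queue behind the owner
            sync.release(1);
        });
        waiter.start();
        try {
            while (!sync.isQueued(waiter)) Thread.sleep(1);
            boolean after = sync.hasQueuedPredecessors(); // the waiter is now ahead of us
            sync.release(1);
            waiter.join();
            return new boolean[] { before, after };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The first value is false because the queue is empty, and the second is true because another thread has been waiting longer than the caller, which are the two cases the explanation distinguishes.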
AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)
    final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if queue is not initialized, do so by attaching new header node
         *     resort to spinwait on OOME trying to create node
         *  else if node not yet created, create it
         *     resort to spinwait on OOME trying to create node
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            Node t;
            if ((t = tail) == null) {           // initialize queue
                if (tryInitializeHead() == null)
                    return acquireOnOOME(shared, arg);
            } else if (node == null) {          // allocate; retry before enqueue
                try {
                    node = (shared) ? new SharedNode() : new ExclusiveNode();
                } catch (OutOfMemoryError oome) {
                    return acquireOnOOME(shared, arg);
                }
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }
[!NOTE] GPT's explanation of Status
Understanding Status Management
Role of WAITING Status:
> + When a node (representing a thread in a queue) is set to WAITING, it typically indicates that the thread is actively waiting and should remain parked until explicitly signalled. The WAITING status is used to manage thread wake-up correctly and to avoid lost wake-ups.
Role of Status Zero (0):
> + A status of 0 generally indicates that the node is not in any particular waiting or signal state. This can mean several things depending on the context:
> + The thread is not currently waiting.
> + The thread has been woken up and is about to retry acquiring the lock.
> + The thread has completed its operation and is being cleaned up.
Acquiring the Lock with Status Zero:
> + Setting the status to zero does not by itself grant the lock to the node. Instead, it signifies that the node is in a state eligible to attempt to acquire the lock. When a thread (node) attempts to acquire the lock, having a status of zero implies that it is neither parked nor scheduled to be parked. This status allows it to enter the lock acquisition logic without being delayed by unnecessary waits.
Transition from WAITING to Zero:
> + The transition from WAITING to 0 typically occurs when:
> + The node is signalled (either by LockSupport.unpark() or similar mechanisms) that it should wake up and retry acquiring the lock.
> + The thread successfully acquires the lock and subsequently clears its status to indicate it is no longer waiting.
> + The thread is aborting its wait, possibly due to a timeout or an interrupt, and needs to clear its status as part of cleanup operations.
Practical Implication
In a Blocking Scenario (Park):
> + While the node is WAITING, the thread is typically parked (LockSupport.park()) and will remain so until it is unparked or otherwise signalled. The WAITING status helps ensure that the node remains correctly identified as being in need of a wake-up signal.
In a Lock Acquisition Scenario:
> + A node may attempt to acquire the lock regardless of its initial status (either 0 or transitioning from WAITING). If the lock acquisition is successful, any status related to waiting is irrelevant post-acquisition; thus, clearing the status to 0 is often an administrative or cleanup action, preparing the node for potential reuse or ensuring it does not remain marked as waiting unnecessarily.
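The status values are bit flags, so the transitions discussed above are bitwise operations. The sketch below imitates `getAndUnsetStatus` with an `AtomicInteger` instead of the `Unsafe` calls used by AQS; `ToyStatus` is a hypothetical class, while the three constants are copied from the source shown earlier.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy version of a node's status field, using AtomicInteger rather than Unsafe. */
class ToyStatus {
    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;          // in a condition wait

    final AtomicInteger status = new AtomicInteger();

    /** Atomically clears the bits in v and returns the previous status. */
    int getAndUnsetStatus(int v) {
        return status.getAndAccumulate(~v, (current, mask) -> current & mask);
    }

    /** True when the sign bit is set, which is how cancellation is tested (status < 0). */
    boolean isCancelled() {
        return status.get() < 0;
    }
}
```

For example, a node waiting on a condition starts with `COND | WAITING` (3). Unsetting `COND` returns 3 and leaves `WAITING` (1), and a later unset of `WAITING` leaves 0. The returned previous value is what lets the caller tell whether it was the one that cleared the bit.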
    /**
     * Possibly repeatedly traverses from tail, unsplicing cancelled
     * nodes until none are found. Unparks nodes that may have been
     * relinked to be next eligible acquirer.
     */
    private void cleanQueue() {
        for (;;) {                               // restart point
            for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
                if (q == null || (p = q.prev) == null)
                    return;                      // end of list
                if (s == null ? tail != q : (s.prev != q || s.status < 0))
                    break;                       // inconsistent
                if (q.status < 0) {              // cancelled
                    if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                        q.prev == p) {
                        p.casNext(q, s);         // OK if fails
                        if (p.prev == null)
                            signalNext(p);
                    }
                    break;
                }
                if ((n = p.next) != q) {         // help finish
                    if (n != null && q.prev == p) {
                        p.casNext(n, q);
                        if (p.prev == null)
                            signalNext(p);
                    }
                    break;
                }
                s = q;
                q = q.prev;
            }
        }
    }
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException();
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    /**
     * Wakes up the successor of given node, if one exists, and unsets its
     * WAITING status to avoid park race. This may fail to wake up an
     * eligible thread when one or more have been cancelled, but
     * cancelAcquire ensures liveness.
     */
    private static void signalNext(Node h) {
        Node s;
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter);
        }
    }
    static final class ConditionNode extends Node
        implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;            // link to next waiting node
        // ......
    }

    /**
     * Condition implementation for a {@link AbstractQueuedSynchronizer}
     * serving as the basis of a {@link Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users. Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient ConditionNode firstWaiter;
        /** Last node of condition queue. */
        private transient ConditionNode lastWaiter;
        // ......
    }
    /**
     * Implements interruptible condition wait.
     * <ol>
     * <li>If current thread is interrupted, throw InterruptedException.
     * <li>Save lock state returned by {@link #getState}.
     * <li>Invoke {@link #release} with saved state as argument,
     *     throwing IllegalMonitorStateException if it fails.
     * <li>Block until signalled or interrupted.
     * <li>Reacquire by invoking specialized version of
     *     {@link #acquire} with saved state as argument.
     * <li>If interrupted while blocked in step 4, throw InterruptedException.
     * </ol>
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        ConditionNode node = newConditionNode();
        if (node == null)
            return;
        int savedState = enableWait(node);
        LockSupport.setCurrentBlocker(this); // for back-compatibility
        boolean interrupted = false, cancelled = false, rejected = false;
        while (!canReacquire(node)) {
            if (interrupted |= Thread.interrupted()) {
                if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                    break;              // else interrupted after signal
            } else if ((node.status & COND) != 0) {
                try {
                    if (rejected)
                        node.block();
                    else
                        ForkJoinPool.managedBlock(node);
                } catch (RejectedExecutionException ex) {
                    rejected = true;
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            } else
                Thread.onSpinWait();    // awoke while enqueuing
        }
        LockSupport.setCurrentBlocker(null);
        node.clearStatus();
        acquire(node, savedState, false, false, false, 0L);
        if (interrupted) {
            if (cancelled) {
                unlinkCancelledWaiters(node);
                throw new InterruptedException();
            }
            Thread.currentThread().interrupt();
        }
    }
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/
private int enableWait(ConditionNode node) {
    if (isHeldExclusively()) {
        node.waiter = Thread.currentThread();
        node.setStatusRelaxed(COND | WAITING);
        ConditionNode last = lastWaiter;
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        int savedState = getState();
        if (release(savedState))
            return savedState;
    }
    node.status = CANCELLED; // lock not held or inconsistent
    throw new IllegalMonitorStateException();
}
/**
* Returns true if a node that was initially placed on a condition
* queue is now ready to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
private boolean canReacquire(ConditionNode node) {
    // check links, not status to avoid enqueue race
    Node p; // traverse unless known to be bidirectionally linked
    return node != null && (p = node.prev) != null &&
        (p.next == node || isEnqueued(node));
}
/**
* Removes and transfers one or all waiters to sync queue.
*/
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}
/**
* Enqueues the node unless null. (Currently used only for
* ConditionNodes; other cases are interleaved with acquires.)
*/
final void enqueue(ConditionNode node) {
    if (node != null) {
        boolean unpark = false;
        for (Node t;;) {
            if ((t = tail) == null && (t = tryInitializeHead()) == null) {
                unpark = true;             // wake up to spin on OOME
                break;
            }
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    unpark = true;
                break;
            }
        }
        if (unpark)
            LockSupport.unpark(node.waiter);
    }
}
Why wake up on a cancelled status? A negative status on the previous tail `t` means that node has been cancelled, so it will never signal its successor. If the newly enqueued node simply stayed parked behind it, nothing would be guaranteed to wake it. The waiter is therefore unparked right away so that its own acquire loop can run, skip the cancelled predecessor, and repair the links (the "wake up to clean link" comment in the source).
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty;

/** Condition for waiting puts */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notFull;

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();
}

/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}
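The circular-array behaviour of `enqueue` and `dequeue` can be observed from outside through the public, non-blocking `offer` and `poll` methods. This is a small sketch under the assumption of a capacity-2 fair queue; `RingDemo` is a hypothetical name, and the slot comments describe what the internal `putIndex` and `takeIndex` would be doing according to the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; only ArrayBlockingQueue itself is JDK API.
class RingDemo {
    static List<String> run() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(2, true);
        List<String> log = new ArrayList<>();
        log.add("offer(a)=" + q.offer("a")); // stored in slot 0
        log.add("offer(b)=" + q.offer("b")); // stored in slot 1, putIndex wraps back to 0
        log.add("offer(c)=" + q.offer("c")); // queue full: offer returns false instead of blocking
        log.add("poll()=" + q.poll());       // removes a from slot 0
        log.add("offer(c)=" + q.offer("c")); // reuses slot 0
        log.add("poll()=" + q.poll());       // b
        log.add("poll()=" + q.poll());       // c
        log.add("poll()=" + q.poll());       // queue empty: poll returns null
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The blocking variants `put` and `take` follow the same index arithmetic; they differ only in waiting on `notFull` or `notEmpty` instead of returning `false` or `null`.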
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

static final class QNode implements ForkJoinPool.ManagedBlocker {
    volatile QNode next;          // next node in queue
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // to control park/unpark
    final boolean isData;

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
    //......
}
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
    QNode s = null;                  // constructed/reused as needed
    boolean isData = (e != null);
    for (;;) {
        QNode t = tail, h = head, m, tn;         // m is node to fulfill
        if (t == null || h == null)
            ;                                    // inconsistent
        else if (h == t || t.isData == isData) { // empty or same-mode
            if (t != tail)                       // inconsistent
                ;
            else if ((tn = t.next) != null)      // lagging tail
                advanceTail(t, tn);
            else if (timed && nanos <= 0L)       // can't wait
                return null;
            else if (t.casNext(null, (s != null) ? s :
                               (s = new QNode(e, isData)))) {
                advanceTail(t, s);
                long deadline = timed ? System.nanoTime() + nanos : 0L;
                Thread w = Thread.currentThread();
                int stat = -1; // same idea as TransferStack
                Object item;
                while ((item = s.item) == e) {
                    if ((timed &&
                         (nanos = deadline - System.nanoTime()) <= 0) ||
                        w.isInterrupted()) {
                        if (s.tryCancel(e)) {
                            clean(t, s);
                            return null;
                        }
                    } else if ((item = s.item) != e) {
                        break;                   // recheck
                    } else if (stat <= 0) {
                        if (t.next == s) {
                            if (stat < 0 && t.isFulfilled()) {
                                stat = 0;        // yield once if first
                                Thread.yield();
                            } else {
                                stat = 1;
                                s.waiter = w;
                            }
                        }
                    } else if (!timed) {
                        LockSupport.setCurrentBlocker(this);
                        try {
                            ForkJoinPool.managedBlock(s);
                        } catch (InterruptedException cannotHappen) { }
                        LockSupport.setCurrentBlocker(null);
                    } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                        LockSupport.parkNanos(this, nanos);
                }
                if (stat == 1)
                    s.forgetWaiter();
                if (!s.isOffList()) {            // not already unlinked
                    advanceHead(t, s);           // unlink if head
                    if (item != null)            // and forget fields
                        s.item = s;
                }
                return (item != null) ? (E)item : e;
            }
        } else if ((m = h.next) != null && t == tail && h == head) {
            Thread waiter;
            Object x = m.item;
            boolean fulfilled = ((isData == (x == null)) &&
                                 x != m && m.casItem(x, e));
            advanceHead(h, m);                    // (help) dequeue
            if (fulfilled) {
                if ((waiter = m.waiter) != null)
                    LockSupport.unpark(waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }
}
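The two branches of `transfer` correspond to behaviour that is visible through the public `SynchronousQueue` API. The sketch below is illustrative only: `HandOffDemo` is a hypothetical name, and the fair-mode constructor is used on the assumption that it selects the queue-based transferer shown above (newer JDK releases may implement the class differently while keeping the same observable behaviour).

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; only SynchronousQueue is JDK API.
class HandOffDemo {
    /** A non-blocking offer with no waiting consumer cannot wait, so it fails. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true);
        return q.offer(1);
    }

    /** A put and a take in different threads meet and hand over one element. */
    static int handOff() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true);
        int[] got = {-1};
        Thread consumer = new Thread(() -> {
            try {
                got[0] = q.take(); // waits as a request node, or fulfills a waiting put
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            q.put(42);       // returns only once the consumer has received the element
            consumer.join(); // makes the consumer's write to got[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return got[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());
        System.out.println(handOff());
    }
}
```

Whichever of `put` and `take` arrives first plays the waiting role (case 1 in the comment), and the later arrival fulfills it (case 2), so the result does not depend on thread scheduling.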
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader;

/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();
// ......
}

public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
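To see `take()` from the caller's side, an element type has to implement `Delayed`. The following is a minimal sketch: `DelayDemo` and `Job` are hypothetical names, and the delays (20 ms, 60 ms, one hour) are arbitrary values chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; Job is an illustrative Delayed implementation.
class DelayDemo {
    static final class Job implements Delayed {
        final String name;
        final long deadlineNanos; // absolute expiry time on the System.nanoTime() clock

        Job(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Zero or negative means the delay has already elapsed.
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** take() returns elements in expiry order, not insertion order. */
    static List<String> run() {
        DelayQueue<Job> q = new DelayQueue<>();
        q.add(new Job("slow", 60));
        q.add(new Job("fast", 20));
        List<String> order = new ArrayList<>();
        try {
            order.add(q.take().name); // blocks until the head's delay has elapsed
            order.add(q.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    /** poll() does not wait: an unexpired head yields null. */
    static boolean pollBeforeExpiry() {
        DelayQueue<Job> q = new DelayQueue<>();
        q.add(new Job("later", 3_600_000));
        return q.poll() == null;
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(pollBeforeExpiry());
    }
}
```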
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
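The three steps of `execute` can be made visible with a deliberately tiny pool. This is a sketch under assumed parameters (one core thread, at most two threads, a queue of capacity one, and the default abort policy); `ExecuteStepsDemo` is a hypothetical name. Every task blocks on a latch so that no worker becomes free while the tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; pool sizes are assumptions chosen for illustration.
class ExecuteStepsDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // step 1: below corePoolSize, starts a core worker
            pool.execute(blocker); // step 2: core is busy, task goes into the queue
            pool.execute(blocker); // step 3: queue is full, starts a non-core worker
            try {
                pool.execute(blocker); // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return "pool=" + pool.getPoolSize()
                    + " queue=" + pool.getQueue().size()
                    + " rejected=" + rejected;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note the order this implies: the queue is tried before extra threads are created, so with an unbounded queue the pool never grows beyond `corePoolSize`.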
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                container.start(t);
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in. Null if factory fails. */
    @SuppressWarnings("serial") // Unlikely to be serializable
    final Thread thread;
    /** Initial task to run. Possibly null. */
    @SuppressWarnings("serial") // Not statically typed as Serializable
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker. */
public void run() {
    runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
    return getState() != 0;
}

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}
// .....
}
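The lock methods above make `Worker` a non-reentrant mutex: `tryAcquire` only succeeds on the transition from 0 to 1, even for the thread that already holds it. The sketch below copies that state logic into a standalone class to show the effect. `NonReentrantMutex`, `tryLock`, `unlock`, and `demo` are hypothetical names introduced for this illustration, not part of `ThreadPoolExecutor`.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical standalone mutex using the same 0/1 state convention as Worker.
class NonReentrantMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only when unlocked; the owner is not checked, so no reentrancy.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** Returns the outcome of lock, re-lock by the same thread, unlock, lock. */
    static String demo() {
        NonReentrantMutex m = new NonReentrantMutex();
        boolean first = m.tryLock();   // 0 -> 1 succeeds
        boolean second = m.tryLock();  // already 1: fails even for the owner
        m.unlock();
        boolean third = m.tryLock();   // unlocked again, so it succeeds
        m.unlock();
        return first + "," + second + "," + third;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is the property the pool relies on: a worker holds its own lock while running a task, so a `tryLock()` from another thread fails for busy workers and succeeds only for idle ones.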
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                    container.close();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
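The orderly shutdown path (`shutdown()`, then workers draining the queue, then `tryTerminate()` signalling `termination`) can be observed through the public API. This is a minimal sketch with assumed parameters (two threads, five trivial tasks, a five-second wait); `ShutdownDemo` is a hypothetical name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; pool size and task count are illustrative assumptions.
class ShutdownDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(done::incrementAndGet);
        }

        pool.shutdown(); // previously submitted tasks still run; new ones are refused

        boolean rejected = false;
        try {
            pool.execute(done::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        boolean terminated = false;
        try {
            // Waits on the condition that tryTerminate() signals at TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done=" + done.get()
                + " rejected=" + rejected
                + " terminated=" + terminated
                + " isTerminated=" + pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

`shutdown()` itself returns immediately; it is `awaitTermination` that blocks until the last worker has exited and the pool has reached the TERMINATED state.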