staticfinalintWAITING=1;// must be 1 staticfinalintCANCELLED=0x80000000;// must be negative staticfinalintCOND=2;// in a condition wait/**
* Head of the wait queue, lazily initialized.
*/privatetransientvolatileNodehead;/**
* Tail of the wait queue. After initialization, modified only via casTail. */privatetransientvolatileNodetail;/**
* The synchronization state. */privatevolatileintstate;
abstractstaticclassNode{volatileNodeprev;// initially attached via casTail volatileNodenext;// visibly nonnull when signallable Threadwaiter;// visibly nonnull when enqueued volatileintstatus;// written by owner, atomic bit ops by others // methods for atomic operations finalbooleancasPrev(Nodec,Nodev){// for cleanQueue returnU.weakCompareAndSetReference(this,PREV,c,v);}finalbooleancasNext(Nodec,Nodev){// for cleanQueue returnU.weakCompareAndSetReference(this,NEXT,c,v);}finalintgetAndUnsetStatus(intv){// for signalling returnU.getAndBitwiseAndInt(this,STATUS,~v);}finalvoidsetPrevRelaxed(Nodep){// for off-queue assignment U.putReference(this,PREV,p);}finalvoidsetStatusRelaxed(ints){// for off-queue assignment U.putInt(this,STATUS,s);}finalvoidclearStatus(){// for reducing unneeded signals U.putIntOpaque(this,STATUS,0);}privatestaticfinallongSTATUS=U.objectFieldOffset(Node.class,"status");privatestaticfinallongNEXT=U.objectFieldOffset(Node.class,"next");privatestaticfinallongPREV=U.objectFieldOffset(Node.class,"prev");}
BigMap
Sync#lock()
1
2
3
4
finalvoidlock(){if(!initialTryLock())acquire(1);}
[Fair|Unfair]Sync#initialTryLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Acquires only if reentrant or queue is empty. */finalbooleaninitialTryLock(){Threadcurrent=Thread.currentThread();intc=getState();if(c==0){if(!hasQueuedThreads()&&compareAndSetState(0,1)){setExclusiveOwnerThread(current);returntrue;}}elseif(getExclusiveOwnerThread()==current){if(++c<0)// overflow thrownewError("Maximum lock count exceeded");setState(c);returntrue;}returnfalse;}
/**
* Queries whether any threads are waiting to acquire. Note that * because cancellations due to interrupts and timeouts may occur * at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire. * * @return {@code true} if there may be other threads waiting to acquire
*/publicfinalbooleanhasQueuedThreads(){for(Nodep=tail,h=head;p!=h&&p!=null;p=p.prev)if(p.status>=0)returntrue;returnfalse;}
AQS#acquire(int arg)
1
2
3
4
5
6
7
8
9
10
11
12
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
* * @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like. */publicfinalvoidacquire(intarg){if(!tryAcquire(arg))acquire(null,arg,false,false,false,0L);}
该方法调用AQS中待实现类实现的tryAcquire()方法,以自定义的方式尝试获取一次锁,若获取失败,则调用AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)方法
FairSync#tryAcquire(int acquires)
1
2
3
4
5
6
7
8
9
10
/**
* Acquires only if thread is first waiter or empty */protectedfinalbooleantryAcquire(intacquires){if(getState()==0&&!hasQueuedPredecessors()&&compareAndSetState(0,acquires)){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}
/**
* Queries whether any threads have been waiting to acquire longer * than the current thread. * * <p>An invocation of this method is equivalent to (but may be
* more efficient than): * <pre> {@code
* getFirstQueuedThread() != Thread.currentThread()
* && hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current * thread. Likewise, it is possible for another thread to win a * race to enqueue after this method has returned {@code false},
* due to the queue being empty. * * <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this: * * <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; * } else { * // try to acquire normally * } * }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty * @since 1.7
*/publicfinalbooleanhasQueuedPredecessors(){Threadfirst=null;Nodeh,s;if((h=head)!=null&&((s=h.next)==null||(first=s.waiter)==null||s.prev==null))first=getFirstQueuedThread();// retry via getFirstQueuedThread returnfirst!=null&&first!=Thread.currentThread();}
[!NOTE] GPT解释
Detailed Explanation of hasQueuedPredecessors():
This method provides a way to determine if the calling thread should wait in line or attempt to acquire the lock directly, based on whether there are other threads ahead of it in the queue.
1. Checking the Queue:
The method starts by initializing Thread first to null and declaring Node h and Node s.
It then checks if the head of the queue (h) is not null. If the head exists, it proceeds to check the next node (s = h.next).
2. Evaluating Conditions:
If the head’s next node (s) is null, or s.waiter (the thread in the s node) is null, or s.prev (the link back to the head) is null, it implies a possibility of inconsistency in the queue or that the queue might be transitioning states (e.g., nodes being added or removed).
In such cases, it uses getFirstQueuedThread() to reliably get the first thread in the queue and reassess the situation. This call is more robust but potentially less efficient, hence used as a fallback.
3. Return Logic:
Finally, the method returns true if first (the first queued thread determined either directly or through the fallback) is not null and is not the current thread. This means there is at least one thread that has been waiting longer than the current thread.
If first is null or it is the current thread, it returns false, indicating either the queue is empty or the current thread is at the head of the queue.
AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)
finalintacquire(Nodenode,intarg,booleanshared,booleaninterruptible,booleantimed,longtime){Threadcurrent=Thread.currentThread();bytespins=0,postSpins=0;// retries upon unpark of first threadbooleaninterrupted=false,first=false;Nodepred=null;// predecessor of node when enqueued/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if queue is not initialized, do so by attaching new header node
* resort to spinwait on OOME trying to create node
* else if node not yet created, create it
* resort to spinwait on OOME trying to create node
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/for(;;){if(!first&&(pred=(node==null)?null:node.prev)!=null&&!(first=(head==pred))){if(pred.status<0){cleanQueue();// predecessor cancelledcontinue;}elseif(pred.prev==null){Thread.onSpinWait();// ensure serializationcontinue;}}if(first||pred==null){booleanacquired;try{if(shared)acquired=(tryAcquireShared(arg)>=0);elseacquired=tryAcquire(arg);}catch(Throwableex){cancelAcquire(node,interrupted,false);throwex;}if(acquired){if(first){node.prev=null;head=node;pred.next=null;node.waiter=null;if(shared)signalNextIfShared(node);if(interrupted)current.interrupt();}return1;}}Nodet;if((t=tail)==null){// initialize queueif(tryInitializeHead()==null)returnacquireOnOOME(shared,arg);}elseif(node==null){// allocate; retry before enqueuetry{node=(shared)?newSharedNode():newExclusiveNode();}catch(OutOfMemoryErroroome){returnacquireOnOOME(shared,arg);}}elseif(pred==null){// try to enqueuenode.waiter=current;node.setPrevRelaxed(t);// avoid unnecessary fenceif(!casTail(t,node))node.setPrevRelaxed(null);// back outelset.next=node;}elseif(first&&spins!=0){--spins;// reduce unfairness on rewaitsThread.onSpinWait();}elseif(node.status==0){node.status=WAITING;// enable signal and recheck}else{longnanos;spins=postSpins=(byte)((postSpins<<1)|1);if(!timed)LockSupport.park(this);elseif((nanos=time-System.nanoTime())>0L)LockSupport.parkNanos(this,nanos);elsebreak;node.clearStatus();if((interrupted|=Thread.interrupted())&&interruptible)break;}}returncancelAcquire(node,interrupted,interruptible);}
[!NOTE] GPT对于Status的解释
Understanding Status Management
Role of WAITING Status:
> + When a node (representing a thread in a queue) is set to WAITING, it typically indicates that the thread is actively waiting and should remain parked until explicitly signalled. The WAITING status is used to manage thread wake-up correctly and to avoid lost wake-ups.
Role of Status Zero (0):
> + A status of 0 generally indicates that the node is not in any particular waiting or signal state. This can mean several things depending on the context:
> + The thread is not currently waiting.
> + The thread has been woken up and is about to retry acquiring the lock.
> + The thread has completed its operation and is being cleaned up.
Acquiring the Lock with Status Zero:
> + Setting the status to zero does not by itself grant the lock to the node. Instead, it signifies that the node is in a state eligible to attempt to acquire the lock. When a thread (node) attempts to acquire the lock, having a status of zero implies that it is neither parked nor scheduled to be parked. This status allows it to enter the lock acquisition logic without being delayed by unnecessary waits.
Transition from WAITING to Zero:
The transition from WAITING to 0 typically occurs when:
The node is signalled (either by LockSupport.unpark() or similar mechanisms) that it should wake up and retry acquiring the lock.
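The thread returns from park and clears its own status (node.clearStatus()) before retrying the acquire; in the acquire loop this clearing happens right after LockSupport.park()/parkNanos() returns, not after the lock has been obtained.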
The thread is aborting its wait, possibly due to a timeout or an interrupt, and needs to clear its status as part of cleanup operations.
Practical Implication
In a Blocking Scenario (Park):
While the node is WAITING, the thread is typically parked (LockSupport.park()) and will remain so until it is unparked or otherwise signalled. The WAITING status helps ensure that the node remains correctly identified as being in need of a wake-up signal.
In a Lock Acquisition Scenario:
A node may attempt to acquire the lock regardless of its initial status (either 0 or transitioning from WAITING). If the lock acquisition is successful, any status related to waiting is irrelevant post-acquisition; thus, clearing the status to 0 is often an administrative or cleanup action, preparing the node for potential reuse or ensuring it does not remain marked as waiting unnecessarily.
/**
* Possibly repeatedly traverses from tail, unsplicing cancelled
* nodes until none are found. Unparks nodes that may have been
* relinked to be next eligible acquirer.
*/privatevoidcleanQueue(){for(;;){// restart pointfor(Nodeq=tail,s=null,p,n;;){// (p, q, s) triplesif(q==null||(p=q.prev)==null)return;// end of listif(s==null?tail!=q:(s.prev!=q||s.status<0))break;// inconsistentif(q.status<0){// cancelledif((s==null?casTail(q,p):s.casPrev(q,p))&&q.prev==p){p.casNext(q,s);// OK if failsif(p.prev==null)signalNext(p);}break;}if((n=p.next)!=q){// help finishif(n!=null&&q.prev==p){p.casNext(n,q);if(p.prev==null)signalNext(p);}break;}s=q;q=q.prev;}}}
protectedfinalbooleantryRelease(intreleases){intc=getState()-releases;if(getExclusiveOwnerThread()!=Thread.currentThread())thrownewIllegalMonitorStateException();booleanfree=(c==0);if(free)setExclusiveOwnerThread(null);setState(c);returnfree;}/**
* Wakes up the successor of given node, if one exists, and unsets its
* WAITING status to avoid park race. This may fail to wake up an
* eligible thread when one or more have been cancelled, but
* cancelAcquire ensures liveness.
*/privatestaticvoidsignalNext(Nodeh){Nodes;if(h!=null&&(s=h.next)!=null&&s.status!=0){s.getAndUnsetStatus(WAITING);LockSupport.unpark(s.waiter);}}
staticfinalclassConditionNodeextendsNodeimplementsForkJoinPool.ManagedBlocker{ConditionNodenextWaiter;// link to next waiting node// ......}/**
* Condition implementation for a {@link AbstractQueuedSynchronizer}
* serving as the basis of a {@link Lock} implementation.
*
* <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* {@code AbstractQueuedSynchronizer}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/publicclassConditionObjectimplementsCondition,java.io.Serializable{privatestaticfinallongserialVersionUID=1173984872572414699L;/** First node of condition queue. */privatetransientConditionNodefirstWaiter;/** Last node of condition queue. */privatetransientConditionNodelastWaiter;// ......}
/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/publicfinalvoidawait()throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();ConditionNodenode=newConditionNode();if(node==null)return;intsavedState=enableWait(node);LockSupport.setCurrentBlocker(this);// for back-compatibilitybooleaninterrupted=false,cancelled=false,rejected=false;while(!canReacquire(node)){if(interrupted|=Thread.interrupted()){if(cancelled=(node.getAndUnsetStatus(COND)&COND)!=0)break;// else interrupted after signal}elseif((node.status&COND)!=0){try{if(rejected)node.block();elseForkJoinPool.managedBlock(node);}catch(RejectedExecutionExceptionex){rejected=true;}catch(InterruptedExceptionie){interrupted=true;}}elseThread.onSpinWait();// awoke while enqueuing}LockSupport.setCurrentBlocker(null);node.clearStatus();acquire(node,savedState,false,false,false,0L);if(interrupted){if(cancelled){unlinkCancelledWaiters(node);thrownewInterruptedException();}Thread.currentThread().interrupt();}}
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/privateintenableWait(ConditionNodenode){if(isHeldExclusively()){node.waiter=Thread.currentThread();node.setStatusRelaxed(COND|WAITING);ConditionNodelast=lastWaiter;if(last==null)firstWaiter=node;elselast.nextWaiter=node;lastWaiter=node;intsavedState=getState();if(release(savedState))returnsavedState;}node.status=CANCELLED;// lock not held or inconsistentthrownewIllegalMonitorStateException();}
/**
* Returns true if a node that was initially placed on a condition
* queue is now ready to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/privatebooleancanReacquire(ConditionNodenode){// check links, not status to avoid enqueue raceNodep;// traverse unless known to be bidirectionally linkedreturnnode!=null&&(p=node.prev)!=null&&(p.next==node||isEnqueued(node));}
/**
* Removes and transfers one or all waiters to sync queue.
*/privatevoiddoSignal(ConditionNodefirst,booleanall){while(first!=null){ConditionNodenext=first.nextWaiter;if((firstWaiter=next)==null)lastWaiter=null;if((first.getAndUnsetStatus(COND)&COND)!=0){enqueue(first);if(!all)break;}first=next;}}
/**
* Enqueues the node unless null. (Currently used only for
* ConditionNodes; other cases are interleaved with acquires.)
*/finalvoidenqueue(ConditionNodenode){if(node!=null){booleanunpark=false;for(Nodet;;){if((t=tail)==null&&(t=tryInitializeHead())==null){unpark=true;// wake up to spin on OOMEbreak;}node.setPrevRelaxed(t);// avoid unnecessary fenceif(casTail(t,node)){t.next=node;if(t.status<0)// wake up to clean linkunpark=true;break;}}if(unpark)LockSupport.unpark(node.waiter);}}
Why wake up on cancelled status? In enqueue(), if the previous tail t is already cancelled (t.status < 0), the thread that is unparked is the newly enqueued node's own waiter (LockSupport.unpark(node.waiter)), not some other thread. Once awake, that thread re-enters the acquire loop, sees that its predecessor is cancelled (pred.status < 0) and calls cleanQueue() to unsplice it. A cancelled node may not be properly participating in the queue dynamics (like signalling next nodes), so removing it quickly is crucial.
/** Main lock guarding all access */finalReentrantLocklock;/** Condition for waiting takes */@SuppressWarnings("serial")// Classes implementing Condition may be serializable. privatefinalConditionnotEmpty;/** Condition for waiting puts */@SuppressWarnings("serial")// Classes implementing Condition may be serializable. privatefinalConditionnotFull;/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/privatevoidenqueue(Ee){// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;finalObject[]items=this.items;items[putIndex]=e;if(++putIndex==items.length)putIndex=0;count++;notEmpty.signal();}/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/privateEdequeue(){// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;finalObject[]items=this.items;@SuppressWarnings("unchecked")Ee=(E)items[takeIndex];items[takeIndex]=null;if(++takeIndex==items.length)takeIndex=0;count--;if(itrs!=null)itrs.elementDequeued();notFull.signal();returne;}
/** Head of queue */transientvolatileQNodehead;/** Tail of queue */transientvolatileQNodetail;/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/transientvolatileQNodecleanMe;TransferQueue(){QNodeh=newQNode(null,false);// initialize to dummy node.head=h;tail=h;}staticfinalclassQNodeimplementsForkJoinPool.ManagedBlocker{volatileQNodenext;// next node in queue volatileObjectitem;// CAS'ed to or from null volatileThreadwaiter;// to control park/unpark finalbooleanisData;QNode(Objectitem,booleanisData){this.item=item;this.isData=isData;}//......}
/**
* Puts or takes an item.
*/@SuppressWarnings("unchecked")Etransfer(Ee,booleantimed,longnanos){/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/QNodes=null;// constructed/reused as neededbooleanisData=(e!=null);for(;;){QNodet=tail,h=head,m,tn;// m is node to fulfillif(t==null||h==null);// inconsistentelseif(h==t||t.isData==isData){// empty or same-modeif(t!=tail)// inconsistent;elseif((tn=t.next)!=null)// lagging tailadvanceTail(t,tn);elseif(timed&&nanos<=0L)// can't waitreturnnull;elseif(t.casNext(null,(s!=null)?s:(s=newQNode(e,isData)))){advanceTail(t,s);longdeadline=timed?System.nanoTime()+nanos:0L;Threadw=Thread.currentThread();intstat=-1;// same idea as TransferStackObjectitem;while((item=s.item)==e){if((timed&&(nanos=deadline-System.nanoTime())<=0)||w.isInterrupted()){if(s.tryCancel(e)){clean(t,s);returnnull;}}elseif((item=s.item)!=e){break;// recheck}elseif(stat<=0){if(t.next==s){if(stat<0&&t.isFulfilled()){stat=0;// yield once if firstThread.yield();}else{stat=1;s.waiter=w;}}}elseif(!timed){LockSupport.setCurrentBlocker(this);try{ForkJoinPool.managedBlock(s);}catch(InterruptedExceptioncannotHappen){}LockSupport.setCurrentBlocker(null);}elseif(nanos>SPIN_FOR_TIMEOUT_THRESHOLD)LockSupport.parkNanos(this,nanos);}if(stat==1)s.forgetWaiter();if(!s.isOffList()){// not already unlinkedadvanceHead(t,s);// unlink if headif(item!=null)// and forget fieldss.item=s;}return(item!=null)?(E)item:e;}}elseif((m=h.next)!=null&&t==tail&&h==head){Threadwaiter;Objectx=m.item;booleanfulfilled=((isData==(x==null))&&x!=m&&m.casItem(x,e));advanceHead(h,m);// (help) dequeueif(fulfilled){if((waiter=m.waiter)!=null)LockSupport.unpark(waiter);return(x!=null)?(E)x:e;}}}}
publicclassDelayQueue<EextendsDelayed>extendsAbstractQueue<E>implementsBlockingQueue<E>{privatefinaltransientReentrantLocklock=newReentrantLock();privatefinalPriorityQueue<E>q=newPriorityQueue<E>();/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/privateThreadleader;/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/privatefinalConditionavailable=lock.newCondition();// ......}publicinterfaceDelayedextendsComparable<Delayed>{/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/longgetDelay(TimeUnitunit);
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;){Efirst=q.peek();if(first==null)available.await();else{longdelay=first.getDelay(NANOSECONDS);if(delay<=0L)returnq.poll();first=null;// don't retain ref while waitingif(leader!=null)available.await();else{ThreadthisThread=Thread.currentThread();leader=thisThread;try{available.awaitNanos(delay);}finally{if(leader==thisThread)leader=null;}}}}}finally{if(leader==null&&q.peek()!=null)available.signal();lock.unlock();}}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/intc=ctl.get();if(workerCountOf(c)<corePoolSize){if(addWorker(command,true))return;c=ctl.get();}if(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)addWorker(null,false);}elseif(!addWorker(command,false))reject(command);}
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(intc=ctl.get();;){// Check if queue empty only if necessary.if(runStateAtLeast(c,SHUTDOWN)&&(runStateAtLeast(c,STOP)||firstTask!=null||workQueue.isEmpty()))returnfalse;for(;;){if(workerCountOf(c)>=((core?corePoolSize:maximumPoolSize)&COUNT_MASK))returnfalse;if(compareAndIncrementWorkerCount(c))breakretry;c=ctl.get();// Re-read ctlif(runStateAtLeast(c,SHUTDOWN))continueretry;// else CAS failed due to workerCount change; retry inner loop}}booleanworkerStarted=false;booleanworkerAdded=false;Workerw=null;try{w=newWorker(firstTask);finalThreadt=w.thread;if(t!=null){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.intc=ctl.get();if(isRunning(c)||(runStateLessThan(c,STOP)&&firstTask==null)){if(t.getState()!=Thread.State.NEW)thrownewIllegalThreadStateException();workers.add(w);workerAdded=true;ints=workers.size();if(s>largestPoolSize)largestPoolSize=s;}}finally{mainLock.unlock();}if(workerAdded){container.start(t);workerStarted=true;}}}finally{if(!workerStarted)addWorkerFailed(w);}returnworkerStarted;}
privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/** Thread this worker is running in. Null if factory fails. */@SuppressWarnings("serial")// Unlikely to be serializablefinalThreadthread;/** Initial task to run. Possibly null. */@SuppressWarnings("serial")// Not statically typed as SerializableRunnablefirstTask;/** Per-thread task counter */volatilelongcompletedTasks;/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/Worker(RunnablefirstTask){setState(-1);// inhibit interrupts until runWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */publicvoidrun(){runWorker(this);}// Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state.protectedbooleanisHeldExclusively(){returngetState()!=0;}protectedbooleantryAcquire(intunused){if(compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}protectedbooleantryRelease(intunused){setExclusiveOwnerThread(null);setState(0);returntrue;}// .....}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/finalvoidrunWorker(Workerw){Threadwt=Thread.currentThread();Runnabletask=w.firstTask;w.firstTask=null;w.unlock();// allow interruptsbooleancompletedAbruptly=true;try{while(task!=null||(task=getTask())!=null){w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{beforeExecute(wt,task);try{task.run();afterExecute(task,null);}catch(Throwableex){afterExecute(task,ex);throwex;}}finally{task=null;w.completedTasks++;w.unlock();}}completedAbruptly=false;}finally{processWorkerExit(w,completedAbruptly);}}
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/privateRunnablegetTask(){booleantimedOut=false;// Did the last poll() time out?for(;;){intc=ctl.get();// Check if queue empty only if necessary.if(runStateAtLeast(c,SHUTDOWN)&&(runStateAtLeast(c,STOP)||workQueue.isEmpty())){decrementWorkerCount();returnnull;}intwc=workerCountOf(c);// Are workers subject to culling?booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;if((wc>maximumPoolSize||(timed&&timedOut))&&(wc>1||workQueue.isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;timedOut=true;}catch(InterruptedExceptionretry){timedOut=false;}}}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/publicvoidshutdown(){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown();// hook for ScheduledThreadPoolExecutor}finally{mainLock.unlock();}tryTerminate();}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/privatevoidinterruptIdleWorkers(booleanonlyOne){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{for(Workerw:workers){Threadt=w.thread;if(!t.isInterrupted()&&w.tryLock()){try{t.interrupt();}catch(SecurityExceptionignore){}finally{w.unlock();}}if(onlyOne)break;}}finally{mainLock.unlock();}}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/finalvoidtryTerminate(){for(;;){intc=ctl.get();if(isRunning(c)||runStateAtLeast(c,TIDYING)||(runStateLessThan(c,STOP)&&!workQueue.isEmpty()))return;if(workerCountOf(c)!=0){// Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){try{terminated();}finally{ctl.set(ctlOf(TERMINATED,0));termination.signalAll();container.close();}return;}}finally{mainLock.unlock();}// else retry on failed CAS}}