背景
上一篇说到LockSupport的park/unpark,这一篇就来说说Object的wait/notify方法。
Object是一切类的父类,wait方法是在Object中实现的。
public final native void wait(long timeout) throws InterruptedException;
wait
ObjectSynchronizer
Object的wait是在ObjectSynchronizer中实现的。注意:wait方法的调用需要在当前线程持有该对象的监视器的情况下使用,即当前线程竞争到了锁,才可以调用wait方法,通常与synchronized关键字一起用。
wait
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
...//省略
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());//获取ObjectMonitor
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
monitor->wait(millis, true, THREAD);//调用ObjectMonitor的wait方法
...//省略
}
结合对象头信息来看一下获取ObjectMonitor是通过inflate方法:
死循环里面获取当前monitor:
inflate
CASE1 当前对象已经有monitor
即是否已经被重量级锁标识
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
...//省略
for (;;) {
const markOop mark = object->mark() ;//获取对象的头部mark_word,如上图所示
assert (!mark->has_bias_pattern(), "invariant") ;
// The mark can be in one of the following states:
// * Inflated - just return
// * Stack-locked - coerce it to inflated
// * INFLATING - busy wait for conversion to complete
// * Neutral - aggressively inflate the object.
// * BIASED - Illegal. We should never see this
// CASE: inflated
if (mark->has_monitor()) {//当前对象已有monitor
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
其中has_monitor的实现:monitor_value=2,对照图中,是***重量级锁***的位数,就是判断对象是否被重量级锁标识。
bool has_monitor() const {
return ((value() & monitor_value) != 0);
}
获取monitor:对照图来看,就是取重量级锁前30位,即获取ObjectMonitor的指针。
ObjectMonitor* monitor() const {
assert(has_monitor(), "check");
// Use xor instead of &~ to provide one extra tag-bit check.
return (ObjectMonitor*) (value() ^ monitor_value);
}
CASE2 当前锁正在膨胀
重新获取mark_word,进入下一次循环
// CASE: inflation in progress - inflating over a stack-lock.
// Some other thread is converting from stack-locked to inflated.
// Only that thread can complete inflation -- other threads must wait.
// The INFLATING value is transient.
// Currently, we spin/yield/park and poll the markword, waiting for inflation to finish.
// We could always eliminate polling by parking the thread on some auxiliary list.
if (mark == markOopDesc::INFLATING()) {//如果当前锁正在膨胀,(轻量级锁膨胀为重量级锁),重新获取mark_word,进入下一次循环
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
怎么判断膨胀:为0就代表膨胀
这时候锁标志位00表示为轻量级锁标记,前面指向栈中锁记录的指针为NULL。
static markOop INFLATING() { return (markOop) 0; }
CASE3 被轻量级锁标记
被轻量级锁标记,则从当前线程中分配ObjectMonitor,尝试膨胀
// CASE: stack-locked
// Could be stack-locked either by this thread or by some other thread.
//
// Note that we allocate the objectmonitor speculatively, _before_ attempting
// to install INFLATING into the mark word. We originally installed INFLATING,
// allocated the objectmonitor, and then finally STed the address of the
// objectmonitor into the mark. This was correct, but artificially lengthened
// the interval in which INFLATED appeared in the mark, thus increasing
// the odds of inflation contention.
//
// We now use per-thread private objectmonitor free lists.
// These list are reprovisioned from the global free list outside the
// critical INFLATING...ST interval. A thread can transfer
// multiple objectmonitors en-mass from the global free list to its local free list.
// This reduces coherency traffic and lock contention on the global free list.
// Using such local free lists, it doesn't matter if the omAlloc() call appears
// before or after the CAS(INFLATING) operation.
// See the comments in omAlloc().
if (mark->has_locker()) {//被轻量级锁标记
ObjectMonitor * m = omAlloc (Self) ;//从当前线程中分配ObjectMonitor
// Optimistically prepare the objectmonitor - anticipate successful CAS
// We do this before the CAS in order to minimize the length of time
// in which INFLATING appears in the mark.
m->Recycle();//置回初始状态
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
//cas方式尝试膨胀,如果失败,进入下一次循环
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
// Setup monitor fields to proper values -- prepare the monitor
m->set_header(dmw) ;
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));//将monitor地址set到对象头部
...//省略
return m ;
}
}
分配ObjectMonitor
然后我们看下怎么分配ObjectMonitor。
ObjectMonitor * ATTR ObjectSynchronizer::omAlloc (Thread * Self) {
...//省略
for (;;) {
ObjectMonitor * m ;
// 1: try to allocate from the thread's local omFreeList.
// Threads will attempt to allocate first from their local list, then
// from the global list, and only after those attempts fail will the thread
// attempt to instantiate new monitors. Thread-local free lists take
// heat off the ListLock and improve allocation latency, as well as reducing
// coherency traffic on the shared global list.
//1 . 如果当前线程中omFreeList不为空,分配当前线程omFreeList的头结点
m = Self->omFreeList ;
if (m != NULL) {
Self->omFreeList = m->FreeNext ;
Self->omFreeCount -- ;
// CONSIDER: set m->FreeNext = BAD -- diagnostic hygiene
guarantee (m->object() == NULL, "invariant") ;
if (MonitorInUseLists) {
m->FreeNext = Self->omInUseList;
Self->omInUseList = m;
Self->omInUseCount ++;
// verifyInUse(Self);
} else {
m->FreeNext = NULL;
}
return m ;
}
// 2: try to allocate from the global gFreeList
// CONSIDER: use muxTry() instead of muxAcquire().
// If the muxTry() fails then drop immediately into case 3.
// If we're using thread-local free lists then try
// to reprovision the caller's free list.
// 2. 如果全局的gFreelist不为空,从gFreelist中分配
if (gFreeList != NULL) {
// Reprovision the thread's omFreeList.
// Use bulk transfers to reduce the allocation rate and heat
// on various locks.
Thread::muxAcquire (&ListLock, "omAlloc") ;
for (int i = Self->omFreeProvision; --i >= 0 && gFreeList != NULL; ) {
MonitorFreeCount --;
ObjectMonitor * take = gFreeList ;
gFreeList = take->FreeNext ;
guarantee (take->object() == NULL, "invariant") ;
guarantee (!take->is_busy(), "invariant") ;
take->Recycle() ;
omRelease (Self, take, false) ;
}
Thread::muxRelease (&ListLock) ;
Self->omFreeProvision += 1 + (Self->omFreeProvision/2) ;
if (Self->omFreeProvision > MAXPRIVATE ) Self->omFreeProvision = MAXPRIVATE ;
TEVENT (omFirst - reprovision) ;
const int mx = MonitorBound ;
if (mx > 0 && (MonitorPopulation-MonitorFreeCount) > mx) {
// We can't safely induce a STW safepoint from omAlloc() as our thread
// state may not be appropriate for such activities and callers may hold
// naked oops, so instead we defer the action.
InduceScavenge (Self, "omAlloc") ;
}
continue;
}
// 3: allocate a block of new ObjectMonitors
// Both the local and global free lists are empty -- resort to malloc().
// In the current implementation objectMonitors are TSM - immortal.
//3. new一个128大小的ObjectMonitor数组,第0个元素保留, 建立freelist链表,并与gFreeList连通,gFreeList指向新建立的链表.进入下一次循环
assert (_BLOCKSIZE > 1, "invariant") ;
ObjectMonitor * temp = new ObjectMonitor[_BLOCKSIZE];
// NOTE: (almost) no way to recover if allocation failed.
// We might be able to induce a STW safepoint and scavenge enough
// objectMonitors to permit progress.
if (temp == NULL) {
vm_exit_out_of_memory (sizeof (ObjectMonitor[_BLOCKSIZE]), OOM_MALLOC_ERROR,
"Allocate ObjectMonitors");
}
...//省略
}
}
ObjectMonitor
上一步获取到ObjectMonitor之后,最终是调用该类的wait方法。
在HotSpot虚拟机中,ObjectMonitor用于实现monitor。
Object的wait/notify方法最终是通过hotspot的ObjectMoniter来实现的,具体参考hotspot/src/share/vm/runtime/objectMonitor.cpp:
每个线程有两个ObjectMonitor对象列表,ObjectMonitor* omFreeList;
和ObjectMonitor* omInUseList;
。用于await的时候分配ObjectMonitor对象的。就是上一步的omAlloc方法。
ObjectMonitor对象中有两个队列_WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表,_owner 指向获得ObjectMonitor对象的线程。
- _WaitSet:调用wait的线程会进入_WaitSet而其他线程调用notify之后,本线程就会进入_EntryList
- _EntryList:处于等待锁block状态的线程会进入_EntryList
wait
我们具体来看一下wait方法
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
EventJavaMonitorWait event;
// check for a pending interrupt
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
// Note: 'false' parameter is passed here because the
// wait was not timed out due to thread interrupt.
JvmtiExport::post_monitor_waited(jt, this, false);
}
if (event.should_commit()) {
post_monitor_wait_event(&event, 0, millis, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);//设置当前正在waiting的监视器
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
ObjectWaiter node(Self);//将当前线程封装成一个ObjectWaiter
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;//添加到ObjectMonitor的等待队列中_Waitset
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
//退出对象监视器,即将ObjectMonitor中的_owner置为NULL
exit (true, Self) ; // exit the monitor
guarantee (_owner != Self, "invariant") ;
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {//如果此时没有被唤醒
if (millis <= 0) {
Self->_ParkEvent->park () ;//挂起当前线程
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
...//省略
}
addWaiter方法
就是把当前线程包装成的ObjectWaiter添加到_WaitSet中:
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
exit方法
exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象:
- 如果是偏向锁,则偏向锁减一后返回
- 根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
- 线程唤醒后会
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) {
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
//偏向锁判断,偏向次数减一后直接返回
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
// Invariant: after setting Responsible=null an thread must execute
// a MEMBAR or other serializing instruction before fetching EntryList|cxq.
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
#if INCLUDE_TRACE
// get the owner's thread id for the MonitorEnter event
// if it is enabled and the thread isn't suspended
if (not_suspended && Tracing::is_event_enabled(TraceJavaMonitorEnterEvent)) {
_previous_owner_tid = SharedRuntime::get_java_tid(Self);
}
#endif
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ; // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
//QMode = 2,并且_cxq非空:取_cxq队列排头位置的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,此处会立即返回,后面的代码不会执行了;
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
//QMode = 3,并且_cxq非空:把_cxq队列首元素放入_EntryList的尾部;
if (QMode == 3 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
//QMode = 4,并且_cxq非空:把_cxq队列首元素放入_EntryList的头部;
if (QMode == 4 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Prepend the RATs to the EntryList
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
//如果_EntryList的首元素非空,就取出来调用ExitEpilog方法(唤醒ObjectWaiter对象的线程然后返回)
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;//ExitEpilog方法唤醒ObjectWaiter对象的线程然后返回
return ;
}
//如果_EntryList的首元素为空,就取_cxq的首元素,放入_EntryList,然后再从_EntryList中取出来执行ExitEpilog方法,然后立即返回;
w = _cxq ;
if (w == NULL) continue ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
//QMode = 0,不做什么,继续往下看;
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
// In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = NULL
// The MEMBAR is satisfied by the release_store() operation in ExitEpilog().
// See if we can abdicate to a spinner instead of waking a thread.
// A primary goal of the implementation is to reduce the
// context-switch rate.
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
ObjectWaiter
ObjectWaiter是双向链表的节点,保存了当前thread以及当前的状态TState等数据。每个等待锁的线程都被封装为一个ObjectWaiter对象。
注解中很重要的一条:ObjectWaiter对象存在于WaitSet、EntryList、cxq等集合中,或者正在这些集合中移动。
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
enum Sorted { PREPEND, APPEND, SORTED } ;
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
jlong _notifier_tid;
ParkEvent * _event;
volatile int _notified ;
volatile TStates TState ;
Sorted _Sorted ; // List placement disposition
bool _active ; // Contention monitoring is enabled
public:
ObjectWaiter(Thread* thread);
void wait_reenter_begin(ObjectMonitor *mon);
void wait_reenter_end(ObjectMonitor *mon);
};
PlatformEvent
LockSupport也是继承了PlatformEvent,但是重写了park/unpark。
park
park这里调用的是PlatformEvent的park方法,
//通过通过UNIX(对应BSD版本)线程库函数pthread_cond_wait和pthread_cond_signal实现的。
void os::PlatformEvent::park() { // AKA "down()"
// Invariant: Only the thread associated with the Event/PlatformEvent
// may call park().
// TODO: assert that _Assoc != NULL or _Assoc == Self
int v ;
//_Event是个int变量,如果CAS更新成功,即成功将_Event减1,则退出循环
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
//v=_Event,v=0表示无许可,则需要堵塞等待获得许可;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
//等待许可,调用pthread_cond_wait进行等待
while (_Event < 0) {
//等待signal方法唤醒,唤醒之后需要重新获取_mutex锁,方法才能返回
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
//获取许可之后,设置可用许可数为0;由此可见许可数最大为1
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other.
OrderAccess::fence();
}
guarantee (_Event >= 0, "invariant") ;
}
unpark方法
void os::PlatformEvent::unpark() {
//使用原子方法xchg将1放入寄存器,与_Event所指的内容交换,即_Event=1,然后返回_Event原先的值,
//如果返回值大于等于0,表示有许可有用,方法直接返回;
// Transitions for _Event:
// 0 :=> 1
// 1 :=> 1
// -1 :=> either 0 or 1; must signal target thread
// That is, we can safely transition _Event from -1 to either
// 0 or 1. Forcing 1 is slightly more efficient for back-to-back
// unpark() calls.
// See also: "Semaphores in Plan 9" by Mullender & Cox
//
// Note: Forcing a transition from "-1" to "1" on an unpark() means
// that it will take two back-to-back park() calls for the owning
// thread to block. This has the benefit of forcing a spurious return
// from the first park() call after an unpark() call which will help
// shake out uses of park() and unpark() without condition variables.
if (Atomic::xchg(1, &_Event) >= 0) return;
// Wait for the thread associated with the event to vacate
//如果原来的_Event小于0,说明有park方法进入了pthread_cond_wait堵塞
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
int AnyWaiters = _nParked;
assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
//如果WorkAroundNPTLTimedWaitHang=true,会先调用signal,然后释放锁;如果WorkAroundNPTLTimedWaitHang=false,会先释放锁,然后调用signal;
//NPTL存在瑕疵,当pthread_cond_timedwait() 方法时间参数为过去时间,
//会导致_cond变量被破坏或者线程被hang住;
if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
AnyWaiters = 0;
//调用signal唤醒pthread_cond_wait调用线程,不判断方法执行结果
pthread_cond_signal(_cond);
}
//然后释放_mutex锁
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
if (AnyWaiters != 0) {
// //调用signal唤醒pthread_cond_wait调用线程,并判断方法执行结果
status = pthread_cond_signal(_cond);
assert_status(status == 0, status, "cond_signal");
}
// Note that we signal() _after dropping the lock for "immortal" Events.
// This is safe and avoids a common class of futile wakeups. In rare
// circumstances this can cause a thread to return prematurely from
// cond_{timed}wait() but the spurious wakeup is benign and the victim will
// simply re-test the condition and re-park itself.
}
总结
综合来讲,Object的wait方法主要做了这几件事情:
- 能进入wait肯定是获取了锁。wait依赖于对象ObjectMonitor的实现,那么首先要获得ObjectMonitor。
- 如果对象头里面有,则获取。否则尝试从当前线程分配。首先从omFreelist链表中分配,如果没有则从gFreelist分配,再没有就new一个128大小的ObjectMonitor数组,放入到当前线程的omFreelist,以及全局的gFreelist。
- 获取后进入ObjectMonitor的wait方法。该方法将当前线程包装为一个ObjectWaiter,插入到ObjectMonitor的等待队列中_Waitset中,然后退出对象监视器,挂起线程。
notify
notify方法
ObjectMonitor的notify方法
notify方法把notify标志位设置为1,然后进入到enter(synchronized)队列中,准备重新获取锁:
- 从_WaitSet中取出第一个;所有wait的线程都被包装为ObjectMoniter对象放入_WaitSet。
- 根据Policy的不同,将这个线程放入_EntryList或者_cxq队列中的起始或末尾位置;
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
//当前没有等待节点,直接返回
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
//获取等待节点
ObjectWaiter * iterator = DequeueWaiter() ;//选择头结点。所有wait的线程都被包装为ObjectMoniter对象放入_WaitSet。
//根据不同的策略,将取出来的ObjectWaiter节点,加入到_EntryList或则通过Atomic::cmpxchg_ptr指令进行自旋操作cxq
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;//设置notify为1
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id();
ObjectWaiter * List = _EntryList ;//准备进入同步代码块的队列
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
//Policy == 0:放入_EntryList队列的排头位置
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else//Policy == 1:放入_EntryList队列的末尾位置;
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
// prepend to cxq 默认为2,将唤醒的ObjectWaiter放入到cxq队列最前面
//Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else//Policy == 3:放入_cxq队列中,末尾位置;更新_cxq变量的值的时候,同样要通过CAS注意并发问题;
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {//设置线程状态为正在重新获取锁(进入同步代码块)
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
DequeueWaiter
获取第一个waiter。
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
https://blog.csdn.net/boling_cavalry/article/details/77793224
https://blog.csdn.net/boling_cavalry/article/details/77897108
https://blog.csdn.net/boling_cavalry/article/details/77995069
https://blog.csdn.net/chengzhang1989/article/details/78703818