Object的wait/notify方法

背景

上一篇说到LockSupport的park/unpark,这一篇就来说说Object的wait/notify方法。

Object是一切类的父类,wait方法是在Object中实现的。

public final native void wait(long timeout) throws InterruptedException;

wait

ObjectSynchronizer

Object的wait是在ObjectSynchronizer中实现的。注意:wait方法的调用需要在当前线程持有该对象的监视器的情况下使用,即当前线程竞争到了锁,才可以调用wait方法,通常与synchronized关键字一起用。

wait

void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  ...//省略
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());//获取ObjectMonitor
  DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  monitor->wait(millis, true, THREAD);//调用ObjectMonitor的wait方法
  ...//省略
}

结合对象头信息来看一下获取ObjectMonitor是通过inflate方法:
20171205222133014

死循环里面获取当前monitor:

inflate

CASE1 当前对象已经有monitor

即是否已经被重量级锁标识

ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  ...//省略
  for (;;) {
      const markOop mark = object->mark() ;//获取对象的头部mark_word,如上图所示
      assert (!mark->has_bias_pattern(), "invariant") ;

      // The mark can be in one of the following states:
      // *  Inflated     - just return
      // *  Stack-locked - coerce it to inflated
      // *  INFLATING    - busy wait for conversion to complete
      // *  Neutral      - aggressively inflate the object.
      // *  BIASED       - Illegal.  We should never see this

      // CASE: inflated
      if (mark->has_monitor()) {//当前对象已有monitor
          ObjectMonitor * inf = mark->monitor() ;
          assert (inf->header()->is_neutral(), "invariant");
          assert (inf->object() == object, "invariant") ;
          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
          return inf ;
      }

其中has_monitor的实现:monitor_value=2,对照图中,是***重量级锁***的位数,就是判断对象是否被重量级锁标识。

bool has_monitor() const {
  return ((value() & monitor_value) != 0);
}

获取monitor:对照图来看,就是取重量级锁前30位,即获取ObjectMonitor的指针。

ObjectMonitor* monitor() const {
  assert(has_monitor(), "check");
  // Use xor instead of &~ to provide one extra tag-bit check.
  return (ObjectMonitor*) (value() ^ monitor_value);
}

CASE2 当前锁正在膨胀

重新获取mark_word,进入下一次循环

      // CASE: inflation in progress - inflating over a stack-lock.
      // Some other thread is converting from stack-locked to inflated.
      // Only that thread can complete inflation -- other threads must wait.
      // The INFLATING value is transient.
      // Currently, we spin/yield/park and poll the markword, waiting for inflation to finish.
      // We could always eliminate polling by parking the thread on some auxiliary list.
      if (mark == markOopDesc::INFLATING()) {//如果当前锁正在膨胀,(轻量级锁膨胀为重量级锁),重新获取mark_word,进入下一次循环
         TEVENT (Inflate: spin while INFLATING) ;
         ReadStableMark(object) ;
         continue ;
      }

怎么判断膨胀:为0就代表膨胀
这时候锁标志位00表示为轻量级锁标记,前面指向栈中锁记录的指针为NULL。

static markOop INFLATING() { return (markOop) 0; } 

CASE3 被轻量级锁标记

被轻量级锁标记,则从当前线程中分配ObjectMonitor,尝试膨胀

      // CASE: stack-locked
      // Could be stack-locked either by this thread or by some other thread.
      //
      // Note that we allocate the objectmonitor speculatively, _before_ attempting
      // to install INFLATING into the mark word.  We originally installed INFLATING,
      // allocated the objectmonitor, and then finally STed the address of the
      // objectmonitor into the mark.  This was correct, but artificially lengthened
      // the interval in which INFLATED appeared in the mark, thus increasing
      // the odds of inflation contention.
      //
      // We now use per-thread private objectmonitor free lists.
      // These list are reprovisioned from the global free list outside the
      // critical INFLATING...ST interval.  A thread can transfer
      // multiple objectmonitors en-mass from the global free list to its local free list.
      // This reduces coherency traffic and lock contention on the global free list.
      // Using such local free lists, it doesn't matter if the omAlloc() call appears
      // before or after the CAS(INFLATING) operation.
      // See the comments in omAlloc().

      if (mark->has_locker()) {//被轻量级锁标记
          ObjectMonitor * m = omAlloc (Self) ;//从当前线程中分配ObjectMonitor
          // Optimistically prepare the objectmonitor - anticipate successful CAS
          // We do this before the CAS in order to minimize the length of time
          // in which INFLATING appears in the mark.
          m->Recycle();//置回初始状态
          m->_Responsible  = NULL ;
          m->OwnerIsThread = 0 ;
          m->_recursions   = 0 ;
          m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;   // Consider: maintain by type/class

            //cas方式尝试膨胀,如果失败,进入下一次循环
          markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
          if (cmp != mark) {
             omRelease (Self, m, true) ;
             continue ;       // Interference -- just retry
          }

          markOop dmw = mark->displaced_mark_helper() ;
          assert (dmw->is_neutral(), "invariant") ;

          // Setup monitor fields to proper values -- prepare the monitor
          m->set_header(dmw) ;
          m->set_owner(mark->locker());
          m->set_object(object);
          // TODO-FIXME: assert BasicLock->dhw != 0.

          // Must preserve store ordering. The monitor state must
          // be stable at the time of publishing the monitor address.
          guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
          object->release_set_mark(markOopDesc::encode(m));//将monitor地址set到对象头部
      ...//省略
      return m ;
  }
}

分配ObjectMonitor

然后我们看下怎么分配ObjectMonitor。

ObjectMonitor * ATTR ObjectSynchronizer::omAlloc (Thread * Self) {
    ...//省略
    for (;;) {
        ObjectMonitor * m ;

        // 1: try to allocate from the thread's local omFreeList.
        // Threads will attempt to allocate first from their local list, then
        // from the global list, and only after those attempts fail will the thread
        // attempt to instantiate new monitors.   Thread-local free lists take
        // heat off the ListLock and improve allocation latency, as well as reducing
        // coherency traffic on the shared global list.
        //1 . 如果当前线程中omFreeList不为空,分配当前线程omFreeList的头结点
        m = Self->omFreeList ;
        if (m != NULL) {
           Self->omFreeList = m->FreeNext ;
           Self->omFreeCount -- ;
           // CONSIDER: set m->FreeNext = BAD -- diagnostic hygiene
           guarantee (m->object() == NULL, "invariant") ;
           if (MonitorInUseLists) {
             m->FreeNext = Self->omInUseList;
             Self->omInUseList = m;
             Self->omInUseCount ++;
             // verifyInUse(Self);
           } else {
             m->FreeNext = NULL;
           }
           return m ;
        }

        // 2: try to allocate from the global gFreeList
        // CONSIDER: use muxTry() instead of muxAcquire().
        // If the muxTry() fails then drop immediately into case 3.
        // If we're using thread-local free lists then try
        // to reprovision the caller's free list.
        // 2. 如果全局的gFreelist不为空,从gFreelist中分配
        if (gFreeList != NULL) {
            // Reprovision the thread's omFreeList.
            // Use bulk transfers to reduce the allocation rate and heat
            // on various locks.
            Thread::muxAcquire (&ListLock, "omAlloc") ;
            for (int i = Self->omFreeProvision; --i >= 0 && gFreeList != NULL; ) {
                MonitorFreeCount --;
                ObjectMonitor * take = gFreeList ;
                gFreeList = take->FreeNext ;
                guarantee (take->object() == NULL, "invariant") ;
                guarantee (!take->is_busy(), "invariant") ;
                take->Recycle() ;
                omRelease (Self, take, false) ;
            }
            Thread::muxRelease (&ListLock) ;
            Self->omFreeProvision += 1 + (Self->omFreeProvision/2) ;
            if (Self->omFreeProvision > MAXPRIVATE ) Self->omFreeProvision = MAXPRIVATE ;
            TEVENT (omFirst - reprovision) ;

            const int mx = MonitorBound ;
            if (mx > 0 && (MonitorPopulation-MonitorFreeCount) > mx) {
              // We can't safely induce a STW safepoint from omAlloc() as our thread
              // state may not be appropriate for such activities and callers may hold
              // naked oops, so instead we defer the action.
              InduceScavenge (Self, "omAlloc") ;
            }
            continue;
        }

        // 3: allocate a block of new ObjectMonitors
        // Both the local and global free lists are empty -- resort to malloc().
        // In the current implementation objectMonitors are TSM - immortal.
        //3. new一个128大小的ObjectMonitor数组,第0个元素保留, 建立freelist链表,并与gFreeList连通,gFreeList指向新建立的链表.进入下一次循环
        assert (_BLOCKSIZE > 1, "invariant") ;
        ObjectMonitor * temp = new ObjectMonitor[_BLOCKSIZE];

        // NOTE: (almost) no way to recover if allocation failed.
        // We might be able to induce a STW safepoint and scavenge enough
        // objectMonitors to permit progress.
        if (temp == NULL) {
            vm_exit_out_of_memory (sizeof (ObjectMonitor[_BLOCKSIZE]), OOM_MALLOC_ERROR,
                                   "Allocate ObjectMonitors");
        }
    ...//省略
    }
}

ObjectMonitor

上一步获取到ObjectMonitor之后,最终是调用该类的wait方法。

在HotSpot虚拟机中,ObjectMonitor用于实现monitor。

Object的wait/notify方法最终是通过hotspot的ObjectMoniter来实现的,具体参考hotspot/src/share/vm/runtime/objectMonitor.cpp:

每个线程有两个ObjectMonitor对象列表,ObjectMonitor* omFreeList;ObjectMonitor* omInUseList;。用于await的时候分配ObjectMonitor对象的。就是上一步的omAlloc方法。

ObjectMonitor对象中有两个队列_WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表,_owner 指向获得ObjectMonitor对象的线程。

c86d4327a7035a28e6201d3686e951c5

  • _WaitSet:调用wait的线程会进入_WaitSet而其他线程调用notify之后,本线程就会进入_EntryList
  • _EntryList:处于等待锁block状态的线程会进入_EntryList

wait

我们具体来看一下wait方法

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   assert(Self->is_Java_thread(), "Must be Java thread!");
   JavaThread *jt = (JavaThread *)THREAD;

   DeferredInitialize () ;

   // Throw IMSX or IEX.
   CHECK_OWNER();

   EventJavaMonitorWait event;

   // check for a pending interrupt
   if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
     // post monitor waited event.  Note that this is past-tense, we are done waiting.
     if (JvmtiExport::should_post_monitor_waited()) {
        // Note: 'false' parameter is passed here because the
        // wait was not timed out due to thread interrupt.
        JvmtiExport::post_monitor_waited(jt, this, false);
     }
     if (event.should_commit()) {
       post_monitor_wait_event(&event, 0, millis, false);
     }
     TEVENT (Wait - Throw IEX) ;
     THROW(vmSymbols::java_lang_InterruptedException());
     return ;
   }

   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);//设置当前正在waiting的监视器

   // create a node to be put into the queue
   // Critically, after we reset() the event but prior to park(), we must check
   // for a pending interrupt.
   ObjectWaiter node(Self);//将当前线程封装成一个ObjectWaiter
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.

   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;//添加到ObjectMonitor的等待队列中_Waitset
   Thread::SpinRelease (&_WaitSetLock) ;

   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; // record the old recursion count
   _waiters++;                  // increment the number of waiters
   _recursions = 0;             // set the recursion level to be 1
   //退出对象监视器,即将ObjectMonitor中的_owner置为NULL
   exit (true, Self) ;                    // exit the monitor
   guarantee (_owner != Self, "invariant") ;

   if (node._notified != 0 && _succ == Self) {
      node._event->unpark();
   }
   int ret = OS_OK ;
   int WasNotified = 0 ;
   { // State transition wrappers
     OSThread* osthread = Self->osthread();
     OSThreadWaitState osts(osthread, true);
     {
       ThreadBlockInVM tbivm(jt);
       // Thread is in thread_blocked state and oop access is unsafe.
       jt->set_suspend_equivalent();

       if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
           // Intentionally empty
       } else
       if (node._notified == 0) {//如果此时没有被唤醒
         if (millis <= 0) {
            Self->_ParkEvent->park () ;//挂起当前线程
         } else {
            ret = Self->_ParkEvent->park (millis) ;
         }
       }
       ...//省略
}

addWaiter方法

就是把当前线程包装成的ObjectWaiter添加到_WaitSet中:

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

exit方法

exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象:

  1. 如果是偏向锁,则偏向锁减一后返回
  2. 根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
  3. 线程唤醒后会
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;
   if (THREAD != _owner) {
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       // NOTE: we need to handle unbalanced monitor enter/exit
       // in native code by throwing an exception.
       // TODO: Throw an IllegalMonitorStateException ?
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }

   //偏向锁判断,偏向次数减一后直接返回
   if (_recursions != 0) {
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   }

   // Invariant: after setting Responsible=null an thread must execute
   // a MEMBAR or other serializing instruction before fetching EntryList|cxq.
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }

#if INCLUDE_TRACE
   // get the owner's thread id for the MonitorEnter event
   // if it is enabled and the thread isn't suspended
   if (not_suspended && Tracing::is_event_enabled(TraceJavaMonitorEnterEvent)) {
     _previous_owner_tid = SharedRuntime::get_java_tid(Self);
   }
#endif

   for (;;) {
      assert (THREAD == _owner, "invariant") ;


      if (Knob_ExitPolicy == 0) {
         OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
         OrderAccess::storeload() ;                         // See if we need to wake a successor
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            TEVENT (Inflated exit - simple egress) ;
            return ;
         }
         TEVENT (Inflated exit - complex egress) ;

         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
            return ;
         }
         TEVENT (Exit - Reacquired) ;
      } else {
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
            OrderAccess::storeload() ;
            // Ratify the previously observed values.
            if (_cxq == NULL || _succ != NULL) {
                TEVENT (Inflated exit - simple egress) ;
                return ;
            }
            if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
               TEVENT (Inflated exit - reacquired succeeded) ;
               return ;
            }
            TEVENT (Inflated exit - reacquired failed) ;
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;


      //QMode = 2,并且_cxq非空:取_cxq队列排头位置的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,此处会立即返回,后面的代码不会执行了;
      if (QMode == 2 && _cxq != NULL) {
          w = _cxq ;
          assert (w != NULL, "invariant") ;
          assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      //QMode = 3,并且_cxq非空:把_cxq队列首元素放入_EntryList的尾部;
      if (QMode == 3 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Append the RATs to the EntryList
          // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
          ObjectWaiter * Tail ;
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;
          } else {
              Tail->_next = w ;
              w->_prev = Tail ;
          }

          // Fall thru into code that tries to wake a successor from EntryList
      }

      //QMode = 4,并且_cxq非空:把_cxq队列首元素放入_EntryList的头部;
      if (QMode == 4 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Prepend the RATs to the EntryList
          if (_EntryList != NULL) {
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;

          // Fall thru into code that tries to wake a successor from EntryList
      }

      //如果_EntryList的首元素非空,就取出来调用ExitEpilog方法(唤醒ObjectWaiter对象的线程然后返回)
      w = _EntryList  ;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;//ExitEpilog方法唤醒ObjectWaiter对象的线程然后返回
          return ;
      }

      //如果_EntryList的首元素为空,就取_cxq的首元素,放入_EntryList,然后再从_EntryList中取出来执行ExitEpilog方法,然后立即返回;
      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      TEVENT (Inflated exit - drain cxq into EntryList) ;

      assert (w != NULL              , "invariant") ;
      assert (_EntryList  == NULL    , "invariant") ;
      if (QMode == 1) {
         // QMode == 1 : drain cxq to EntryList, reversing order
         // We also reverse the order of the list.
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
      //QMode = 0,不做什么,继续往下看;
         // QMode == 0 or QMode == 2
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      // In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = NULL
      // The MEMBAR is satisfied by the release_store() operation in ExitEpilog().

      // See if we can abdicate to a spinner instead of waking a thread.
      // A primary goal of the implementation is to reduce the
      // context-switch rate.
      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

ObjectWaiter

ObjectWaiter是双向链表的节点,保存了当前thread以及当前的状态TState等数据。每个等待锁的线程都被封装为一个ObjectWaiter对象。

注解中很重要的一条:ObjectWaiter对象存在于WaitSet、EntryList、cxq等集合中,或者正在这些集合中移动

class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
  enum Sorted  { PREPEND, APPEND, SORTED } ;
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  jlong         _notifier_tid;
  ParkEvent *   _event;
  volatile int  _notified ;
  volatile TStates TState ;
  Sorted        _Sorted ;           // List placement disposition
  bool          _active ;           // Contention monitoring is enabled
 public:
  ObjectWaiter(Thread* thread);

  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

PlatformEvent

LockSupport也是继承了PlatformEvent,但是重写了park/unpark。

park

park这里调用的是PlatformEvent的park方法,

//通过通过UNIX(对应BSD版本)线程库函数pthread_cond_wait和pthread_cond_signal实现的。
void os::PlatformEvent::park() {       // AKA "down()"
  // Invariant: Only the thread associated with the Event/PlatformEvent
  // may call park().
  // TODO: assert that _Assoc != NULL or _Assoc == Self
  int v ;
  //_Event是个int变量,如果CAS更新成功,即成功将_Event减1,则退出循环
  for (;;) {
      v = _Event ;
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
  //v=_Event,v=0表示无许可,则需要堵塞等待获得许可;
  if (v == 0) {
     // Do this the hard way by blocking ...
     int status = pthread_mutex_lock(_mutex);
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
     //等待许可,调用pthread_cond_wait进行等待
     while (_Event < 0) {
      //等待signal方法唤醒,唤醒之后需要重新获取_mutex锁,方法才能返回
        status = pthread_cond_wait(_cond, _mutex);
        // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
        // Treat this the same as if the wait was interrupted
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;
//获取许可之后,设置可用许可数为0;由此可见许可数最大为1
    _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);
     assert_status(status == 0, status, "mutex_unlock");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other.
    OrderAccess::fence();
  }
  guarantee (_Event >= 0, "invariant") ;
}

unpark方法

void os::PlatformEvent::unpark() {
 //使用原子方法xchg将1放入寄存器,与_Event所指的内容交换,即_Event=1,然后返回_Event原先的值,
 //如果返回值大于等于0,表示有许可有用,方法直接返回;
  // Transitions for _Event:
  //    0 :=> 1
  //    1 :=> 1
  //   -1 :=> either 0 or 1; must signal target thread
  //          That is, we can safely transition _Event from -1 to either
  //          0 or 1. Forcing 1 is slightly more efficient for back-to-back
  //          unpark() calls.
  // See also: "Semaphores in Plan 9" by Mullender & Cox
  //
  // Note: Forcing a transition from "-1" to "1" on an unpark() means
  // that it will take two back-to-back park() calls for the owning
  // thread to block. This has the benefit of forcing a spurious return
  // from the first park() call after an unpark() call which will help
  // shake out uses of park() and unpark() without condition variables.

  if (Atomic::xchg(1, &_Event) >= 0) return;

  // Wait for the thread associated with the event to vacate
  //如果原来的_Event小于0,说明有park方法进入了pthread_cond_wait堵塞
  int status = pthread_mutex_lock(_mutex);
  assert_status(status == 0, status, "mutex_lock");
  int AnyWaiters = _nParked;
  assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");

  //如果WorkAroundNPTLTimedWaitHang=true,会先调用signal,然后释放锁;如果WorkAroundNPTLTimedWaitHang=false,会先释放锁,然后调用signal;
    
  //NPTL存在瑕疵,当pthread_cond_timedwait() 方法时间参数为过去时间,
     //会导致_cond变量被破坏或者线程被hang住;
  if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
    AnyWaiters = 0;
    //调用signal唤醒pthread_cond_wait调用线程,不判断方法执行结果
    pthread_cond_signal(_cond);
  }
  //然后释放_mutex锁
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  if (AnyWaiters != 0) {
  // //调用signal唤醒pthread_cond_wait调用线程,并判断方法执行结果
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }

  // Note that we signal() _after dropping the lock for "immortal" Events.
  // This is safe and avoids a common class of  futile wakeups.  In rare
  // circumstances this can cause a thread to return prematurely from
  // cond_{timed}wait() but the spurious wakeup is benign and the victim will
  // simply re-test the condition and re-park itself.
}

总结

综合来讲,Object的wait方法主要做了这几件事情:

  1. 能进入wait肯定是获取了锁。wait依赖于对象ObjectMonitor的实现,那么首先要获得ObjectMonitor。
  2. 如果对象头里面有,则获取。否则尝试从当前线程分配。首先从omFreelist链表中分配,如果没有则从gFreelist分配,再没有就new一个128大小的ObjectMonitor数组,放入到当前线程的omFreelist,以及全局的gFreelist。
  3. 获取后进入ObjectMonitor的wait方法。该方法将当前线程包装为一个ObjectWaiter,插入到ObjectMonitor的等待队列中_Waitset中,然后退出对象监视器,挂起线程。

notify

notify方法

ObjectMonitor的notify方法

notify方法把notify标志位设置为1,然后进入到enter(synchronized)队列中,准备重新获取锁:

  1. 从_WaitSet中取出第一个;所有wait的线程都被包装为ObjectMoniter对象放入_WaitSet。
  2. 根据Policy的不同,将这个线程放入_EntryList或者_cxq队列中的起始或末尾位置;
void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  //当前没有等待节点,直接返回
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  //获取等待节点
  ObjectWaiter * iterator = DequeueWaiter() ;//选择头结点。所有wait的线程都被包装为ObjectMoniter对象放入_WaitSet。
  //根据不同的策略,将取出来的ObjectWaiter节点,加入到_EntryList或则通过Atomic::cmpxchg_ptr指令进行自旋操作cxq
  if (iterator != NULL) {
     TEVENT (Notify1 - Transfer) ;
     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     if (Policy != 4) {
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }
     iterator->_notified = 1 ;//设置notify为1
     Thread * Self = THREAD;
     iterator->_notifier_tid = Self->osthread()->thread_id();

     ObjectWaiter * List = _EntryList ;//准备进入同步代码块的队列
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }

    //Policy == 0:放入_EntryList队列的排头位置
     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else//Policy == 1:放入_EntryList队列的末尾位置;
     if (Policy == 1) {      // append to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            // CONSIDER:  finding the tail currently requires a linear-time walk of
            // the EntryList.  We can make tail access constant-time by converting to
            // a CDLL instead of using our current DLL.
            ObjectWaiter * Tail ;
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator ;
            iterator->_prev = Tail ;
            iterator->_next = NULL ;
        }
     } else
     // prepend to cxq 默认为2,将唤醒的ObjectWaiter放入到cxq队列最前面
     //Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else//Policy == 3:放入_cxq队列中,末尾位置;更新_cxq变量的值的时候,同样要通过CAS注意并发问题;
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     if (Policy < 4) {//设置线程状态为正在重新获取锁(进入同步代码块)
       iterator->wait_reenter_begin(this);
     }

     // _WaitSetLock protects the wait queue, not the EntryList.  We could
     // move the add-to-EntryList operation, above, outside the critical section
     // protected by _WaitSetLock.  In practice that's not useful.  With the
     // exception of  wait() timeouts and interrupts the monitor owner
     // is the only thread that grabs _WaitSetLock.  There's almost no contention
     // on _WaitSetLock so it's not profitable to reduce the length of the
     // critical section.
  }

  Thread::SpinRelease (&_WaitSetLock) ;

  if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
     ObjectMonitor::_sync_Notifications->inc() ;
  }
}

DequeueWaiter

获取第一个waiter。

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}

refer:https://mp.weixin.qq.com/s?__biz=MzI2NzY4MjM1OQ==&mid=2247485074&idx=1&sn=6a8c0b3cfcc34196f4b2576452700e7c&chksm=eafa51cadd8dd8dc59b6b2599940dde9c27bea9c44b785703200e5de2badae9918caea491bb8&scene=21

https://blog.csdn.net/boling_cavalry/article/details/77793224
https://blog.csdn.net/boling_cavalry/article/details/77897108
https://blog.csdn.net/boling_cavalry/article/details/77995069

https://blog.csdn.net/chengzhang1989/article/details/78703818

https://sq.163yun.com/blog/article/188805878260191232

comments powered by Disqus