Park/Unpark源码

背景

在读JUC源码的时候经常看到LockSupport.park/unpark阻塞和唤醒线程,我们来看看它们和Object的wait/signal有什么区别。

unpark

LockSupport.java

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

Unsafe.java

public native void unpark(Object thread);

Unsafe native

hotspot\src\share\vm\prims\unsafe.cpp

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  // 声明一个Parker对象p,它是真正干活的对象
  Parker* p = NULL;
  if (jthread != NULL) {
    // 根据传入的jthread对象,来获取native层的oopDesc*对象,oop是oopDesc* 的宏定义
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      // 获取java_thread对象中_park_event_offset的值,该值就是Parker对象的地址
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // 如果地址有效,直接转为Parker指针
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // 如果地址无效
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          // 转为native层的JavaThread对象
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            // 将JavaThread的成员变量_parker赋值给p
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              // 将p的地址赋值给_park_event_offset,下次获取时可用
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */
    // 真正干货的方法,调用了Parker的unpark方法
    p->unpark();
  }
UNSAFE_END

JavaThread类是native的thread,成员变量_threadObj是java层的thread,native层通过它来调用java层的方法。

Parker

Parker类是实现park/unpark的类:hotspot\src\share\vm\runtime\park.hpp

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();
...
};

这里的重点是counter,counter>0则允许park通过,通过后counter被赋值为0。unpark将counter置为1,并唤醒当前等待的线程。

其父类PlatformParker声明了互斥量,是一个跨平台的类,有linux、bsd、windows、solaris四种实现,这里以linux实现拿来当例子:hotspot\src\os\linux\vm\os_linux.hpp

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
...
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.
...
};

来看看unpark具体实现:

void Parker::unpark() {
  int s, status ;
  // 先进入_mutex的临界区,声明如下
  // pthread_mutex_t _mutex [1] ;
  // pthread_cond_t  _cond  [2] ; 
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  // 将_counter置为1
  _counter = 1;
  // s记录的是unpark之前的_counter数,如果s < 1,说明有可能该线程在等待状态,需要唤醒。
  if (s < 1) {
    // thread might be parked
    // _cur_index代表被使用cond的index
    if (_cur_index != -1) {
      // thread is definitely parked
      // 根据虚拟机参数WorkAroundNPTLTimedWaitHang来做不同的处理,默认该参数是1
      if (WorkAroundNPTLTimedWaitHang) {
        // 先唤醒目标等待的线程
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        // 释放互斥锁,先唤醒后释放锁,可能会导致线程被唤醒后获取不到锁,再次进入等待状态,我的理解是效率可能会低一丢丢
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // 先释放锁
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        // 后发信号唤醒线程,唤醒操作在互斥代码块外部
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
      }
    } else {
      // 如果线程没有在等待,直接返回
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
    // 如果线程没有在等待,直接返回
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

其实就是使用pthread mutex来做线程等待同步等等操作。

park

LockSupport.java

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

Unsafe.java

public native void park(boolean isAbsolute, long time);

Unasfe native

hotspot\src\share\vm\prims\unsafe.cpp

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END

也是使用的Parker类。

Parker

void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  // 先原子的将_counter的值设为0,并返回_counter的原值,如果原值>0说明有通行证,直接返回
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  // 判断线程是否已经被中断
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  // park方法的传参是isAbsolute = false, time = 0,所以会继续往下走
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  // 这里time为0,如果调用的是parkNanos或者parkUtil,这里time就会>0,
  if (time > 0) {
    // 如果time > 0,unpackTime计算absTime的时间
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait

  // 再次判断线程是否被中断,如果没有被中断,尝试获得互斥锁,如果获取失败,直接返回
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  // 如果_counter > 0, 不需要等待,这里再次检查_counter的值
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    // 插入一个写内存屏障,保证可见性,具体实现见下方
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  // 设置JavaThread的_suspend_equivalent为true,表示线程被暂停
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    // 让线程等待_cond[_cur_index]信号,到这里线程进入等待状态
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // 线程进入有超时时间的等待,内部实现调用了pthread_cond_timedwait系统调用
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
 // _counter重新设置为0
  _counter = 0 ;
  // 释放互斥锁
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  // 插入写屏障
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

内存屏障:hotspot\src\os_cpu\linux_x86\vm\orderAccess_linux_x86.inline.hpp

inline void OrderAccess::fence() {
// 如果是多核cpu
  if (os::is_MP()) {
    // always use locked addl since mfence is sometimes expensive
#ifdef AMD64
 // 判断是否AMD64 CPU,汇编代码实现写屏障
    __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
  }
}

总结

park/unpark通过counter来表示,counter>0则直接通过,所以如果先调用unpark再调用park,则park会直接通过。如果count=0则会阻塞。
linux平台使用的pthread_mutex_lock。

ref:https://juejin.im/post/6844903938202796039
https://www.jianshu.com/p/82b2d2361fb8

...

comments powered by Disqus