LockSupport源码分析

concurrent包基于AQS框架,AQS框架基于两个类:

  • Unsafe(提供CAS操作)
  • LockSupport(提供park/unpark操作)

概念

LockSupport

public static void park() {
        UNSAFE.park(false, 0L);
    }
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

Unsafe

Unsafe类中对应的方法:
park是将当前调用Thread阻塞,unpark是将指定线程Thread唤醒。

    //park
    public native void park(boolean isAbsolute, long time);
    
    //unpack
    public native void unpark(Object var1);

与Object的wait/notify机制对比,park/unpack的优点

  • 以Thread为操作对象更符合阻塞线程的直观定义;
  • 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

“许可”的概念

  1. park/unpark的设计原理核心是“许可”,park是等待一个许可,unpark是为某线程提供一个许可。

如果某个线程A调用park,则需要等待另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

  1. unpark可以在park之前。

先提供许可,当某线程调用park时,已经有许可了,就消费这个许可,然后可以继续运行。

  1. “许可”不可叠加,是一次性的。

如果线程B运行了三次unpark,当线程A调用park消费掉这个“许可”之后,再次调用park需要进入等待状态。

源码

mutex和condition保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

每个Java线程都有一个Parker实例

private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

void JavaThread::initialize() {
  // Initialize fields
  _parker = Parker::Allocate(this) ;
}

Park类: _counter就是“许可”

class Parker : public os::PlatformParker {  
private:  
  volatile int _Event ;  
  ...  
public:  
  void park(bool isAbsolute, jlong time);  
  void unpark();  
  ...  
}  
class PlatformParker : public CHeapObj<mtInternal> {  
  protected:  
    pthread_mutex_t _mutex [1] ;  
    pthread_cond_t  _cond  [1] ;  
    ...  
}  

park

先尝试能否直接获得“许可”,即_counter>0。如果成功,则把_counter设置为0并返回。如果不成功,则获取锁,等待信号发生。获取许可后,设置可用许可数为0,unlock mutex并返回。

void os::PlatformEvent::park() { 
//_Event是个int变量,如果CAS更新成功,即成功将_Event减1,则退出循环
  int v ;
  for (;;) {
      v = _Event ;
      //使用原子方法xchg将0放入寄存器,与_Event所指的内容交换,即_Event=0,然后返回_Event原先的值
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
 //v=_Event,v=0表示无许可,则需要堵塞等待获得许可;
  if (v == 0) {
     int status = pthread_mutex_lock(_mutex);//获取锁
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
    //等待许可,调用pthread_cond_wait进行等待
     while (_Event < 0) {
       //pthread_cond_wait会加入等待队列,同时释放_mutex锁,
      //等待signal方法唤醒,唤醒之后需要重新获取_mutex锁,方法才能返回
        status = pthread_cond_wait(_cond, _mutex);
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;
     //获取许可之后,设置可用许可数为0;由此可见许可数最大为1
    _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);//释放锁
     assert_status(status == 0, status, "mutex_unlock");
    OrderAccess::fence();//内存屏障语句
  }
  guarantee (_Event >= 0, "invariant") ;
}

unpark

设置_Event值后,如果原始值大于0表示之前没有park在等待,直接返回。

否则需要通知park线程:
如果WorkAroundNPTLTimedWaitHang=true,会先调用singal再释放锁。如果为false会先释放锁再调用signal。

void os::PlatformEvent::unpark() {
 //使用原子方法xchg将1放入寄存器,与_Event所指的内容交换,即_Event=1,然后返回_Event原先的值,
 //如果返回值大于等于0,表示有许可有用,方法直接返回;
  if (Atomic::xchg(1, &_Event) >= 0) return;
 //如果原来的_Event小于0,说明有park方法进入了pthread_cond_wait堵塞
  int status = pthread_mutex_lock(_mutex);//获取锁
  assert_status(status == 0, status, "mutex_lock");
  int AnyWaiters = _nParked;
  assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
   //NPTL存在瑕疵,当pthread_cond_timedwait() 方法时间参数为过去时间,
   //会导致_cond变量被破坏或者线程被hang住;
   //WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1
  if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
    AnyWaiters = 0;
    //调用signal唤醒pthread_cond_wait调用线程,不判断方法执行结果
    pthread_cond_signal(_cond);
  }
  //然后释放_mutex锁
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  if (AnyWaiters != 0) {
   // //调用signal唤醒pthread_cond_wait调用线程,并判断方法执行结果
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

Atomic方法

上面使用到了Atomic的xchg和cmpxchg方法,这两个方法是采用汇编实现的:

asm ( assembler template  
    : output operands                   (optional)  
    : input operands                    (optional)  
    : clobbered registers list          (optional)  
    );  

output operands和inpupt operands指定参数,它们从左到右依次排列,用','分割,编号从0开始。

inline jint     Atomic::xchg    (jint     exchange_value, volatile jint*     dest) {
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

将exchange_value(%0)的值放入通用寄存器,与dest(%2)所指的内容进行交换,返回dest指针原指向内容的值;

  • %0为exchange_value,%1为exchange_value,%2为dest;
  • "r"表示将dest的值读到一个通用寄存器;
  • "0"表示和%0使用同样的通用寄存器,此处表示将exchange_value值读入通用寄存器;
  • "=r"表示将结果写入到exchange_value,而且要使用通用寄存器,由于通用寄存器的内容已经被设置为dest所指向的内容,因此exchange_value等于原dest所指向的内容;
  • asm指示编译器在此插入汇编语句;
  • volatile告诉编译器,严禁将此处的汇编语句与其它的语句重组合优化,即原原本本按原来的样子处理这里的汇编;
  • memory强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作废。cpu将不得不在需要的时候重新读取内存中的数据。
int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
static inline bool is_MP() {
    assert(_processor_count > 0, "invalid processor count");
    return _processor_count > 1;
  }
  • mp表示是否属于多核cpu环境,如果是则LOCK_IF_MP会插入lock指令;
  • %0为exchange_value,%1为exchange_value,%2为compare_value,%3为dest,%4为mp;
  • "a" (compare_value)表示将compare_value读入eax寄存器,与%3(dest)进行比较,如果相等则将%1(exchange_value)写入dest;
  • "=a" (exchange_value)表示将eax寄存器的内容写入到exchange_value;
comments powered by Disqus