BlockingQueue
BlockingQueue是一类带阻塞功能的队列,使用的时候如果队列为空则阻塞,不需要调用方不断去轮询。
BlockingQueue是个接口,接口定义如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
......
}
- 入队时已满:add、offer非阻塞,put阻塞
- 出队时为空:remove、peek非阻塞,take阻塞
总结一下:
操作 | 抛出异常 | 返回特定值 | 阻塞 | 超时 |
---|---|---|---|---|
入队 | add(e) | offer(e)——false | put(e) | offer(e, timeout, unit) |
出队 | remove() | poll()——null | take() | poll(timeout, unit) |
检查 | element() | peek()——null | N/A | N/A |
其子类有以下几个:
下面分别介绍几种子类实现。
ArrayBlockingQueue
ArrayBlockingQueue有以下特性:
- 用数组实现的环形队列
- 有队头takeIndex、队尾putIndex两个指针,用于循环利用数组。
- 容量在初始化时指定容量,不扩容。
- 并发安全是通过一个重入锁+两个条件实现的,分别是未空的notEmpty和未满的notFull。
// 使用数组存储元素
final Object[] items;//数组以及队头、队尾指针
/** items index for next take, poll, peek or remove */
// 取元素的指针
int takeIndex;
/** items index for next put, offer, or add */
// 放元素的指针
int putIndex;
/** Number of elements in the queue */
// 元素数量
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
//其核心是一把锁+两个条件
/** Main lock guarding all access */
// 保证并发访问的锁
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
put
- 加锁;
- 如果队列满了,则阻塞;
- 插入数据,putIndex+1,通知非空条件。
- 解锁
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中断
try {
while (count == items.length)
notFull.await();//队列满了,阻塞于"非满"条件
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//put进去之后通知非空条件
}
take
- 加锁
- 如果队列为空则阻塞
- 获取元素,takeIndex+1,通知"非满"条件
- 解锁
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//队列为空,则阻塞在"非空"条件
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//take了之后通知"非满"条件
return x;
}
ArrayBlockingQueue还有非阻塞的入队、出队实现,这里就不展开了。
LinkedBlockingQueue
ArrayBlockingQueue只用一个锁来做出队和入队的同步,效率较低,LinkedBlockingQueue借助分段的思想把入队出队分裂成两个锁。
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();//原子变量
transient Node<E> head;//单向链表的头部
private transient Node<E> last;//单向链表的尾部
//两把锁+两个条件
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
初始化
初始化中传入容量,如果不传则用默认Integer.MAX_VALUE,相当于不受限。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
put
流程:
- put锁加锁
- 如果满了则阻塞,被唤醒后入队。如果没满则直接入队
- 如果现队列长度小于容量,就再唤醒一个阻塞在notFull条件上的线程
- 解锁
- 如果自增前的count为0,即意味着可能有take线程阻塞,那么需要调用signalNotEmpty通知take线程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();//获得count,赋值给c后完成自增操作
// 如果现队列长度如果小于容量
// 就再唤醒一个阻塞在notFull条件上的线程
// 这里为啥要唤醒一下呢?
// 因为可能有很多线程阻塞在notFull这个条件上的
// 而取元素时只有取之前队列是满的才会唤醒notFull
// 为什么队列满的才唤醒notFull呢?
// 因为唤醒是需要加putLock的,这是为了减少锁的次数
// 所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程
// 说白了,这也是锁分离带来的代价
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
take
流程:
- 加take锁;
- 如果为空则阻塞,被唤醒后获取队头元素,如果不为空则直接获取队头元素;
- 如果获取后容量还不为空,则再唤醒一个阻塞在notEmpty条件上的线程。注意c是自减前的值。
- 解锁
- 如果队列中还有一个剩余空间,则通知可能阻塞在notFull条件上的线程。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();//获得count,赋值给c后完成自增操作
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)//如果队列中还有一个剩余空间
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
总结
LinkedBlockingQueue使用了两把锁:
- put和put之间、take与take之间互斥
- put和take之间不互斥
LinkedBlockingQueue的通知和ArrayBlockingQueue的不一样,多了几个通知:
- put会通知take
- take会通知put
- put发现非满时会通知其他put线程
- take发现非空时会通知其他take线程
之所以比ArrayBlockingQueue多了put通知put、take通知take的流程,是因为LinkedBlockingQueue使用了两把锁。我们来假设这样一种常见:P1、P2、P3在等待notFull,C1消耗后唤醒P1,运行过程中C2消耗唤醒P2,P2需要等put锁,过程中P2超时了,那么P1最后需要检查一下,以避免P3无意义的等待。
总结就是,因为Producer和Consumer使用了两把锁,它们之间操作不互斥。
PriorityBlockingQueue
PriorityBlockingQueue是juc包下面的优先级阻塞队列,它是线程安全的。它使用一个锁+一个条件notEmpty:
- 入队:加锁,判断扩容,自下而上堆化,唤醒notEmpty,解锁
- 出队:加锁,为空则阻塞在notEmpty,出队,自上而下堆化,解锁
因为是无限队列,满了会自动扩容,所以没有非满,只有非空。
private transient Object[] queue;//用数组实现的二叉小根堆
//一把锁+一个条件,没有非满的条件
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
初始化
初始化数组,初始大小11。默认是小顶堆。
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* Creates a {@code PriorityBlockingQueue} with the specified
* initial capacity that orders its elements according to their
* {@linkplain Comparable natural ordering}.
*
* @param initialCapacity the initial capacity for this priority queue
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
put
流程:
- 加锁
- 如果满了,则自动扩容
- 没有定义比较操作符,使用元素自带的比较功能,否则使用定义的比较功能插入数据:
- 获取parent,如果parent大于插入数据e,则parent和e互换。比较直到parent小于e。这里说明的是默认的小顶堆,其他情况也适用。
- 通知非满条件
- 解锁
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);//扩容
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)//没有定义比较操作符,使用元素自带的比较功能
siftUpComparable(n, e, array);//元素入堆,执行siftUp操作
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
扩容
队列满了会自动扩容:
- 解锁
- 增大容量,原来的数组长度小于64的话,自增一倍+2,大于等于64则增大1/2
- 加锁
- 替换queue,把原来的数据搬过来
扩容的时候使用allocationSpinLock来做同步控制,只有成功CAS把allocationSpinLock置为1的那个线程才可以扩容。
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
take
小顶堆的出队:
- 加锁
- 如果为空,则阻塞在notEmpty,否则出队:
- 获取第0个元素作为result
- 获取最后一个元素放到第0个位置,shift down:从第0个开始,获取左右子节点较小的那个,和本节点比较,和最小的那个子节点互换,直到子节点都大于本节点。
- 返回第一步获取的第0个元素
- 解锁
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
//小顶堆的出队
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
DelayQueue
DelayQueue即延迟队列,是一个按延迟时间从小到大出队的PriorityQueue。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 用于控制并发的锁
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();//优先级队列
// 用于标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
//一把锁+一个非空条件
// 条件,用于表示现在是否有可取的元素
private final Condition available = lock.newCondition();
}
DelayQueue里面的元素必须实现Delayed接口,它只有一个方法getDelay。getDelay的返回值小于或等于0,则说明元素到期了,需要从队列中拿出来执行。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
put
流程:
- 加锁;
- 直接放入二叉堆,二叉堆里面会找到合适的位置插入;
- 如果放进去后发现元素在堆顶,即延迟时间是最小的,则需要通知等待的线程,这个等待的线程是后面的take线程;
- 解锁。
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);//元素放入二叉堆
//如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
if (q.peek() == e) {//如果放进去的元素刚好在堆顶,说明放入的元素延迟时间是最小的,那么需要通知等待的线程,否则放入的元素不在堆顶,没有必要通知等待的线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
take
流程:
- 加锁
- 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
- 否则获取堆顶元素的delay time,小于0即表示已经到期了,出队返回
- 如果堆顶元素大于0,那么需要阻塞,接下来是阻塞的步骤
- leader不为空,则表示已经有其他线程在等这个元素,则继续在available条件上阻塞
- leader为空,设置leader为当前线程,阻塞delay time时间
- 唤醒后设置leader为空,让其它线程有机会获取元素。
- 再返回步骤2重复执行
- 最后,如果leader为空且出队后队列不为空则调用available.signal把等待的线程挪到同步队列
- 解锁
简而言之就是如果队列中没有元素则阻塞,如果有元素,则看是否首元素到期了,到期则直接获取返回,否则看是否当前已经有线程在等待,如果有则无限期阻塞,否则阻塞delay时间。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶元素
E first = q.peek();//栈顶
// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
if (first == null)
available.await();
else {
// 堆顶元素的到期时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
if (delay <= 0)
return q.poll();
// 如果delay大于0 ,则下面要阻塞了
// 将first置为空方便gc,因为有可能其它元素弹出了这个元素
// 这里还持有着引用不会被清理
first = null; // don't retain ref while waiting
// 如果前面有其它线程在等待,直接进入等待
if (leader != null)//已经有其他线程在等待这个元素,则无限期阻塞
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay时间后自动醒过来
// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
// 这里即使醒过来后也不一定能获取到元素
// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
// 条件锁的唤醒分成两步,先从Condition的队列里出队
// 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒
available.awaitNanos(delay);//阻塞有限时间
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)//自己是leader,已经获取了堆顶元素,唤醒其他线程
// signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒
available.signal();
// 解锁,这才是真正的唤醒
lock.unlock();
}
}
take的阻塞条件:
- 队列为空
- 队列不为空,但是delay time大于0
用一个Thread leader变量记录了等待堆顶元素的第1个线程:
- 如果leader不为空,即有其他线程已经在等待该元素,则无限期阻塞
- 如果leader为空,那么调用condition.awaitNanos等待一个有限的时间即可
SynchronousQueue
SynchronousQueue是juc包下面的无缓冲阻塞队列,用于在两个线程间移交元素。多个线程的情况下,先调用n次put,n个线程都会阻塞;直到另外的线程调n次take,2n个线程才同时解锁,反之亦然。
CachedThreadPool形式的线程池就使用到了SynchronousQueue。
初始化
属性:阻塞前会自旋
/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 没有超时的情况自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 针对有超时的情况,自旋了多少次后,如果剩余时间大于1000纳秒就使用带时间的LockSupport.parkNanos()这个方法
static final long spinForTimeoutThreshold = 1000L;
构造器:使用TransferQueue实现fair模式,TransferStack实现unfair模式,默认是unfair模式
- 公平模式采用先进先出,第一个take线程会匹配第一个put线程;
- 非公平模式采用后到先匹配,最新的put匹配第一个take
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
主要内部类
传输元素的抽象类:
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
TransferQueue
TransferQueue是公平模式的实现,是一个基于单向链表实现的队列,通过head和tail两个指针记录头部和尾部。初始的时候head和tail都指向一个dummy节点。其并发操作是通过CAS操作head、tail指针来进行的。
举个例子:
- 阶段(a):队列中是一个空的节点,head/tail都指向这个空节点。
- 阶段(b):3个线程分别调用put,生成3个QNode,进入队列。
- 阶段(c):来了一个线程调用take,会和队列头部的第1个QNode进行配对。
- 阶段(d):第1个QNode出队列。
todo 改图
队列中只有一种元素,要么全是put要么全是take。
transfer方法:
- 如果队列中没有元素或者队列中元素和需要入队的元素是同一种
- 生成节点,CAS加入队列,后移tail指针;
- 调用awaitFulfill阻塞等待唤醒,其中有自旋到一定次数后阻塞的逻辑;
- 被唤醒后返回item,因为匹配的时候item已经CAS为对方的item,所以这里是返回匹配的数据。
- 如果队列中元素和需要入队的元素不同,则匹配后出队
- 不断重试cas 头指针的item为入队元素e,成功后后移head指针,唤醒原节点的waiter(即原节点的等待线程)
static final class TransferQueue<E> extends Transferer<E> {
/** Node class for TransferQueue. */
// 队列中的节点
static final class QNode {
//单向链表
// 下一个节点
volatile QNode next; // next node in queue
//如果是put,则item != null,如果是take,则item==null
// 存储的元素
volatile Object item; // CAS'ed to or from null
//put/take对应的阻塞线程,和上面的item对应
// 等待着的线程
volatile Thread waiter; // to control park/unpark
//put,isData为true。否则为false。
// 是否是数据节点
final boolean isData;
...//省略
}
//单向链表的头部和尾部
/** Head of queue */
// 队列的头节点
transient volatile QNode head;
/** Tail of queue */
// 队列的尾节点
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
/**
大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时,
一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像
ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用
*/
transient volatile QNode cleanMe;//要删除的节点的前一个节点,cleanMe的下一个节点是因中断或超时需要删除的节点,
//这个节点所在线程被中断了,或者节点超时了,放在上面不要紧,取节点时候x==m,跳过即可。
//在清除 队列最尾端节点时, 不直接删除这个节点, 而是间删除节点的前继节点标示为 cleanMe 节点, 为下次删除做准备,
//队列里面只能全是生产者或者消费者,不可能生产和消费者共存。,然后消费者过来移除节点。(队列即可能是生产者队列也可能是消费者队列)
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
...//省略
/**
* Puts or takes an item.
*/
//TransferQueue 的 transfer方法。transfer谁在调用。每个节点的线程调用,共享同一个队列。
//transfer里面有加入队列,阻塞awaitFulfill,阻塞后唤醒,取节点内容,唤醒节点,取到节点返回,被唤醒返回 代码。
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//生产者isData=true,消费者isData=false
for (;;) {
//多线程操作queue的成员变量tail,head,是会跟着改变的。t,h就不会变。t,h还是之前的那个tail和head。
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
//队列为空或者当前线程和队列中元素为同一种模式,都是put或者get
//1 加入队列排队阻塞(即可能是生产者队列也可能是消费者队列),然后等待唤醒。
if (h == t || t.isData == isData) { // empty or same-mode//第一个节点或者进来的节点跟尾节点生产消费类型一样,就加入排队,否则就是来取节点的。
QNode tn = t.next;
if (t != tail) // inconsistent read// 其他线程改变了尾节点,再次来
continue;
if (tn != null) { // lagging tail//其他线程为尾节点添加了next,但是还没有将tail指向新的节点,再次来
advanceTail(t, tn);//帮助将tail指向新的节点
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);//新建一个节点
//加入尾部
//加入队列,成功则T一定是是S的前一个节点
// 尾节点的next从null变为新节点,每一处都有可能被停住。失败了说明有其他线程将尾节点的next从null变为其他新节点
if (!t.casNext(null, s)) // failed to link in
continue;
//后移tail指针
advanceTail(t, s); // swing tail and wait// 改变尾节点,失败了,说明别的线程已经做了,自己什么都不用做。
//进入阻塞状态
//当前线程再这个函数的这里阻塞,唤醒時候,从这里执行,函數参数还是原来的。
//2 加入队列后排队阻塞等待
Object x = awaitFulfill(s, e, timed, nanos);//生产者或者消费者阻塞,等待唤醒。即可能是生产者被唤醒也可能是消费者被唤醒。
if (x == s) { // wait was cancelled//线程中断超时返回的x是节点s,否则就是正常取出返回。
clean(t, s);//节点的删除是节点上的线程自己来删除的,失效节点不影响正常的入队和出队,跳过即可。
return null;
}
//不是中断的返回。x是item,只不过被改变了。
//从阻塞中唤醒,确定已经处于队列中的第一个元素
if (!s.isOffList()) { // not already unlinked// 若s节点还没有从队列删除
advanceHead(t, s); // unlink if head// 改变头节点
if (x != null) // and forget fields// item != null,说明是消费者被生产者唤醒了,
s.item = s;//item变为自己
s.waiter = null;//节点等待的线程变为null。
}
return (x != null) ? (E)x : e;//生产者被唤醒x=null,返回原来的e,消费者被唤醒x!=null,返回x。
//当前线程可以和队列中的第一个元素进行配对
//取节点,取到节点后,取唤醒别人。
} else { // complementary-mode//如果队列里面之前是生产者,现在消费者过来了。
//取队列中的第一个元素// 获取头节点的下一个节点,,每次都是从头结点下一个节点开始获取数据
QNode m = h.next; // node to fulfill
//不一致读,重新执行for循环
if (t != tail || m == null || h != head)
continue; // inconsistent read// 其他线程改变了头尾节点,再次来
Object x = m.item;
//已经配对过
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled// m节点的item=节点自己,这个节点m的线程被中断了或者超时了,就跳过这个节点,不使用,
// 消费者过来,就把节点m的内容从节点内容变为null,生产者过来,就把节点内容从null变为生产者的内容。此时节点的内容不再等于节点的内容。
!m.casItem(x, e)) { // lost CAS 尝试配对
advanceHead(h, m); // dequeue and retry 已经配对过,直接出队列 // 将m设置为头结点,h出列,然后重试
continue;
}
// 成功匹配了,m设置为头结点h出列,向前推进,移除匹配到的节点
//配对成功,出队列
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
//awaitFulfill里面有自旋,阻塞,唤醒后代码
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {//s是节点,e是节点元素
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;// 计算超时时间点
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?//这个节点是第一个节点,超时旋转32次不超时旋转512次,不是第一个节点不旋转。
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())// 被中断的线程不一定要立即停止正在做的事情。相反,中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情。
s.tryCancel(e);//线程中断,就将这个线程所在的节点的item内容设置成节点自己
Object x = s.item;//节点的内容不在等于节点的内容(即可能是生产者被唤醒也可能是消费者被唤醒),生产者被唤醒了返回null,消费者被唤醒了返回e。
if (x != e)//线程中断了,item=s,item!=e,此时也返回,只不过返回的x=节点自己。
//线程中断了,item=s,item!=e,此时也返回,只不过返回的x=节点自己。
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
//执行清除的时候,入队和出队正常执行,取到这个节点时候跳过即可。
//清除这个节点的线程就是这个节点上阻塞的线程来循环移除。cleanMe只有一个,多个尾节点失效时候就会有多线程来调用clean方法,
void clean(QNode pred, QNode s) {//s是提出的节点,pred是前一个节点
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {//删除的节点是第一个节点h.next
advanceHead(h, hn);//推进头节点,头节点的next=头节点,pred.next = pred != s,
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;// 队列为空,直接return null
QNode tn = t.next;
if (t != tail)// 不一致,说明有其他线程改变了tail节点,重新开始
continue;
if (tn != null) {// tn != null 推进tail节点,重新开始
advanceTail(t, tn);
continue;
}
//列表上最后插入的节点不能被删除。 我们将其前驱设置为“CleanMe”,
if (s != t) { // If not tail, try to unsplice// 移除的节点不是尾节点,接以pred.casNext(s, s.next)方式来进行删除
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
// s是尾节点,
QNode dp = cleanMe;
//此时cleanMe != null,先删除cleanMe标记需要删除的节点,然后将cleanMe置为null,让后再将pred赋值给cleanMe
if (dp != null) { // Try unlinking previous cancelled node// 如果dp不为null,先删除上次需要删除的节点d,
QNode d = dp.next;//cleanMe标记需要删除的节点d,
QNode dn;
if (d == null || // d is gone or// 节点d已经删除
d == dp || // d is off list or// 原来的节点 cleanMe 已经通过 advanceHead 进行删除
!d.isCancelled() || // d not cancelled or// 原来的节点 s已经删除
(d != t && // d not tail and// d 不是tail节点
(dn = d.next) != null && // has successor
dn != d && // that is on list
// 删除cleanMe标记的节点d,多线程访问只有一个成功,失败的继续循环,
// 此时这个尾节点有可能是中间节点了,直接删除。真正尾节点不会删除。
dp.casNext(d, dn))) // d unspliced
// 清除 cleanMe 置为null,
casCleanMe(dp, null);
if (dp == pred)//dp == pred 若成立, 说明这次删除的就是上次要删除的, 直接return, 不然的话要再次循环来删除这次需要删除的节点。
return; // s is already saved node
//cleanMe == null, 上次没有要删除的节点,这次因为是尾节点也不删除,则 前继节点pred标记为 cleanMe, 为下次删除做准备。下次是别的失效的节点的线程。
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
...//省略
}
TransferStack
TransferStack因为是栈,所以用一个单向链表实现,只有head指针。
栈中的节点有两种:
- REQUEST表示消费者
- DATA表示生产者
FULFILLING表示正在匹配中。
下面举个例子:
- 阶段(a):head指向NULL。不同于TransferQueue,这里没有空的头节点。
- 阶段(b):3个线程调用3次put,依次入栈。
- 阶段(c):线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈。
- 阶段(d):栈顶的2个元素同时出栈。
// 传输器,即两个线程交换元素使用的东西
static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
* among other ways, by using "covering" nodes rather than
* bit-marked pointers: Fulfilling operations push on marker
* nodes (with FULFILLING bit set in mode) to reserve a spot
* to match a waiting node.
*/
/* Modes for SNodes, ORed together in node fields */
/** Node represents an unfulfilled consumer */
// 栈中节点的几种类型:
// 1. 消费者(请求数据的)
static final int REQUEST = 0;//消费者
// 2. 生产者(提供数据的
/** Node represents an unfulfilled producer */
static final int DATA = 1;//生产者
// 3. 二者正在匹配中
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;//表示正在进行交易的节点。
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }//FULFILLING返回true,是否是正在进行交易的生产者或者消费者。
/** Node class for TransferStacks. */
// 栈中的节点
static final class SNode {
//单向链表
// 下一个节点
volatile SNode next; // next node in stack
//配对的节点
// 匹配者
volatile SNode match; // the node matched to this
//对应的阻塞线程
// 等待着的线程
volatile Thread waiter; // to control park/unpark
//item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读
Object item; // data; or null for REQUESTs
//三种模式
// 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
* @param s the node to match
* @return true if successfully matched to s
*/
// SNode里面的方向,调用者m是s的下一个节点
// 这时候m节点的线程应该是阻塞状态的
boolean tryMatch(SNode s) {//匹配成功,则unpark等待线程
// 如果m还没有匹配者,就把s作为它的匹配者
if (match == null &&//设置本结点的匹配为s节点
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
// 唤醒m中的线程,两者匹配完毕
LockSupport.unpark(w);
}
// 匹配到了返回true
return true;
}
// 可能其它线程先一步匹配了m,返回其是否是s
return match == s;
}
/**
* Tries to cancel a wait by matching node to itself.
*/
//取消这个节点,match从原来的null变为this
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
...//省略
}
/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
* reused when possible to help reduce intervals between reads
* and CASes of head and to avoid surges of garbage when CASes
* to push nodes fail due to contention.
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {//入账,新进来节点下一个节点是head节点。
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
//transfer()方法同时实现了取元素和放元素的功能
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
// 根据e是否为null决定是生产者还是消费者
int mode = (e == null) ? REQUEST : DATA;//消费者是0生产者是1
for (;;) {
// 自旋+CAS,熟悉的套路,熟悉的味道
// 栈顶元素
SNode h = head;
// 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的
// 也就是都是生产者节点或者都是消费者节点
//同一种模式
//入队2步:构建新节点新节点.next=原来头节点,原来头节点变为新节点。
if (h == null || h.mode == mode) { // empty or same-mode
// 如果有超时而且已到期
if (timed && nanos <= 0) { // can't wait
// 如果头节点不为空且是取消状态
if (h != null && h.isCancelled())
// 就把头节点弹出,并进入下一次循环
casHead(h, h.next); // pop cancelled node
else
// 否则,直接返回null(超时返回null)
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//入栈//线程栈里面构建节点s,头节点指向最新进来的节点s,s.next=原来头节点
// 入栈成功(因为是模式相同的,所以只能入栈)
// 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到
SNode m = awaitFulfill(s, timed, nanos);//阻塞等待// 等待 匹配,线程阻塞时候局部变量保留,唤醒时候再次使用不变。
// 如果m等于s,说明取消了,那么就把它清除掉,并返回null
// 返回match到的节点m == s节点自己, 表示该节点被取消了或者超时、中断了
if (m == s) { // wait was cancelled m是match节点
clean(s);
// 被取消了返回null
return null;
}
// 先唤醒再移除节点,唤醒之后这里执行,唤醒这里执行时候有可能还没有移除,这里就帮助移除。
// 唤醒的是第一个生产节点s=62,此时head是交易节点81, h.next == s
// 到这里说明匹配到元素了
// 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了
// 如果头节点不为空,并且头节点的下一个节点是s
// 就把头节点换成s的下一个节点
// 也就是把h和s都弹出了
// 也就是把栈顶两个元素都弹出了
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller// 将s.next节点设置为head,相当于取消节点h、s,帮助移除。
// 根据当前节点的模式判断返回m还是s中的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 取节点,头节点不是正在取节点的节点,头节点没有配对。
//取节点:线程过来调用transfer方法,线程在堆里面创建一个节点改变模式为FULFILLING,加到Stack里面去,然后配对,移除2个节点,返回值
} else if (!isFulfilling(h.mode)) { // try to fulfill 非同一种模式,待匹配
// 到这里说明头节点和当前节点模式不一样
// 如果头节点不是正在匹配中
// 如果头节点已经取消了,就把它弹出栈
if (h.isCancelled()) // already cancelled// 取节点看头节点有没有取消
casHead(h, h.next); // pop and retry
//消费者s进来也要入队成为头节点,将这个节点s的模式变为FULFILLING,s是交易节点,
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//生成一个FULFILLING节点入栈
// 头节点没有在匹配中,就让当前节点先入队,再让他们尝试匹配
// 且s成为了新的头节点,它的状态是正在匹配中
for (;;) { // loop until matched or waiters disappear// 比while效率高
SNode m = s.next; // m is s's match//s是交易节点81,m是被匹配的节点62
// 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了
// 就清空栈并跳出内部循环,到外部循环再重新入栈判断
if (m == null) { // all waiters are gone// m == null,其他帮助配对的线程移除了
casHead(s, null); // pop fulfill node// 将s弹出
s = null; // use new node next time// 将s置空,下轮循环的时候还会新建,帮助GC
break; // restart main loop// 退出该循环,继续主循环
}
SNode mn = m.next;
// 如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s
if (m.tryMatch(s)) {//尝试匹配//s节点和m节点匹配,配对时候唤醒节点
casHead(s, mn); // pop both s and m 被匹配的两个节点一起出栈// 匹配成功,将s 、 m弹出,完成交易之后将两个节点一起弹出,并且返回交易的数据。
// 返回匹配结果
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 尝试匹配失败,说明m已经先一步被其它线程匹配了
// 就协助清除它
s.casNext(m, mn); // help unlink// 如果没有匹配成功,m有可能取消了,那么就需要把m从栈中移除。s继续跟mn做交易 没有人指向m就会被回收。
}
}
//如果栈顶已经存在一个模式为FULFILLING的节点,说明栈顶的节点正在进行匹配,那么就帮助这个栈顶节点快速完成交易,然后继续交易。
} else { // help a fulfiller 已经匹配过了,出栈
// 到这里说明当前节点和头节点模式不一样
// 且头节点是正在匹配中
SNode m = h.next; // m is h's match// h=81,m=62,h是交易节点,m是被配对节点,
if (m == null) // waiter is gone// m == null ,执行到这一行时候,配对已经被别的帮助线程或者交易节点线程自己执行完了,
// 如果m为null,说明m已经被其它线程先一步匹配了
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;//跟取节点差不多mn=56
// 协助匹配,如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s
if (m.tryMatch(h)) // help match//62和81配对
// 将栈顶的两个元素弹出后,再让s重新入栈
casHead(h, mn); // pop both h and m 配对,一起出栈//帮助移除。h和m
else // lost match
// 尝试匹配失败,说明m已经先一步被其它线程匹配了
// 就协助清除它
h.casNext(m, mn); // help unlink//帮助匹配失败,h的下一个节点变为下一个的下一个节点。 没有人指向m就会被回收。
}
}
}
}
/**
* Spins/blocks until node s is matched by a fulfill operation.
*
* @param s the waiting node
* @param timed true if timed wait
* @param nanos timeout value
* @return matched node, or s if cancelled
*/
// 三个参数:需要等待的节点,是否需要超时,超时时间
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
/*
* When a node/thread is about to block, it sets its waiter
* field and then rechecks state at least one more time
* before actually parking, thus covering race vs
* fulfiller noticing that waiter is non-null so should be
* woken.
*
* When invoked by nodes that appear at the point of call
* to be at the head of the stack, calls to park are
* preceded by spins to avoid blocking when producers and
* consumers are arriving very close in time. This can
* happen enough to bother only on multiprocessors.
*
* The order of checks for returning out of main loop
* reflects fact that interrupts have precedence over
* normal returns, which have precedence over
* timeouts. (So, on timeout, one last check for match is
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
// 到期时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 当前线程中断了,尝试清除s
if (w.isInterrupted())
s.tryCancel();
// 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)
SNode m = s.match;
// 如果匹配到了,直接返回m
if (m != null)
return m;
// 如果需要超时
if (timed) {
// 检查超时时间如果小于0了,尝试清除s
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0)
// 如果还有自旋次数,自旋次数减一,并进入下一次自旋
spins = shouldSpin(s) ? (spins-1) : 0;
// 后面的elseif都是自旋次数没有了
else if (s.waiter == null)
// 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
// 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// 如果允许超时且还有剩余时间,就阻塞相应时间
LockSupport.parkNanos(this, nanos);
}
}
...//省略
}
transfer方法:分三种情况
- 如果栈为空或者如果插入数据跟栈里面元素的mode相同:
- CAS head指针入栈
- awaitFulfill里面自旋后阻塞,节点的waiter设置为本线程
- 被唤醒后返回匹配的item
- 如果插入数据跟栈里面元素mode不同:
- 从头找到第一个非取消状态的节点
- 生成一个FULFILLING节点入栈
- 匹配前两个节点,一起出栈
- 如果header节点为FULFILLING,则头结点正在匹配,往后找节点匹配出栈。
SynchronousQueue在队列中无可用元素的时候是生成一个单链表保存阻塞的线程,所以严格来说不算无缓冲。如果生产者、消费者之间速度匹配不上,则有可能会产生OOM。
put
直接调用传输器的transfer()方法,传入传输的元素,是否需要超时,超时的时间。
public void put(E e) throws InterruptedException {
// 元素不可为空
if (e == null) throw new NullPointerException();
// 直接调用传输器的transfer()方法
// 三个参数分别是:传输的元素,是否需要超时,超时的时间
if (transferer.transfer(e, false, 0) == null) {
// 如果传输失败,直接让线程中断并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
}
take
也是直接调用传输器的transfer()方法,第一个参数为null表示消费者,要取元素。
public E take() throws InterruptedException {
// 直接调用传输器的transfer()方法
// 三个参数分别是:null,是否需要超时,超时的时间
// 第一个参数为null表示是消费者,要取元素
E e = transferer.transfer(null, false, 0);
// 如果取到了元素就返回
if (e != null)
return e;
// 否则让线程中断并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}