ConcurrentLinkedQueue
ConcurrentLinkedQueue没有实现BlockingQueue接口,所以不是阻塞队列,不能用于线程池中,但是它是线程安全的。它由一个无界的单向链表实现。
属性
head头指针,tail尾指针。head/tail的更新可能落后于节点的入队和出队,因为它不是直接对head/tail指针进行CAS操作的,而是对Node中的item进行操作。
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
内部节点:只有next,是单向链表。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
...//省略
}
初始化
初始化的时候,head和tail都指向null节点。没有容量,是无界的。
//在队列初始化时, head 节点存储的元素为空,tail 节点等于 head 节点。
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
// 遍历c,并把它元素全部添加到单链表中
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
入队
每次在队尾增加2个节点后,才移动一次tail节点。
假设线程1入队item2:
线程1入队item2
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
- 初始的时候p=tail,q=p.next=NULL
- 对p的next执行CAS操作,追加item2,成功之后,p=tail。所以
if (p != t)casTail(t, newNode);
这一段没有执行,直接返回。此时tail指针没有变化。
线程2入队item3
之后,假设线程2要入队item3节点,如下图所示
- 初始的时候p=tail,q=p.next;
- q !=NULL且p != q,走第三个分支,p后移一位,重新回到循环开头,q=p.next;
- 这时候q==NULL,对p的next执行CAS操作,入队item3节点。
if (p != t)casTail(t, newNode);
成立,执行casTail(t, newNode);
,tail指向新节点item3,即后移2个位置,到达队列尾部。
总结
- 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列;
- 只有当p !=tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。
上面步骤可以归结为:
- p去找真正的尾结点,然后CAS next
- 如果tail != p,则casTail到真正尾结点
代码
public boolean add(E e) {
return offer(e);
}
//当一个线程入列一个数据时,会将该数据封装成一个 Node 节点,并先获取到队列的队尾节点,
// 当确定此时队尾节点的 next 值为 null 之后,再通过 CAS 将新队尾节点的 next 值设为新节点。
// 此时 p != t,也就是设置 next 值成功,然后再通过 CAS 将队尾节点设置为当前节点即可。
public boolean offer(E e) {
checkNotNull(e);
//创建入队节点
final Node<E> newNode = new Node<E>(e);
//t,p为尾节点,默认相等,采用失败即重试的方式,直到入队成功
for (Node<E> t = tail, p = t;;) {
//获取队尾节点的下一个节点
Node<E> q = p.next;
//如果q为null,则代表p就是队尾节点
if (q == null) {
//将入列节点设置为当前队尾节点的next节点
// p is last node
if (p.casNext(null, newNode)) {//关键点:是对tail的next指针而不是对tail指针执行CAS操作
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
//判断tail节点和p节点距离达到两个节点
//如果tail不是尾节点则将入队节点设置为tail。
// 如果失败了,那么说明有其他线程已经把tail移动过
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.//每入两个节点,后移一次tail指针,失败也没有关系
return true;
}
// Lost CAS race to another thread; re-read next
}
// 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head; //已经到达队列尾部
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;//定位队列真正的对尾节点 //后移p指针
}
}
出队
出队的时候,把item置为null之后出队原来的item。
假设初始的时候head指向空节点,队列中有item1、item2、item3三个节点。
出队元素
- p=head,head的item为null,p节点的下一个节点(q=p.next)不为null,且p!=q,走最后一个分支,p后移一位,使得p==q;
- 重新回到循环开头,这时候p指向的元素不为null,则CAS置换为null。这时候
if (p != h)
该条件成立,执行updateHead(h, ((q = p.next) != null) ? q : p)
,让header跳过两个null节点,指向非null的header;
总结:
- 出队列的判断并非观察tail 指针的位置,而是依赖于head 指针后续的节点是否为NULL这一条件;
- 只要对节点的item 执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。
//首先获取 head 节点,并判断 item 是否为 null,如果为空,则表示已经有一个线程刚刚进行了出列操作,
// 然后更新 head 节点;如果不为空,则使用 CAS 操作将 head 节点设置为 null,
// CAS 就会成功地直接返回节点元素,否则还是更新 head 节点。
public E poll() {
// 设置起始点
restartFromHead:
for (;;) {
//p获取head节点
for (Node<E> h = head, p = h, q;;) {
//获取头节点元素
E item = p.item;
//如果头节点元素不为null,通过cas设置p节点引用的元素为null
if (item != null && p.casItem(item, null)) {//关键点:在出队列的时候,并没有移动head指针,而是把item置为了null
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time//每生产两个null节点,才把head指针后移2位
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//如果p节点的下一个节点为null,则说明这个队列为空,更新head结点
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//节点出队失败,重新跳到restartFromHead来进行出队
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
判空
因为head/tail指针有滞后性,所以需要从头遍历队列直到查找非null的节点来判空。
public boolean isEmpty() {
return first() == null;
}
//寻找第一个不为NULL的节点
/**
* 这个方法和poll/peek方法差不多,只不过返回的是Node而不是元素。
*
* 之所以peek方法没有复用first方法的原因有2点
* 1. 会增加一次volatile读
* 2. 有可能会因为和poll方法的竞争,导致出现非期望的结果。
* 比如first返回的node非null,里面的item也不是null。
* 但是等到poll方法返回从first方法拿到的node的item的时候,item已经被poll方法CAS为null了。
* 那这个问题只能再peek中增加重试,这未免代价太高了。
*
* 这就是first和peek代码没有复用的原因。
*/
Node<E> first() {
restartFromHead:
for (;;) {//从head开始遍历,获取第一个不为null的节点
for (Node<E> h = head, p = h, q;;) {
//头节点是否有元素
boolean hasItem = (p.item != null);
//头节点有元素或当前链表只有一个节点
if (hasItem || (q = p.next) == null) {//注意,这里是整个节点==null,而不是item==null。所以这里会一直循环直到找到item非null的节点。或者next就是null了
updateHead(h, p);
return hasItem ? p : null;//头节点有值返回节点,否则返回null
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
LinkedTransferQueue
LinkedTransferQueue是阻塞队列,它是SynchronousQueue、LinkedBlockingQueue和ConcurrentLinkedQueue三者的结合体,综合了这三者的方法。
LinkedTransferQueue实现了TransferQueue接口,这个接口继承于BlockingQueue,所以LinkedTransferQueue是带阻塞功能的。
存储结构
LinkedTransferQueue使用的数据结构是dual queue,翻译为双重队列。队列中的节点只有一种,要不全是数据节点,要不全是请求节点。
取元素的时候跟头结点比对,如果头结点是数据节点则CAS其item来匹配,如果是请求节点则入队尾。
放元素的时候跟头结点比对,如果头结点是请求节点则CAS item匹配,如果是数据节点则入队尾。
示例:
属性
头结点、尾结点。
放取元素的几种方式:
- NOW:立即返回,用于非超时的poll()和tryTransfer()方法中
- ASYNC:异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
- SYNC:同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
- TIMED:超时,用于有超时的poll()和tryTransfer()方法中
/** head of the queue; null until first enqueue */
transient volatile Node head;
/** tail of the queue; null until first append */
private transient volatile Node tail;
/*
* Possible values for "how" argument in xfer method.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
节点类:
- isData:是否是数据节点
- 元素的值
- 下一个节点
- 当前阻塞的线程
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null until waiting
构造器
可以看出来没有容量限制。
/**
* Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
}
/**
* Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
入队
调用的xfer。
xfer的签名E xfer(E e, boolean haveData, int how, long nanos)
:
- e:表示入队的元素
- haveData:为true表示当前是插入数据
- how:NOW, ASYNC, SYNC, or TIMED之一,表示放取元素的方式
- nanos:超时时间
//阻塞模式
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
出队
出队也是使用的xfer。这里传入的how就有集中了:TIMED、NOW、SYNC。
public E take() throws InterruptedException {
// 同步模式,会阻塞直到取到元素
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}
移交元素
这三个方法也是放元素的方法。
public boolean tryTransfer(E e) {
// 立即返回
return xfer(e, true, NOW, 0) == null;
}
// 移交元素
public void transfer(E e) throws InterruptedException {
// 同步模式
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
// 尝试移交元素(有超时时间)
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 有超时时间
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
xfer
我们来看看所有函数都用到的xfer到底是怎么样的:
里面用到的函数:
- tryAppend:从tail开始遍历,把节点放到链表尾部
- awaitMatch:等待被匹配
大致逻辑:
- 从头开始遍历,如果元素与头结点不一致,则尝试CAS Item匹配
- 成功的话后移head节点,唤醒等待的线程,并返回匹配到的元素;
- 不成功则往后移动继续匹配,直到到达队尾都匹配不上,则跳到2
- 如果元素与头结点一致,则退出遍历的循环,尝试入队
- 从tail开始遍历,把s放到链表尾端。入队过程中可能尾结点变了,则后移尾结点,直到CAS成功
- 入队失败则从1开始重试
- 入队成功,如果是异步节点,直接返回。如果是非异步节点,即同步或者有超时,则等待被匹配后返回
- 不断自旋,过程中检查item是否改变了,改变了则是被匹配了,返回被匹配的元素
- 如果item没改变,如果前面有节点且正在匹配中则自旋,随机让出CPU,自旋到一定次数还没被匹配则阻塞
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允许放入空元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
// 外层循环,自旋,失败就重试
retry:
for (;;) { // restart on append race
// 下面这个for循环用于控制匹配的过程
// 同一时刻队列中只会存储一种类型的节点
// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
for (Node h = head, p = h; p != null;) { // find & match first node
// p节点的模式
boolean isData = p.isData;
// p节点的值
Object item = p.item;
// p没有被匹配到
if (item != p && (item != null) == isData) { // unmatched
// 如果两者模式一样,则不能匹配,跳出循环后尝试入队
if (isData == haveData) // can't match
break;
// 如果两者模式不一样,则尝试匹配
// 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
if (p.casItem(item, e)) { // match
// 匹配成功
// for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
// 处理head,后移
for (Node q = p; q != h;) {
// 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
Node n = q.next; // update by 2 unless singleton
// 如果head还没变,就把它更新成新的节点
// 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 如果新的头节点为空,或者其next为空,或者其next未匹配,就break
// 只有当新的头结点不为空,且其next不为空,且其next被匹配了,则重试
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒p中等待的线程
LockSupport.unpark(p.waiter);
// 并返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
}
// p已经被匹配了或者尝试匹配的时候失败了
// 也就是其它线程先一步匹配了p
// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
//注:如果一个节点的next指向了自己,则这个节点等待被删除
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 到这里肯定是队列中存储的节点类型和自己一样
// 或者队列中没有元素了
// 就入队(不管放元素还是取元素都得入队)
// 入队又分成四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
// 如果不是立即返回
if (how != NOW) { // No matches available
// 新建s节点
if (s == null)
s = new Node(e, haveData);
// 尝试入队
Node pred = tryAppend(s, haveData);
// 入队失败,重试
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是异步(同步或者有超时)
// 就等待被匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
private Node tryAppend(Node s, boolean haveData) {
// 从tail开始遍历,把s放到链表尾端
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 如果首尾都是null,说明链表中还没有元素
if (p == null && (p = head) == null) {
// 就让首节点指向s
// 注意,这里插入第一个元素的时候tail指针并没有指向s
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
// 如果p无法处理,则返回null
// 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
// 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
// 队列中所有的元素都要保证是同一种类型的节点
// 返回null后外面的方法会重新尝试匹配重新入队等
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
// 如果p的next不为空,说明不是最后一个节点
// 则让p重新指向最后一个节点
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
// 如果CAS更新s为p的next失败
// 则说明有其它线程先一步更新到p的next了
// 就让p指向p的next,重新尝试让s入队
p = p.next; // re-read on CAS failure
else {
// 到这里说明s成功入队了
// 如果p不等于t,就更新tail指针
// 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
// 这里就是用来更新tail指针的
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回p,即s的前一个元素
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 如果是有超时的,计算其超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = -1; // initialized after first item and cancel checks
// 随机数,随机让一些自旋的线程让出CPU
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e,说明它被匹配到了
if (item != e) { // matched
// assert item != s;
// 把s的item更新为s本身
// 并把s中的waiter置为空
s.forgetContents(); // avoid garbage
// 返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
// 如果当前线程中断了,或者有超时的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 尝试解除s与其前一个节点的关系
// 也就是删除s节点
unsplice(pred, s);
// 返回元素的值本身,说明没匹配到
return e;
}
// 如果自旋次数小于0,就计算自旋次数
if (spins < 0) { // establish spins at/near front
// spinsFor()计算自旋次数
// 如果前面有节点未被匹配就返回0
// 如果前面有节点且正在匹配中就返回一定的次数,等待
if ((spins = spinsFor(pred, s.isData)) > 0)
// 初始化随机数
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
// 还有自旋次数就减1
--spins;
// 并随机让出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
// 更新s的waiter为当前线程
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// 如果有超时,计算超时时间,并阻塞一定时间
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超时的,直接阻塞,等待被唤醒
// 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
LockSupport.park(this);
}
}
}
浅谈Disruptor
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。目前已经很稳定的甚少更新,在Apache Storm、Camel、Log4j2等项目中都有应用到。
我们先来回顾一下之前所学的队列:
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
实际应用中,通常使用的数组或链表,只有在需要有优先级的情况下才使用堆。我们还需要考虑两个条件:
- 需要是有界的,可以防止内存溢出;
- 为了减少垃圾回收,需要尽量使用arraylist或者heap
综上所述,我们只有ArrayBlockingQueue可以选。
ArrayBlockingQueue的问题
ArrayBlockingQueue在实际应用中,因为加锁和伪共享,会造成一定的性能问题:
加锁
Disruptor论文里面的数据,显示了加锁导致的性能问题:
这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
实验数据:
Method | Time (ms) |
---|---|
Single thread | 300 |
Single thread with CAS | 5,700 |
Single thread with lock | 10,000 |
Single thread with volatile write | 4,700 |
Two threads with CAS | 30,000 |
Two threads with lock | 224,000 |
笔者在自己的笔记本上做的实验如下:
代码:
单线程
public class DisruptorExperimentSingle {
long cnt = 500_000_000;
long num = 0;
private static final long numOffset;
private static final Unsafe unsafe;
volatile long vNum = 0;
static {
try {
unsafe = reflectGetUnsafe();
numOffset = unsafe.objectFieldOffset(DisruptorExperimentSingle.class.getDeclaredField("num"));
} catch (Exception ex) { throw new Error(ex); }
}
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
void singleUnlock(){
num = 0;
long start = System.currentTimeMillis();
for(long i = 0;i < cnt;++i){
++num;
}
assert num == cnt;
System.out.println("singleUnlock consume " + String.valueOf(System.currentTimeMillis() - start));
}
void singleCAS(){
num = 0;
long start = System.currentTimeMillis();
for(long i = 0;i < cnt;++i) {
unsafe.getAndAddLong(this, numOffset, 1);
}
assert num == cnt;
System.out.println("singleCAS consume " + String.valueOf(System.currentTimeMillis() - start));
}
void singleLock(){
num = 0;
long start = System.currentTimeMillis();
Lock lock = new ReentrantLock();
for(long i = 0;i < cnt;++i){
lock.lock();
++num;
lock.unlock();
}
assert num == cnt;
System.out.println("singleLock consume " + String.valueOf(System.currentTimeMillis() - start));
}
void singleSynchronize(){
num = 0;
long start = System.currentTimeMillis();
Lock lock = new ReentrantLock();
for(long i = 0;i < cnt;++i){
synchronized (this){
++num;
}
}
assert num == cnt;
System.out.println("singleSynchronize consume " + String.valueOf(System.currentTimeMillis() - start));
}
void singleVolatile(){
vNum = 0;
long start = System.currentTimeMillis();
for(long i = 1;i <= cnt;++i){
vNum = i;
}
assert vNum == cnt;
System.out.println("singleVolatile consume " + String.valueOf(System.currentTimeMillis() - start));
}
public static void main(String[] args) {
DisruptorExperimentSingle disruptorExperiment = new DisruptorExperimentSingle();
disruptorExperiment.singleUnlock();
disruptorExperiment.singleCAS();
disruptorExperiment.singleLock();
disruptorExperiment.singleSynchronize();
disruptorExperiment.singleVolatile();
}
}
多线程
public class DisruptorExperimentMulti implements Runnable{
public static int NUM_THREADS = 4;
long cnt = 500_000_000;
long num = 0;
private static final long numOffset;
private static final Unsafe unsafe;
private static int type;
static {
try {
unsafe = reflectGetUnsafe();
numOffset = unsafe.objectFieldOffset(DisruptorExperimentMulti.class.getDeclaredField("num"));
} catch (Exception ex) { throw new Error(ex); }
}
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
void multiCAS() throws InterruptedException {
num = 0;
long start = System.currentTimeMillis();
Thread[] threads = new Thread[NUM_THREADS];
for(int i = 0;i < NUM_THREADS;++i){
threads[i] = new Thread(new DisruptorExperimentMulti());
}
type = 1;
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
assert num == cnt;
System.out.println("multiCAS consume " + String.valueOf(System.currentTimeMillis() - start));
}
void multiLock() throws InterruptedException {
long start = System.currentTimeMillis();
num = 0;
Thread[] threads = new Thread[NUM_THREADS];
for(int i = 0;i < NUM_THREADS;++i){
threads[i] = new Thread(new DisruptorExperimentMulti());
}
type = 0;
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
assert num == cnt;
System.out.println("multiLock consume " + String.valueOf(System.currentTimeMillis() - start));
}
@Override
public void run() {
if(type == 0){//lock
for(int i = 0;i < cnt/NUM_THREADS;++i) {
synchronized (DisruptorExperimentMulti.class) {
++num;
}
}
} else {//cas
for(int i = 0;i < cnt/NUM_THREADS;++i) {
unsafe.getAndAddLong(this, numOffset, 1);
}
}
}
public static void main(String[] args) throws InterruptedException {
DisruptorExperimentMulti disruptorExperimentMulti = new DisruptorExperimentMulti();
disruptorExperimentMulti.multiCAS();
disruptorExperimentMulti.multiLock();
}
}
实验结果:其中jvm对volatile long的读写是原子的,这里
指出来:
Writes and reads of volatile long and double values are always atomic.
时间(ms) | 无锁 | lock加锁 | synchronize加锁 | CAS | volatile write |
---|---|---|---|---|---|
单线程 | 270 | 7059 | 8353 | 2363 | 2242 |
多线程 | N/A | N/A | 28106 | 1447 | N/A |
可以看出来其性能:
- 单线程:不加锁 > CAS > 加锁
- 多线程:CAS 大约是20倍加锁性能
综上所述,加锁的性能是最差的,比CAS差了很多个数量级,以致于使用CAS来替换加锁所得到的收益非常大。
伪共享
伪共享的性能,在前面讨论过。
这里详细说一下ArrayBlockingQueue为什么有伪共享的问题。
ArrayBlockingQueue有三个成员变量:
- takeIndex:取数的下标
- putIndex:放数的下标
- count:队列中元素个数
其类型是三个int,他们很可能被放到同一个CacheLine里面去,所以会导致同时put和take的时候,会造成伪共享的问题。
Disruptor的设计方案
那么我们来看看Disruptor是怎么设计来规避上面这些问题的。
- RingBuffer环形数组:
- 数组对处理器的缓存友好
- 相对于使用链表,数组可以避免频繁垃圾回收
- 无锁设计
- 每个生产者/消费者线程先申请一段空间作为自己可以操作的元素。申请到之后,在这片区域写入/读取元素
操作示例
单线程写
首先是申请可写的序号,申请到之后就可以在当前cursor到可写序号之内写入数据。
多线程写
多线程写,需要给每个线程分配一段独享空间。写入后需要设置availableBuffer的相应位置为available。availableBuffer是一个与RingBuffer一样大小的的buffer,写入的时候需要设置相应的位置为已写入。如图,Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。
多线程写的情况下读数据
首先申请读取到序号n,然后需要遍历availableBuffer中是否当前位置已经写入,才可以消费数据。
性能
性能测试报告:
- Async Appender: ArrayBlockingQueue
- Loggers all async: Disruptor
单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。