JDK1.6 SynchronousQueue

Introduction

  • SynchronousQueue is a special blocking queue with no internal capacity: a thread can only put an element into the queue at the moment another thread takes one out, and vice versa. Each put/take is effectively a (safe) hand-off of an item from one thread to another.
  • SynchronousQueue supports both fair and non-fair modes.
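The hand-off can be seen in a minimal sketch (a hypothetical demo class, written in pre-lambda style to match the JDK 1.6 era):

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    // Hands "hello" from a producer thread to the calling thread.
    static String handoff() throws InterruptedException {
        final SynchronousQueue<String> q = new SynchronousQueue<String>();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    q.put("hello");   // blocks until a consumer takes the item
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        String item = q.take();       // blocks until a producer hands one over
        producer.join();
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handoff());   // hello
    }
}
```

Neither thread proceeds until both sides of the exchange have arrived, which is exactly the hand-off semantics described above.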

Source Code Analysis

Internally, SynchronousQueue is implemented with a dual stack and a dual queue, corresponding to the non-fair and fair modes respectively.

SynchronousQueue#Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
// 1
static abstract class Transferer {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * // 2
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract Object transfer(Object e, boolean timed, long nanos);
}

The common base class of the dual stack and the dual queue.
Annotated code analysis

  1. The method that transfers an item, used to implement both put and take.
  2. If e is non-null, it acts as a put, handing an item to a consumer; if e is null, it acts as a take, requesting an item offered by a producer.
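The two call shapes of transfer map directly onto the public API: a null e is a take, a non-null e is a put. A small sketch (hypothetical demo class; the 100 ms sleep just gives the consumer time to park):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class TransferModes {
    // Returns the results of two non-blocking offers: one with no
    // consumer waiting, one with a timed poll already parked.
    static boolean[] run() throws InterruptedException {
        final SynchronousQueue<Integer> q = new SynchronousQueue<Integer>();

        // offer(e) calls transfer(e, true, 0): with no waiting consumer
        // it cannot hand the item off and fails immediately.
        boolean first = q.offer(1);              // false

        // A timed poll calls transfer(null, true, nanos) and parks waiting.
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    q.poll(2, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        Thread.sleep(100);                       // let the consumer block

        // Now a complementary waiter exists, so the hand-off succeeds.
        boolean second = q.offer(2);             // true
        consumer.join();
        return new boolean[] { first, second };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);   // false true
    }
}
```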

SynchronousQueue#TransferStack: the dual stack implementation

/** Dual stack */
static final class TransferStack extends Transferer {
    /*
     * This extends Scherer-Scott dual stack algorithm, differing,
     * among other ways, by using "covering" nodes rather than
     * bit-marked pointers: Fulfilling operations push on marker
     * nodes (with FULFILLING bit set in mode) to reserve a spot
     * to match a waiting node.
     */

    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    // 1
    static final int REQUEST    = 0;
    /** Node represents an unfulfilled producer */
    // 2
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    // 3
    static final int FULFILLING = 2;

    /** Return true if m has fulfilling bit set */
    // 4
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        // 5
        volatile SNode next;       // next node in stack
        // 6
        volatile SNode match;      // the node matched to this
        volatile Thread waiter;    // to control park/unpark
        Object item;               // data; or null for REQUESTs
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.
        // 7
        SNode(Object item) {
            this.item = item;
        }

        static final AtomicReferenceFieldUpdater<SNode, SNode>
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
            (SNode.class, SNode.class, "next");

        boolean casNext(SNode cmp, SNode val) {
            return (cmp == next &&
                    nextUpdater.compareAndSet(this, cmp, val));
        }

        static final AtomicReferenceFieldUpdater<SNode, SNode>
            matchUpdater = AtomicReferenceFieldUpdater.newUpdater
            (SNode.class, SNode.class, "match");

        /**
         * Tries to match node s to this node, if so, waking up thread.
         * Fulfillers call tryMatch to identify their waiters.
         * Waiters block until they have been matched.
         *
         * @param s the node to match
         * @return true if successfully matched to s
         */
        // 8
        boolean tryMatch(SNode s) {
            if (match == null &&
                matchUpdater.compareAndSet(this, null, s)) {
                // 9
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            // 10
            return match == s;
        }

        /**
         * Tries to cancel a wait by matching node to itself.
         */
        // 11
        void tryCancel() {
            matchUpdater.compareAndSet(this, null, this);
        }

        boolean isCancelled() {
            return match == this;
        }
    }

    /** The head (top) of the stack */
    volatile SNode head;

    static final AtomicReferenceFieldUpdater<TransferStack, SNode>
        headUpdater = AtomicReferenceFieldUpdater.newUpdater
        (TransferStack.class, SNode.class, "head");

    boolean casHead(SNode h, SNode nh) {
        return h == head && headUpdater.compareAndSet(this, h, nh);
    }
    ....

TransferStack is the dual stack implementation.
Annotated code analysis

  1. A consumer that has not yet received an item.
  2. A producer that has not yet handed off its item.
  3. The node is fulfilling another unfulfilled producer or consumer.
  4. Tests whether the FULFILLING bit is set.
  5. The next node in the stack.
  6. The node matched to this node.
  7. item and mode need not be volatile because they are always written before, and read after, other volatile/atomic operations (a happens-before relationship).
  8. Tries to match node s with this node; on success, wakes the waiting thread (handing data to a consumer or fetching data from a producer). Fulfillers call tryMatch() to identify their waiters and then wake them.
  9. If this node's match is null, CAS match to s, then unpark the waiter.
  10. If match was non-null, or the CAS failed, compare match against s: if they are the same object, the match has already completed successfully.
  11. Tries to cancel this node (which has a waiting thread) by setting match to the node itself.
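The match/cancel protocol of SNode can be mimicked with a plain AtomicReference (a hypothetical, stripped-down MiniNode; the real SNode uses a field updater on a volatile field instead):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified node mirroring SNode's match/waiter protocol.
public class MiniNode {
    final AtomicReference<MiniNode> match = new AtomicReference<MiniNode>();
    volatile Thread waiter;

    // Mirrors SNode.tryMatch: CAS match from null to s, then unpark waiter.
    boolean tryMatch(MiniNode s) {
        if (match.get() == null && match.compareAndSet(null, s)) {
            Thread w = waiter;
            if (w != null) {          // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match.get() == s;      // true if s already won the match
    }

    // Mirrors SNode.tryCancel: a node matched to itself is cancelled.
    void tryCancel() {
        match.compareAndSet(null, this);
    }

    boolean isCancelled() {
        return match.get() == this;
    }
}
```

Note how cancellation reuses the match field: matching a node to itself is impossible in normal operation, so "match == this" can serve as the cancelled flag without any extra state.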

SynchronousQueue#TransferStack#transfer()

/**
 * Puts or takes an item.
 */
Object transfer(Object e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */
    // 1
    SNode s = null; // constructed/reused as needed
    int mode = (e == null)? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // 2
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 3
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    // 4
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) { // 5
                // 6
                SNode m = awaitFulfill(s, timed, nanos);
                // 7
                if (m == s) {               // wait was cancelled
                    // 8
                    clean(s);
                    return null;
                }
                // 9
                if ((h = head) != null && h.next == s)
                    // 10
                    casHead(h, s.next);     // help s's fulfiller
                return mode == REQUEST? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 11
            // 12
            if (h.isCancelled())            // already cancelled
                // 13
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 14
                for (;;) { // loop until matched or waiters disappear
                    // 15
                    SNode m = s.next;       // m is s's match
                    // 16
                    if (m == null) {        // all waiters are gone
                        // 17
                        casHead(s, null);   // pop fulfill node
                        // 18
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // 19
                    SNode mn = m.next;
                    // 20
                    if (m.tryMatch(s)) {
                        // 21
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST)? m.item : s.item;
                    } else                  // lost match
                        // 22
                        // 23
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // 24 // help a fulfiller
            // 25
            SNode m = h.next;               // m is h's match
            // 26
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                // 27
                SNode mn = m.next;
                // 28
                if (m.tryMatch(h))          // help match
                    // 29
                    casHead(h, mn);         // pop both h and m
                else                        // 30 // lost match
                    // 31
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

Annotated code analysis

  1. The basic algorithm is to loop, trying one of three actions:
  • If the stack is apparently empty or already contains nodes of the same mode, try to push the node onto the stack and wait for a match, returning the matched node, or null if cancelled.
  • If the stack apparently contains a node of the complementary mode, try to push a node tagged FULFILLING onto the stack, match it with the corresponding waiting node, pop both nodes (the current node and its complement) off the stack, and return the matched item. The matching and unlinking are not strictly necessary, because other threads may perform action 3:
  • If the top of the stack already holds a FULFILLING node (one fulfilling another node), help it complete its match and pop operations, then continue the main loop. This code is essentially the same as action 2, except that it does not return the item.
  2. head is null, or head has the same mode as e.
  3. Timed out.
  4. If h is non-null and cancelled, pop h.
  5. Create an SNode, assign it to s with the old head as its next node, and try to CAS it in as the new head.
  6. Wait for another thread to fulfill the current thread.
  7. After awaitFulfill returns, check whether the wait was cancelled.
  8. If cancelled, clean up node s.
  9. Since s was set as head above, this condition means another node t was pushed in front of s and became head, and t is exactly the node matched with s; their match is already complete.
  10. Set s's next node as head, effectively removing both s and t.
  11. The stack has a head node with a different mode from the current node, so they are a complementary pair; try to fulfill h with the current node s.
  12. Node h has already been cancelled.
  13. Pop h, setting h's next node as the stack head.
  14. Try to tag the current node as "fulfilling" and set it as head.
  15. s is the current node and m is s's next node; they are the two nodes being matched.
  16. If m is null, another node may have taken m's match.
  17. Pop s.
  18. Set s to null; a new node will be created on the next loop iteration.
  19. Get m's next node mn; if s and m match successfully, mn must take over the head position.
  20. Try to match; on success, the thread waiting on m is woken up.
  21. If the match succeeded, pop both s and m.
  22. The match failed, meaning m may have been fulfilled by another node.
  23. m has already been matched by another node, so unlink m.
  24. The node h at the top of the stack is in the middle of matching.
  25. m is h's counterpart; h is matching with m.
  26. If m is null, another node has taken m's match.
  27. Get m's next node mn; if m and h match successfully, mn must take over the head position.
  28. Try to match m and h.
  29. If the match succeeded, pop both h and m.
  30. The match failed, meaning m may have been fulfilled by another node.
  31. m has already been matched by another node, so unlink m.

SynchronousQueue#TransferStack#snode()

/**
 * Creates or resets fields of a node. Called only from transfer
 * where the node to push on stack is lazily created and
 * reused when possible to help reduce intervals between reads
 * and CASes of head and to avoid surges of garbage when CASes
 * to push nodes fail due to contention.
 */
static SNode snode(SNode s, Object e, SNode next, int mode) {
    if (s == null) s = new SNode(e);
    s.mode = mode;
    s.next = next;
    return s;
}

SynchronousQueue#TransferStack#awaitFulfill()

/**
 * Spins/blocks until node s is matched by a fulfill operation.
 *
 * @param s the waiting node
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched node, or s if cancelled
 */
// 1
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    /*
     * When a node/thread is about to block, it sets its waiter
     * field and then rechecks state at least one more time
     * before actually parking, thus covering race vs
     * fulfiller noticing that waiter is non-null so should be
     * woken.
     *
     * When invoked by nodes that appear at the point of call
     * to be at the head of the stack, calls to park are
     * preceded by spins to avoid blocking when producers and
     * consumers are arriving very close in time. This can
     * happen enough to bother only on multiprocessors.
     *
     * The order of checks for returning out of main loop
     * reflects fact that interrupts have precedence over
     * normal returns, which have precedence over
     * timeouts. (So, on timeout, one last check for match is
     * done before giving up.) Except that calls from untimed
     * SynchronousQueue.{poll/offer} don't check interrupts
     * and don't wait at all, so are trapped in transfer
     * method rather than calling awaitFulfill.
     */
    long lastTime = (timed)? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    SNode h = head;
    int spins = (shouldSpin(s)?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            // 2
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            // 3
            return m;
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                // 4
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 5
            spins = shouldSpin(s)? (spins-1) : 0;
        else if (s.waiter == null)
            // 6
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 7
            LockSupport.parkNanos(this, nanos);
    }
}

Annotated code analysis

  1. Spin/block until the node is matched.
  2. If the current thread was interrupted, cancel the current node.
  3. If the match has already succeeded, return the matched node.
  4. On timeout, also cancel the current node.
  5. Spin control: each iteration re-checks whether spinning is still worthwhile; if so, decrement the spin count and loop again (down to 0).
  6. On the first pass through this branch, record the current thread on s.
  7. In the timed case, check whether the remaining time exceeds the spin threshold (an empirically derived value): park if it does, keep spinning if not.

Before node s actually blocks, the current thread is recorded on s and the state is then re-checked (at least once), so that the node that later matches s is guaranteed to wake the current thread.
When this method runs for a node that is at the top of the stack, it spins for a while before parking, so that if a producer or consumer arrives very soon, the node need not block at all. This optimization only matters on multiprocessors.
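The spin-then-park structure can be sketched as follows (hypothetical helper; the interrupt and timeout handling of the real awaitFulfill is omitted):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class SpinThenPark {
    static final int MAX_SPINS = 32;   // cf. maxTimedSpins

    // Spins briefly, then parks until another thread fills the slot
    // and unparks us -- the same shape as awaitFulfill's main loop.
    static <T> T await(AtomicReference<T> slot) {
        int spins = MAX_SPINS;
        for (;;) {
            T v = slot.get();
            if (v != null)
                return v;                 // "matched": done without blocking
            if (spins > 0)
                --spins;                  // spin first; a match may be imminent
            else
                LockSupport.park(slot);   // then block until unparked
        }
    }
}
```

As in awaitFulfill, a spurious wakeup is harmless: the loop simply re-checks the condition and parks again if it is still unmet.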

SynchronousQueue#TransferStack#shouldSpin()

/**
 * Returns true if node s is at head or there is an active
 * fulfiller.
 */
// 1
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

Annotated code analysis

  1. If s is the current head of the stack, the stack is empty, or the head is in the middle of matching (FULFILLING), spinning is worthwhile.

SynchronousQueue spin parameters

/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;

/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
static final int maxUntimedSpins = maxTimedSpins * 16;

/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;

SynchronousQueue#TransferStack#clean()

/**
 * Unlinks s from the stack.
 */
// 1
void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    /*
     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.
     */

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    // 2
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // Unsplice embedded nodes
    // 3
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

Annotated code analysis

  1. This method is called only when node s has been cancelled.
  2. Remove the consecutive run of cancelled nodes from the top of the stack up to past.
  3. If p itself is not cancelled (the while loop above exits on the first non-cancelled node, but there may still be cancelled nodes between it and past), unlink any cancelled nodes between p and past as well.

SynchronousQueue#TransferQueue: the dual queue implementation

/** Dual Queue */
static final class TransferQueue extends Transferer {
    /*
     * This extends Scherer-Scott dual queue algorithm, differing,
     * among other ways, by using modes within nodes rather than
     * marked pointers. The algorithm is a little simpler than
     * that for stacks because fulfillers do not need explicit
     * nodes, and matching is done by CAS'ing QNode.item field
     * from non-null to null (for put) or vice versa (for take).
     */

    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;     // next node in queue
        volatile Object item;    // CAS'ed to or from null
        volatile Thread waiter;  // to control park/unpark
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        static final AtomicReferenceFieldUpdater<QNode, QNode>
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater
            (QNode.class, QNode.class, "next");

        boolean casNext(QNode cmp, QNode val) {
            return (next == cmp &&
                    nextUpdater.compareAndSet(this, cmp, val));
        }

        static final AtomicReferenceFieldUpdater<QNode, Object>
            itemUpdater = AtomicReferenceFieldUpdater.newUpdater
            (QNode.class, Object.class, "item");

        boolean casItem(Object cmp, Object val) {
            return (item == cmp &&
                    itemUpdater.compareAndSet(this, cmp, val));
        }

        /**
         * Tries to cancel by CAS'ing ref to this as item.
         */
        // 1
        void tryCancel(Object cmp) {
            itemUpdater.compareAndSet(this, cmp, this);
        }

        boolean isCancelled() {
            return item == this;
        }

        /**
         * Returns true if this node is known to be off the queue
         * because its next pointer has been forgotten due to
         * an advanceHead operation.
         */
        // 2
        boolean isOffList() {
            return next == this;
        }
    }

    /** Head of queue */
    // 3
    transient volatile QNode head;
    /** Tail of queue */
    // 4
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it cancelled.
     */
    // 5
    transient volatile QNode cleanMe;

    TransferQueue() {
        // 6
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

Annotated code analysis

  1. Tries to cancel the node; cancelling means pointing the node's item field at the node itself.
  2. Tests whether the node has left the queue.
  3. The head node of the queue.
  4. The tail node of the queue.
  5. Reference to a cancelled node that may not yet have been unlinked from the queue, because it was the last node inserted when it was cancelled.
  6. Initialize with a dummy (sentinel) node.
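QNode's item field does double duty: fulfilling CASes it between null and a value, and cancellation CASes it to the node itself. A hypothetical, stripped-down sketch of these conventions:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified node mirroring QNode's item conventions.
public class MiniQNode {
    final AtomicReference<Object> item;
    final boolean isData;

    MiniQNode(Object item, boolean isData) {
        this.item = new AtomicReference<Object>(item);
        this.isData = isData;
    }

    // Mirrors QNode.casItem: a fulfilling put CASes null -> data,
    // a fulfilling take CASes data -> null.
    boolean casItem(Object cmp, Object val) {
        return item.compareAndSet(cmp, val);
    }

    // Mirrors QNode.tryCancel: point item at the node itself ...
    void tryCancel(Object cmp) {
        item.compareAndSet(cmp, this);
    }

    // ... so "item == this" doubles as the cancelled flag.
    boolean isCancelled() {
        return item.get() == this;
    }
}
```

Because both fulfillment and cancellation race on the same CAS of item, at most one of them can win, which is exactly what makes the dual queue simpler than the stack: no separate match field is needed.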

SynchronousQueue#TransferQueue#transfer()

/**
 * Puts or takes an item.
 */
Object transfer(Object e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        // 1
        if (t == null || h == null)      // saw uninitialized value
            continue;                    // spin
        // 2
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 3
            if (t != tail)               // inconsistent read
                continue;
            // 4
            if (tn != null) {            // lagging tail
                // 5
                advanceTail(t, tn);
                continue;
            }
            // 6
            if (timed && nanos <= 0)     // can't wait
                return null;
            if (s == null)
                // 7
                s = new QNode(e, isData);
            // 8
            if (!t.casNext(null, s))     // failed to link in
                continue;
            // 9
            advanceTail(t, s);           // swing tail and wait
            // 10
            Object x = awaitFulfill(s, e, timed, nanos);
            // 11
            if (x == s) {                // wait was cancelled
                // 12
                clean(t, s);
                return null;
            }
            // 13
            if (!s.isOffList()) {        // not already unlinked
                // 14
                advanceHead(t, s);       // unlink if head
                if (x != null)           // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null)? x : e;

        } else {                         // complementary-mode
            // 15
            QNode m = h.next;            // node to fulfill
            // 16
            if (t != tail || m == null || h != head)
                continue;                // inconsistent read

            Object x = m.item;
            // 17
            if (isData == (x != null) || // m already fulfilled
                x == m ||                // m cancelled
                !m.casItem(x, e)) {      // lost CAS
                // 18
                advanceHead(h, m);       // dequeue and retry
                continue;
            }
            // 19
            advanceHead(h, m);           // successfully fulfilled
            // 20
            LockSupport.unpark(m.waiter);
            return (x != null)? x : e;
        }
    }
}

Annotated code analysis

  1. An uninitialized head or tail was observed.
  2. The queue is empty, or the current node has the same mode as the nodes in the queue.
  3. An inconsistent read: another thread modified tail concurrently.
  4. Another thread has appended a new node tn but has not yet set it as tail.
  5. Help advance the tail: try to set tn as the tail node.
  6. Timed out.
  7. Initialize s.
  8. Try to link the current node s after t.
  9. Try to set s as the queue's tail node.
  10. Then wait to be matched.
  11. If the wait was cancelled.
  12. Clean up node s.
  13. If node s has not yet left the queue.
  14. Try to set s as the head node, removing t.
  15. The node to be fulfilled.
  16. An inconsistent read; go to the next loop iteration.
  17. m has already been fulfilled; or m was cancelled; or the CAS installing e into m failed.
  18. Dequeue h, making m the head node, then retry.
  19. Successfully matched; advance the head node.
  20. Wake up the thread waiting on m.

The basic algorithm is to loop, trying one of two actions:

  • If the queue is empty or holds nodes of the same mode (put or take), try to add the node to the queue of waiters, wait until it is fulfilled (or cancelled), and return the matching item.
  • If the queue contains waiting nodes and the current call is of the complementary mode, try to fulfill the waiting node by CAS'ing its item field, dequeue it, and return the matching item.

Along the way in each case, check for and help advance the head and tail on behalf of other stalled or slow threads. The loop starts with a null check to guard against seeing uninitialized head or tail values. This can never happen in the current SynchronousQueue, but could if callers held a non-volatile/non-final reference to the transferer. The check is done at the top of the loop anyway, which is usually faster than having the null checks implicitly interspersed.

SynchronousQueue#TransferQueue#advanceHead()

/**
 * Tries to cas nh as new head; if successful, unlink
 * old head's next node to avoid garbage retention.
 */
void advanceHead(QNode h, QNode nh) {
    if (h == head && headUpdater.compareAndSet(this, h, nh))
        h.next = h; // forget old next
}

SynchronousQueue#TransferQueue#advanceTail()

/**
 * Tries to cas nt as new tail.
 */
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        tailUpdater.compareAndSet(this, t, nt);
}

SynchronousQueue#TransferQueue#awaitFulfill()

/**
 * Spins/blocks until node s is fulfilled.
 *
 * @param s the waiting node
 * @param e the comparison value for checking match
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched item, or s if cancelled
 */
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    long lastTime = (timed)? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

SynchronousQueue#TransferQueue#clean()

/**
 * Gets rid of cancelled node s with original predecessor pred.
 */
void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    /*
     * At any given time, exactly one node on list cannot be
     * deleted -- the last inserted node. To accommodate this,
     * if we cannot delete s, we save its predecessor as
     * "cleanMe", deleting the previously saved version
     * first. At least one of node s or the node previously
     * saved can always be deleted, so this always terminates.
     */
    // 1
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        if (hn != null && hn.isCancelled()) {
            // 2
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        // 3
        if (t == h)
            return;
        QNode tn = t.next;
        // 4
        if (t != tail)
            continue;
        if (tn != null) {
            // 5
            advanceTail(t, tn);
            continue;
        }
        // 6
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            // 7
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        // 8
        QNode dp = cleanMe;
        // 9
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // 10 // d unspliced
                casCleanMe(dp, null);
            // 11
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))
            // 12
            return;          // Postpone cleaning s
    }
}

Annotated code analysis

  1. At any given time, exactly one node on the list cannot be deleted: the last inserted node. To accommodate this, if s cannot be deleted, its predecessor is saved as "cleanMe", after first deleting the previously saved node. At least one of node s and the previously saved node can always be deleted, so this always terminates.
  2. If the head node's next node has been cancelled, advance the head node.
  3. The queue is empty.
  4. An inconsistent read; retry.
  5. Help advance the tail node.
  6. If s is not the tail node, unlink s.
  7. If s has already been unlinked, return; otherwise try to unsplice s.
  8. What follows handles the case where s is the tail node: s is not deleted immediately; instead, s's predecessor is saved as cleanMe, and s will be unlinked as a by-product of a later clean of some other cancelled node.
  9. If dp is non-null, it refers to a previously saved cancelled node; unlink it.
  10. Unlink d, the next node of the node previously marked cleanMe.
  11. s's predecessor is already cleanMe (so s will be deleted later).
  12. If cleanMe is currently null, save s's predecessor as cleanMe and return, postponing the cleaning of s.

SynchronousQueue

/**
 * The transferer. Set only in constructor, but cannot be declared
 * as final without further complicating serialization. Since
 * this is accessed only at most once per public method, there
 * isn't a noticeable performance penalty for using volatile
 * instead of final here.
 */
private transient volatile Transferer transferer;

/**
 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    transferer = (fair)? new TransferQueue() : new TransferStack();
}

/**
 * Adds the specified element to this queue, waiting if necessary for
 * another thread to receive it.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
// 1
public void put(E o) throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

/**
 * Inserts the specified element into this queue, waiting if necessary
 * up to the specified wait time for another thread to receive it.
 *
 * @return <tt>true</tt> if successful, or <tt>false</tt> if the
 *         specified waiting time elapses before a consumer appears.
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
// 2
public boolean offer(E o, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

/**
 * Inserts the specified element into this queue, if another thread is
 * waiting to receive it.
 *
 * @param e the element to add
 * @return <tt>true</tt> if the element was added to this queue, else
 *         <tt>false</tt>
 * @throws NullPointerException if the specified element is null
 */
// 3
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * for another thread to insert it.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
// 4
public E take() throws InterruptedException {
    Object e = transferer.transfer(null, false, 0);
    if (e != null)
        return (E)e;
    Thread.interrupted();
    throw new InterruptedException();
}

/**
 * Retrieves and removes the head of this queue, waiting
 * if necessary up to the specified wait time, for another thread
 * to insert it.
 *
 * @return the head of this queue, or <tt>null</tt> if the
 *         specified waiting time elapses before an element is present.
 * @throws InterruptedException {@inheritDoc}
 */
// 5
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    Object e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return (E)e;
    throw new InterruptedException();
}

/**
 * Retrieves and removes the head of this queue, if another thread
 * is currently making an element available.
 *
 * @return the head of this queue, or <tt>null</tt> if no
 *         element is available.
 */
// 6
public E poll() {
    return (E)transferer.transfer(null, true, 0);
}

Annotated code analysis

  1. Adds an item to the queue, waiting until another thread receives it.
  2. Adds an item to the queue, waiting until another thread receives it or the wait times out.
  3. Adds an item to the queue only if another thread is waiting to receive it; returns true on success, false otherwise. This method does not block.
  4. Retrieves and removes the item at the head of the queue; if there is none, waits for another thread to insert one.
  5. Retrieves and removes the item at the head of the queue; if there is none, waits for another thread to insert one or for the timeout to elapse.
  6. If another thread is currently inserting an item, tries to retrieve and remove it. This method does not block.
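Because the queue never holds an element, the non-blocking operations fail immediately when no complementary thread is waiting, and the inherited collection views always report emptiness (small demo, hypothetical class name):

```java
import java.util.concurrent.SynchronousQueue;

public class ZeroCapacityDemo {
    public static void main(String[] args) {
        SynchronousQueue<Integer> q = new SynchronousQueue<Integer>();
        // No consumer is waiting, so the non-blocking offer fails ...
        System.out.println(q.offer(42));   // false
        // ... no producer is waiting, so the non-blocking poll fails ...
        System.out.println(q.poll());      // null
        // ... and the queue itself never reports stored elements.
        System.out.println(q.size());      // 0
        System.out.println(q.isEmpty());   // true
    }
}
```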

SynchronousQueue serialization

Serialization is a little unusual: the transferer field itself is not serialized, but whether it was the internal dual stack or the dual queue must be recorded.

/*
 * To cope with serialization strategy in the 1.5 version of
 * SynchronousQueue, we declare some unused classes and fields
 * that exist solely to enable serializability across versions.
 * These fields are never used, so are initialized only if this
 * object is ever serialized or deserialized.
 */

static class WaitQueue implements java.io.Serializable { }
static class LifoWaitQueue extends WaitQueue {
    private static final long serialVersionUID = -3633113410248163686L;
}
static class FifoWaitQueue extends WaitQueue {
    private static final long serialVersionUID = -3623113410248163686L;
}
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

/**
 * Save the state to a stream (that is, serialize it).
 *
 * @param s the stream
 */
private void writeObject(java.io.ObjectOutputStream s)
    throws java.io.IOException {
    // 1
    boolean fair = transferer instanceof TransferQueue;
    if (fair) {
        qlock = new ReentrantLock(true);
        waitingProducers = new FifoWaitQueue();
        waitingConsumers = new FifoWaitQueue();
    }
    else {
        qlock = new ReentrantLock();
        waitingProducers = new LifoWaitQueue();
        waitingConsumers = new LifoWaitQueue();
    }
    s.defaultWriteObject();
}

private void readObject(final java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();
    if (waitingProducers instanceof FifoWaitQueue)
        transferer = new TransferQueue();
    else
        transferer = new TransferStack();
}

Annotated code analysis

  1. At serialization time, the kind of WaitQueue instance to create is chosen by whether transferer is a TransferQueue (fair) or not; deserialization reverses the check to rebuild the right transferer.

Summary

With the dual stack structure, a new thread either becomes the new top of the stack or is matched against the waiting node currently at the top, so ordering is not fair. With the dual queue structure, a new thread goes to the tail and is matched against the waiting node at the head (the earliest arrival), which guarantees a degree of fairness.
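The FIFO behavior of the fair mode can be observed directly (hypothetical demo class; the short sleeps are only there to make sure the two consumers park in a known order):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class FairnessDemo {
    // Parks two consumers in order on a fair queue, then feeds two items;
    // in FIFO (fair) mode the first waiter must get the first item.
    static String firstWaiterGot() throws InterruptedException {
        final SynchronousQueue<String> q = new SynchronousQueue<String>(true);
        final String[] got = new String[2];
        Thread c1 = consumer(q, got, 0);
        c1.start();
        Thread.sleep(200);               // let c1 park first
        Thread c2 = consumer(q, got, 1);
        c2.start();
        Thread.sleep(200);               // let c2 park second
        q.put("first");
        q.put("second");
        c1.join();
        c2.join();
        return got[0];                   // what the first waiter received
    }

    static Thread consumer(final SynchronousQueue<String> q,
                           final String[] got, final int i) {
        return new Thread(new Runnable() {
            public void run() {
                try {
                    got[i] = q.poll(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstWaiterGot());
    }
}
```

With the non-fair (stack) constructor the same experiment gives no such guarantee: the most recently parked waiter sits at the top of the stack and tends to be matched first.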
