JDK1.6 ReentrantReadWriteLock

介绍

ReentrantReadWriteLock支持公平/非公平锁。写锁可以降级为读锁,但读锁不能升级为写锁。

写锁(tryAcquire)

(读锁&&写锁)或者(无读锁&&无写锁),根据公平锁判断queue empty或者queue head,再竞争锁,非公平锁直接竞争锁。

读锁(tryAcquireShared)

读锁会阻塞写锁。也会根据公平锁判断queue empty或者queue head,再竞争锁,非公平锁队列头有写请求,那么当前读请求阻塞,返回false,CAS直接竞争锁。如果是queue head,返回true,通过fullTryAcquireShared()中的CAS竞争锁。

源码分析

ReentrantReadWriteLock实现ReadWriteLock接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}

读锁和写锁2个方法。
ReentrantReadWriteLock#Sync基于AbstractQueuedSynchronizer实现。

Sync属性

1
2
3
4
5
6
7
8
9
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

The lower one representing the exclusive (writer) lock hold count, and the upper the shared (reader) hold count.

由注释可以知道低位表示write独占锁持有锁次数,高位表示read持有重入锁次数。

Sync#tryAcquire()

获取write锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. if read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// 1
int c = getState();
// 2
int w = exclusiveCount(c);
// 3
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 4
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 5
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 6
if ((w == 0 && writerShouldBlock(current)) ||
!compareAndSetState(c, c + acquires))
return false;
// 7
setExclusiveOwnerThread(current);
return true;
}

结合注释
标注代码分析

  1. AbstractQueuedSynchronizer#getState(),线程持有锁的数量。
  2. write独占锁数量。
  3. c!=0当前线程持有锁。当前线程持有read锁。
  4. write独占锁数量是0,说明线程持有reade锁,未持有write锁。无法获取锁。或者持有write锁的线程和当前线程不一样,返回失败。
  5. 当前write独占锁+已经持有write独占锁的次数>锁数量最大值,抛异常。
  6. c == 0当前线程未持有read锁。w == 0线程未持有write锁,说明未持有读锁、写锁。writerShouldBlock分别被公平锁和非公平锁实现。公平锁FairSync#writerShouldBlock()的queue empty或者queue head,则CAS来设置state。非公平锁NonfairSync#writerShouldBlock()返回false,直接设置CAS,无需阻塞。
  7. 当前未持有write锁、read锁,CAS设置成功,则设置AbstractQueuedSynchronizer#setExclusiveOwnerThread,exclusiveOwnerThread是锁独有线程。

Sync#writerShouldBlock()

1
2
3
4
5
6
/**
* Return true if a writer thread that is otherwise
* eligible for lock should block because of policy
* for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock(Thread current);

根据注释及方法名字很容易理解。线程是否阻塞写。
writerShouldBlock是抽象方法由Sync的子类公平锁(FairSync)和非公平锁(NonfairSync)实现这个方法。

Sync#tryRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int nextc = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 1
if (exclusiveCount(nextc) == 0) {
setExclusiveOwnerThread(null);
setState(nextc);
return true;
} else {// 2
setState(nextc);
return false;
}
}

标注代码分析

  1. writer独占锁数量 == 0,设置持有锁的线程是null。设置重入次数AbstractQueuedSynchronizer#state。
  2. 持有writer独占锁,重新设置重入锁次数。

Sync#HoldCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count;
// Use id, not reference, to avoid garbage retention
// 1
final long tid = Thread.currentThread().getId();
/** Decrement if positive; return previous value */
int tryDecrement() {
int c = count;
if (c > 0)
count = c - 1;
return c;
}
}

标注代码分析

  1. 线程id作为HoldCounter#tid。avoid garbage retention有助于垃圾回收。

    A counter for per-thread read hold counts

每个线程持有read锁的次数。

Sync#ThreadLocalHoldCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

/**
* The number of read locks held by current thread.
* Initialized only in constructor and readObject.
*/
transient ThreadLocalHoldCounter readHolds;

/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*/
// 2
transient HoldCounter cachedHoldCounter;

Sync() {
readHolds = new ThreadLocalHoldCounter();
// 3
setState(getState()); // ensures visibility of readHolds
}

标注代码分析

  1. 线程存储read锁计数的ThreadLocal。
  2. HoldCounter作为缓存,存储线程获取read锁的计数。
  3. volatile的读写操作,似内存屏障,保证readHolds的可见性。

Sync#tryAcquireShared

获取read锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
// 1
int c = getState();
// 2
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 3
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
// 4
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
// 5
rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}

标注代码分析

  1. 线程持有锁的数量。
  2. 持有write独占锁,当前线程不是持有锁的线程,获取锁失败。
  3. readerShouldBlock分别有公平锁和非公平锁实现。公平锁FairSync#readerShouldBlock只有queue empty和queue head才能获取read 锁;非公平锁NonfairSync#readerShouldBlock()的queue有write锁,read锁获取阻塞,readerShouldBlock返回true,执行fullTryAcquireShared()。如果readerShouldBlock()是first queued thread,则执行CAS获取锁,然后把缓存HoldCounter赋值给新HoldCounter作为线程持有read锁的次数。
  4. 没有read锁计数HoldCount,或者read锁计数对象HoldCounter非当前线程的HoldCounter。重新赋值cachedHoldCounter、rh。
  5. read锁计数+1。

Sync#fullTryAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final int fullTryAcquireShared(Thread current) {

HoldCounter rh = cachedHoldCounter;
// 1
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
for (;;) {
int c = getState();
int w = exclusiveCount(c);
// 2
if ((w != 0 && getExclusiveOwnerThread() != current) ||
((rh.count | w) == 0 && readerShouldBlock(current)))
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 3
if (compareAndSetState(c, c + SHARED_UNIT)) {
cachedHoldCounter = rh; // cache for release
rh.count++;
return 1;
}
}
}

标注代码分析

  1. 缓存的read锁计数对象HoldCounter是null,或者holdCounter#id和线程id不一样的时候,重新给HoldCounter对象赋值。
  2. write锁计数!=0,当前线程非锁持有者(其他线程持有锁),结束方法。或者未持有锁,同时read锁是阻塞的,结束方法。
  3. for循环中CAS获取read锁。缓存HoldCounter赋值,HoldCounter对象rh+1。

Sync#tryReleaseShared

释放read锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryReleaseShared(int unused) {
HoldCounter rh = cachedHoldCounter;
Thread current = Thread.currentThread();
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
if (rh.tryDecrement() <= 0)
throw new IllegalMonitorStateException();
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

标注代码分析

  1. 循环的进行CAS方法,然后read锁全部释放,返回true。

Sync#tryWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 ||current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}

This is identical in effect to tryAcquire except

从注释和代码知道这和Sync#tryAcquire()类似功能,缺少writerShouldBlock()。

Sync#tryReadLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return true;
}
}
}

tryReadLock()和tryWriteLock()一样,tryReadLock()和tryAcquireShared()功能类似,缺少readerShouldBlock()。

ReentrantReadWriteLock#FairSync

公平锁

1
2
3
4
5
6
7
8
9
10
11
final static class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock(Thread current) {
// only proceed if queue is empty or current thread at head
return !isFirst(current);
}
final boolean readerShouldBlock(Thread current) {
// only proceed if queue is empty or current thread at head
return !isFirst(current);
}
}

writerShouldBlock()和readerShouldBlock()都是以AbstractQueuedSynchronizer#isFirst()判断是否运行阻塞。如果是queue head或者queue empty,则非阻塞。在Sync#tryAcquireShared中if (!readerShouldBlock(current) &&这代码用!,所以queue head或者queue empty公平锁才能获取到锁。

ReentrantReadWriteLock#NonfairSync

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
final static class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 1
final boolean writerShouldBlock(Thread current) {
return false; // writers can always barge
}
// 2
final boolean readerShouldBlock(Thread current) {

return apparentlyFirstQueuedIsExclusive();
}
}

标注代码分析

  1. write不允许阻塞。参考Sync#tryAcquire()中if ((w == 0 && writerShouldBlock(current)) ||!compareAndSetState(c, c + acquires)),writerShouldBlock()返回false,非公平锁tryAcquire()每次都可以通过CAS竞争锁。
  2. 如果AQS同步等待队列头有写请求,那么当前读请求阻塞。

ReentrantReadWriteLock#ReadLock#lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}

请求读锁。如果没有其他线程持有写锁,那么请求成功,方法返回。Sync#tryAcquireShared()中if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)有write锁,获取read锁失败。如果当前线程被阻塞,执行fullTryAcquireShared(),直到CAS获取到锁。

ReentrantReadWriteLock#ReadLock#tryLock()

1
2
3
public  boolean tryLock() {
return sync.tryReadLock();
}

如果没有其他线程持有写锁,那么请求读锁成功。exclusiveCount(c) != 0判断是否持有writer锁。

ReentrantReadWriteLock#ReadLock#newCondition()

1
2
3
public Condition newCondition() {
throw new UnsupportedOperationException();
}

{@code ReadLocks} do not support conditions

不支持conditions,所以抛出异常。

ReentrantReadWriteLock#WriteLock#lock()

1
2
3
public void lock() {
sync.acquire(1);
}

根据注释可以知道。

  • 如果没有其他线程持有读锁或者写锁,那么CAS请求写锁成功,并将写锁持有次数设置为acquires。if ((w == 0 && writerShouldBlock(current)) ||!compareAndSetState(c, c + acquires))
  • 如果当前线程已经持有了写锁w > 0,直接执行CAS请求成功并增加持有次数。

ReentrantReadWriteLock#WriteLock#tryLock()

1
2
3
public boolean tryLock( ) {
return sync.tryWriteLock();
}

根据注释可以知道。

  • 如果没有其他线程持有当前锁的读锁或者写锁,那么CAS请求读锁成功。
  • 如果当前线程已经持有当前写锁,直接执行CAS增加写锁持有次数。

锁降级

jdk官方实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
// 1
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
// 2
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
// data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
// 3
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}

//use(data);
// 4
rwl.readLock().unlock();
}
}

标注代码分析

  1. 读锁。
  2. 先释放读锁,在获取写锁。
  3. 锁降级。释放写锁前,获取读锁。
  4. 已经获取读锁,可以进行业务操作,然后再释放读锁。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×