- 介绍
- 源码分析
- Sync属性
- Sync#tryAcquire()
- Sync#writerShouldBlock()
- Sync#tryRelease()
- Sync#HoldCounter
- Sync#ThreadLocalHoldCounter
- Sync#tryAcquireShared
- Sync#fullTryAcquireShared
- Sync#tryReleaseShared
- Sync#tryWriteLock
- Sync#tryReadLock
- ReentrantReadWriteLock#FairSync
- ReentrantReadWriteLock#NonfairSync
- ReentrantReadWriteLock#ReadLock#lock()
- ReentrantReadWriteLock#ReadLock#tryLock()
- ReentrantReadWriteLock#ReadLock#newCondition()
- ReentrantReadWriteLock#WriteLock#lock()
- ReentrantReadWriteLock#WriteLock#tryLock()
- 锁降级
介绍
ReentrantReadWriteLock支持公平/非公平锁。写锁可以降级为读锁,但读锁不能升级为写锁。
写锁(tryAcquire)
(读锁&&写锁)或者(无读锁&&无写锁),根据公平锁判断queue empty或者queue head,再竞争锁,非公平锁直接竞争锁。
读锁(tryAcquireShared)
读锁会阻塞写锁。也会根据公平锁判断queue empty或者queue head,再竞争锁,非公平锁队列头有写请求,那么当前读请求阻塞,返回false,CAS直接竞争锁。如果是queue head,返回true,通过fullTryAcquireShared()中的CAS竞争锁。
源码分析
ReentrantReadWriteLock实现ReadWriteLock接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}
读锁和写锁2个方法。
ReentrantReadWriteLock#Sync基于AbstractQueuedSynchronizer实现。
Sync属性
1 | static final int SHARED_SHIFT = 16; |
The lower one representing the exclusive (writer) lock hold count, and the upper the shared (reader) hold count.
由注释可以知道低位表示write独占锁持有锁次数,高位表示read持有重入锁次数。
Sync#tryAcquire()
获取write锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. if read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// 1
int c = getState();
// 2
int w = exclusiveCount(c);
// 3
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 4
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 5
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 6
if ((w == 0 && writerShouldBlock(current)) ||
!compareAndSetState(c, c + acquires))
return false;
// 7
setExclusiveOwnerThread(current);
return true;
}
结合注释
标注代码分析
- AbstractQueuedSynchronizer#getState(),线程持有锁的数量。
- write独占锁数量。
- c!=0当前线程持有锁。当前线程持有read锁。
- write独占锁数量是0,说明线程持有reade锁,未持有write锁。无法获取锁。或者持有write锁的线程和当前线程不一样,返回失败。
- 当前write独占锁+已经持有write独占锁的次数>锁数量最大值,抛异常。
- c == 0当前线程未持有read锁。w == 0线程未持有write锁,说明未持有读锁、写锁。writerShouldBlock分别被公平锁和非公平锁实现。公平锁FairSync#writerShouldBlock()的queue empty或者queue head,则CAS来设置state。非公平锁NonfairSync#writerShouldBlock()返回false,直接设置CAS,无需阻塞。
- 当前未持有write锁、read锁,CAS设置成功,则设置AbstractQueuedSynchronizer#setExclusiveOwnerThread,exclusiveOwnerThread是锁独有线程。
Sync#writerShouldBlock()
1 | /** |
根据注释及方法名字很容易理解。线程是否阻塞写。
writerShouldBlock是抽象方法由Sync的子类公平锁(FairSync)和非公平锁(NonfairSync)实现这个方法。
Sync#tryRelease()
1 | protected final boolean tryRelease(int releases) { |
标注代码分析
- writer独占锁数量 == 0,设置持有锁的线程是null。设置重入次数AbstractQueuedSynchronizer#state。
- 持有writer独占锁,重新设置重入锁次数。
Sync#HoldCounter
1 | /** |
标注代码分析
- 线程id作为HoldCounter#tid。
avoid garbage retention
有助于垃圾回收。A counter for per-thread read hold counts
每个线程持有read锁的次数。
Sync#ThreadLocalHoldCounter
1 | // 1 |
标注代码分析
- 线程存储read锁计数的ThreadLocal。
- HoldCounter作为缓存,存储线程获取read锁的计数。
- volatile的读写操作,似内存屏障,保证readHolds的可见性。
Sync#tryAcquireShared
获取read锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 1
int c = getState();
// 2
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 3
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) {
HoldCounter rh = cachedHoldCounter;
// 4
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
// 5
rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}
标注代码分析
- 线程持有锁的数量。
- 持有write独占锁,当前线程不是持有锁的线程,获取锁失败。
- readerShouldBlock分别有公平锁和非公平锁实现。公平锁FairSync#readerShouldBlock只有queue empty和queue head才能获取read 锁;非公平锁NonfairSync#readerShouldBlock()的queue有write锁,read锁获取阻塞,readerShouldBlock返回true,执行fullTryAcquireShared()。如果readerShouldBlock()是
first queued thread
,则执行CAS获取锁,然后把缓存HoldCounter赋值给新HoldCounter作为线程持有read锁的次数。 - 没有read锁计数HoldCount,或者read锁计数对象HoldCounter非当前线程的HoldCounter。重新赋值cachedHoldCounter、rh。
- read锁计数+1。
Sync#fullTryAcquireShared
1 | final int fullTryAcquireShared(Thread current) { |
标注代码分析
- 缓存的read锁计数对象HoldCounter是null,或者holdCounter#id和线程id不一样的时候,重新给HoldCounter对象赋值。
- write锁计数!=0,当前线程非锁持有者(其他线程持有锁),结束方法。或者未持有锁,同时read锁是阻塞的,结束方法。
- for循环中CAS获取read锁。缓存HoldCounter赋值,HoldCounter对象rh+1。
Sync#tryReleaseShared
释放read锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15protected final boolean tryReleaseShared(int unused) {
HoldCounter rh = cachedHoldCounter;
Thread current = Thread.currentThread();
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
if (rh.tryDecrement() <= 0)
throw new IllegalMonitorStateException();
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
标注代码分析
- 循环的进行CAS方法,然后read锁全部释放,返回true。
Sync#tryWriteLock
1 | final boolean tryWriteLock() { |
This is identical in effect to tryAcquire except
从注释和代码知道这和Sync#tryAcquire()类似功能,缺少writerShouldBlock()。
Sync#tryReadLock
1 | final boolean tryReadLock() { |
tryReadLock()和tryWriteLock()一样,tryReadLock()和tryAcquireShared()功能类似,缺少readerShouldBlock()。
ReentrantReadWriteLock#FairSync
公平锁1
2
3
4
5
6
7
8
9
10
11final static class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock(Thread current) {
// only proceed if queue is empty or current thread at head
return !isFirst(current);
}
final boolean readerShouldBlock(Thread current) {
// only proceed if queue is empty or current thread at head
return !isFirst(current);
}
}
writerShouldBlock()和readerShouldBlock()都是以AbstractQueuedSynchronizer#isFirst()判断是否运行阻塞。如果是queue head或者queue empty,则非阻塞。在Sync#tryAcquireShared中if (!readerShouldBlock(current) &&
这代码用!
,所以queue head或者queue empty公平锁才能获取到锁。
ReentrantReadWriteLock#NonfairSync
非公平锁1
2
3
4
5
6
7
8
9
10
11
12final static class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 1
final boolean writerShouldBlock(Thread current) {
return false; // writers can always barge
}
// 2
final boolean readerShouldBlock(Thread current) {
return apparentlyFirstQueuedIsExclusive();
}
}
标注代码分析
- write不允许阻塞。参考Sync#tryAcquire()中
if ((w == 0 && writerShouldBlock(current)) ||!compareAndSetState(c, c + acquires))
,writerShouldBlock()返回false,非公平锁tryAcquire()每次都可以通过CAS竞争锁。 - 如果AQS同步等待队列头有写请求,那么当前读请求阻塞。
ReentrantReadWriteLock#ReadLock#lock()
1 | /** |
请求读锁。如果没有其他线程持有写锁,那么请求成功,方法返回。Sync#tryAcquireShared()中if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)
有write锁,获取read锁失败。如果当前线程被阻塞,执行fullTryAcquireShared(),直到CAS获取到锁。
ReentrantReadWriteLock#ReadLock#tryLock()
1 | public boolean tryLock() { |
如果没有其他线程持有写锁,那么请求读锁成功。exclusiveCount(c) != 0
判断是否持有writer锁。
ReentrantReadWriteLock#ReadLock#newCondition()
1 | public Condition newCondition() { |
{@code ReadLocks} do not support conditions
不支持conditions,所以抛出异常。
ReentrantReadWriteLock#WriteLock#lock()
1 | public void lock() { |
根据注释可以知道。
- 如果没有其他线程持有读锁或者写锁,那么CAS请求写锁成功,并将写锁持有次数设置为acquires。
if ((w == 0 && writerShouldBlock(current)) ||!compareAndSetState(c, c + acquires))
- 如果当前线程已经持有了写锁
w > 0
,直接执行CAS请求成功并增加持有次数。
ReentrantReadWriteLock#WriteLock#tryLock()
1 | public boolean tryLock( ) { |
根据注释可以知道。
- 如果没有其他线程持有当前锁的读锁或者写锁,那么CAS请求读锁成功。
- 如果当前线程已经持有当前写锁,直接执行CAS增加写锁持有次数。
锁降级
jdk官方实例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
// 1
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
// 2
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
// data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
// 3
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}
//use(data);
// 4
rwl.readLock().unlock();
}
}
标注代码分析
- 读锁。
- 先释放读锁,在获取写锁。
- 锁降级。释放写锁前,获取读锁。
- 已经获取读锁,可以进行业务操作,然后再释放读锁。