在我之前的博客 Java:初识多线程、原理及实现 提及,多线程下对数据的非原子性操作会造成数据错误,为此 JDK 也提供了一些线程安全的容器。本篇主要介绍 List 类的安全容器:Vector 和 CopyOnWriteArrayList,以及 Map 类的 HashTable 和 ConcurrentHashMap。从他们之前的区别,后者对前者的改进,具体的实现原理这几个角度进行分析。
List 类的安全容器
1. Vector 与 SynchronizedList
1.1 Vector
Vector
类通过将所有的方法声明 synchronized 来保证数据的线程安全。但同步的确是整个 vector 实例,导致一个线程在访问实例时,其他线程都需要等待的尴尬情形,较大的同步开销完全不符合并发设计的初衷。此外,多线程下对 Verctor 的复合操作(如遍历+修改)仍会导致 并发修改异常。
public synchronized boolean add(E e);
public synchronized boolean isEmpty();
public synchronized E get(int index);
public synchronized void sort(Comparator<? super E> c);
1.2 SynchronizedList
SynchronizedList
类和 vector 类似,只不过是在方法内进行同步锁:
public E get(int index) {
synchronized (mutex) {return list.get(index);}}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}}
1.3 Vector 和 SynchronizedList 可能会出现的问题
虽然 Vector 和 SynchronizedList 的单个方法都是线程安全的,但是对其的复合操作仍然需要用户手动地同步。SynchronizedList 在使用迭代器遍历的时候也会有同样的问题,源码已经提醒我们要 手动加锁 了。
public ListIterator<E> listIterator() {
return list.listIterator(); // Must be manually synched by user
}
public ListIterator<E> listIterator(int index) {
return list.listIterator(index); // Must be manually synched by user
}
2. CopyOnWriteArrayList
总的来说,JUC 下支持并发的容器与老一代的线程安全类相比,总结起来就是加锁粒度的问题。
2.1 CopyOnWriteArrayList 实现原理
首先引用维基上对 Copy-on-Write 机制的解释:
Copy-on-Write 机制:如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正 复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。优点是如果调用者没有修改该资源,就不会有副本(private copy)被建立,因此多个调用者只是读取操作时可以共享同一份资源。
再看下源码文档的解释:
CopyOnWriteArrayList 是 ArrayList 线程安全的一种变体,底层的所有修改操作通过复制数组的方式来实现。这通常会过于昂贵,但在遍历操作远大于修改操作时更加高效;并且无需在并发遍历时进行同步。Iterator 通过引用在迭代器被创建时的“快照”形式的列表,保证列表在迭代周期不发生改变来排除干扰,保证不抛出并发修改异常。在迭代器上的修改操作将不被支持,它们会抛出 UnsupportedOperationException。
2.2 CopyOnWriteArrayList 的结构与方法
看源码可知 CopyOnWriteArrayList 底层是数据,加锁通过 ReentrantLock
来实现。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
相比 Vector / SynchronizedList 需要在遍历时手动加锁的情况,CopyOnWriteArrayList 在使用迭代器时不需要显式加锁,可以看下它的 mutation 方法如 add:
可以看到在进行 add 时就上锁,复制一个新数组,增加操作在新数组上完成,将 array 指向到新数组中,最后解锁。这种同步形式对于其他的修改操作 remove、clear、set 等也是类似的。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 复制新数组,并在长度上加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 将实例指向新数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
至此,CopyOnWriteArrayList 线程安全的策略就很明显了:
- 所有的非修改操作不加锁,修改操作加锁。
- 在修改时,先复制出一个新数组,修改的操作在新数组中完成,最后将新数组交由 array 变量指向。
2.3 为什么 CopyOnWriteArrayList 迭代时不需要显式加锁
通过观察 CopyOnWriteArrayList 的迭代器不难发现,它在迭代器内使用了 final 修饰的引用类型数组 snapshot,这就使所有的修改操作中将实例指向新数组的 setArray(newElements)
这个操作在迭代器的生命周期内无法执行,以此保证在迭代过程中不会抛出并发修改异常。
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
2.4 CopyOnWriteArrayList 的缺点
- 内存占用:如果 CopyOnWriteArrayList 经常要增删改里面的数据,经常要执行
add()、set()、remove()
的话,那是比较耗费内存的。- 因为这些修改操作都要 复制一个数组 出来。
- 数据一致性:CopyOnWrite 容器 只能保证数据的最终一致性,不能保证数据的实时一致性。
- 从上面的例子也可以看出来,比如线程 A 在迭代 CopyOnWriteArrayList 容器的数据。线程B在线程A迭代的间隙中将 CopyOnWriteArrayList 部分的数据修改了(已经调用
setArray()
了)。但是线程 A 迭代出来的是原有的数据。
- 从上面的例子也可以看出来,比如线程 A 在迭代 CopyOnWriteArrayList 容器的数据。线程B在线程A迭代的间隙中将 CopyOnWriteArrayList 部分的数据修改了(已经调用
Map 类的安全容器
1. HashTable
Hashtable
:将 get / put 所有相关操作都 synchronized 化,这相当于给整个哈希表加了一把 大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作 串行化,在竞争激烈的并发场景中性能就会非常差。
HashTable 与 Vector 一样,由于较大的同步开销和复合操作的并发异常 已被弃用,如今以 HashMap 和 ConcurrentHashMap 来代替其使用。
2. JDK 1.7 的 ConcurrentHashMap
ConcurrentHashMap
:使用继承了 ReentrantLock
的 segments 数组代替整个桶数据,segment 实例包含一个 HashEntry<K, V>[]
table,即 segment 就是一个小的 HashMap,并且自带了一把锁。每个 segment 守护着若干个 HashEntry 键值对链表,当对这些 HashEntry 链表数据进行修改时,必须首先获得对应的 Segment 实例的锁。每个 segment 只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。
final Segment<K,V>[] segments;
static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 并发级别
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
看下源码中的构造器可以得知:
- ConcurrentHashMap 根据初始容量 initialCapacity 和并发级别
concurrencyLevel
来决定需要创建的 segments 数组的数量,以及每个 segment 中键值对的桶数组大小。(这两个数都为 2 的次方数) - 新建 segments 数组及一个 segment 实例,并将实例放到数组的第 0 个位置。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) { // 根据并发级别决定segment数组大小
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 根据c算出每个segment里有几个桶数组
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
观察其 put 方法:
- put 时会先计算 key 的哈希值所在的 segments 数组下标
- 判断所要 put 的 segment 是否为空,通过
s = ensureSegment(j)
构造 segment 实例对象。在该方法内,会先拿到构造器中创建的 segments 数组中第 0 个segment 作为模版 prototype,拿到模版 segment 中的桶数组大小、装载因子信息来创建当前需要的 segment 实例。 - 调用 segment 中的 put 方法:
- 使用
tryLock()
尝试获取锁(segment实例),获取失败后会通过scanAndLockForPut(key, hash, value)
用循环的方式反复请求这把锁。 - 算得 segment 中哈希桶的数组下标,拿到桶内第一个键值对 first,接下来的 put 操作和在 HashMap 中的类似,不再赘述。
modCount
用来保证 fail-fast 机制。
- 使用
public V put(K key, V value) {
Segment<K,V> s;
if (value == null) // value值不允许为null
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask; // segments 数组下标
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
// segment 中的 put 方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); // 加锁
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash; // segment中哈希桶的数组下标
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock(); // 解锁
}
return oldValue;
}
3. JDK 1.8 的 ConcurrentHashMap
到了 JDK 1.8 的时候已经摒弃了分段锁,而是使用 synchronized 桶数组头节点 Node 和 CAS 来实现并发控制。( JDK 1.6 以后 对 synchronized 锁做了很多优化, 整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK 1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本)
3.1 ConcurrentHashMap 源码文档解读
源码的注释文档主要说了这么几点:
- 支持高并发的检索和更新,即使所有的操作都是线程安全,但检索时是不加锁的,也不支持锁定整个表。
- 检索 / get 操作不会阻塞,检索会反映最近一次完成更新时的结果(即在当前时间点的值)。虽然不会抛出并发修改异常,迭代器却是被设计为在单线程下使用的。
- 一些查询状态的方法如 size、isEmpty、containsValue 在多线程下只反映表的临时状态,结果可用于监控状态,而不能用于条件判定(高并发下统计数据如 size 等其实是没有意义的,因为它在下一时刻就被改变了)。
- 支持批量操作。
- ConcurrentHashMap 的 key 和 Value 都不能为 null。
3.2 ConcurrentHashMap 线程安全实现原理
相比 Java 1.7 的 ConcurrentHashMap 使用 “锁分段” 来保证数据的线程安全,Java 1.8 的ConcurrentHashMap 通过 同步指定哈希桶的第一个节点 来实现这个功能。
- 若头节点不存在则会调用
casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))
以线程安全的方式创建一个新的头节点。 volatile Node<K, V> next;
用 volatile 修饰节点来保证每次获取的都是最新设置的值。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 获取桶数组下标
if (casTabAt(tab, i, null, // 如果桶为空,执行cas方法来new一个新节点,同时保证并发数据安全
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 当前表正在转移数据,用当前线程帮忙转移
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 以第一个节点作为锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 遍历链表找key更新value
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 遍历到链表尾部,插入一个新节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树的put
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 判断是否树化
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}