首页 > 学院 > 开发设计 > 正文

ConcurrentHashMap 探索

2019-11-08 01:42:35
字体:
来源:转载
供稿:网友

为什么要使用 ConcurrentHashMap

因为 HashMap 是非线程安全的

在多线程情况下,HashMap 的 put 操作会形成死循环。 原因:Entry 链表形成环形结构。

下面具体说明形成环形结构的过程:

正常的 Rehash 过程

这里写图片描述

并发的 Rehash

rehash 关键代码:

void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while(null != e) { Entry<K,V> next = e.next; if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } }}

(1)线程二执行完成

这里写图片描述

注意:因为线程一的 e 指向了 key(3),而 next 指向了 key(7),其在线程二 rehash 后,指向了线程二重组后的链表。

(2)接下来线程一被调度回来执行

这里写图片描述

(3)线程一接着工作。把 key(7)摘下来,放到 newTable[i] 的第一个,然后把e和next往下移。

这里写图片描述

(4)环形链表出现 e.next = newTable[i] 导致 key(3).next 指向了 key(7)。注意:此时的key(7).next 已经指向了key(3), 环形链表就这样出现了。

这里写图片描述

因为 HashTable 效率低下

HashTable 对所有操作都做了同步操作,在线程竞争激烈的情况下因过多的阻塞导致效率低下。

ConcurrentHashMap 锁分段技术提高并发效率

ConcurrentHashMap 实现了线程安全,因此解决了 HashMap 的不足;同时它不像 HashTable 一样对所有的操作都加同步,而是使用锁分段技术,容器中有多把锁,每一把锁管理着一段数据,当一个线程占用锁访问其数据时,其他段的数据也能被其他线程访问到。极大的提升并发效率。

在 HashMap 的介绍中我们知道其结构如下:

这里写图片描述

而 ConcurrentHashMap 的结构如下:

这里写图片描述

HashMap 是一个数组(数组+链表)结构,数组的每个元素都是一个链表的表头;ConcurrentHashMap 是多个数组(数组+链表)结构,每个数组归一个 Segment 管理,因此 ConcurrentHashMap 也可以看做是一个 Segment 数组构成。

ConcurrentHashMap 初始化

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss;}先是根据 concurrentLevel 来 new 出 Segment,这里 Segment 的数量是不大于 concurrentLevel 的最大的 2 的指数(ssize),这样的好处是方便采用移位操作来进行 hash,加快 hash 的过程;接下来就是根据 intialCapacity 确定 Segment 的容量的大小,每一个 Segment 的容量大小也是 2 的指数,同样使为了加快 hash 的过程。

get 操作

public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null;}经过一次 hash 定位到 Segment 并得到 segment 的 HashEntry 数组。定位其在 HashEntry 数组中的哪个位置得到链表头;遍历链表根据 key 得到 value

get 方法并没有加锁,原因是它只读不写,并且将要使用的共享变量都定义成 volatile 类型,比如 HashEntry 的 value。

put 操作

首先定位到 Segment;然后在 Segment 里进行插入操作(加锁): 第一步判断是否需要对 Segment 里的 HashEntry 数组进行扩容;第二步定位添加元素的位置,然后将其放在HashEntry数组里。

Q:是否需要扩容

A:若插入元素前 Segment 里的 HashEntry 数组长度超过阈值,则对数组进行扩容。 值得一提的是,Segment 的扩容判断比HashMap更恰当,因为 HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

Q:如何扩容

A:在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。

size 操作

代码:

public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // PRevious sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size;}

Q:是不是直接把所有 Segment 的 count 相加就可以得到整个ConcurrentHashMap大小了呢? A:不是的。

虽然相加时可以获取每个Segment的count的最新值,但是可能累加前使用的count发生了变化,那么统计结果就不准了。所以,最安全的做法是在统计 size 的时候把所有 Segment 的 put、remove 和 clean 方法全部锁住,但是这种做法显然非常低效。巧妙的方式(如上述代码): 先尝试 2 次通过不锁住 Segment 的方式来统计,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计。那么如何判断在统计的时候容器是否发生了变化呢?使用 modCount 变量,所有的 update 操作都会将变量 modCount 进行加 1。

remove 操作

先进行定位得到 segment;然后在 segment 中进行 remove(需加锁) 定位到 segment 中的 HashEntry;遍历 HashEntry 链表找到并删除;
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表