1. 主源码逻辑
java"> final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 1.计算key对应的hashint hash = spread(key.hashCode());int binCount = 0;// 2. 进行自旋 for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 3. 首次进来的时候table为空if (tab == null || (n = tab.length) == 0)// 4. 进行初始化 此时没有任何锁机制 因此该初始化采用CAS机制,保证只有一个线程能 够初始化完成,其它线程就可以直接使用该初始化的tabletab = initTable();// 5.(n-1)&hash -> 0-15 计算数组下标位置else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 6.如果当前位置为空,则采用cas操作保证原子性直接进行赋值(tabAt casTabAt 都是设置的内存偏移量来取值和设置值的)if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// 7. 当前节点正在进行table扩容和数据转移操作 则当前线程协助扩容和数据转移else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);// 8. 同步锁代码块 进行值的覆盖或添加else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 9.进行值的覆盖if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 10.进行值的添加 挂在链表下面Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 11. 若当前节点属于红黑树 则进行红黑树的添加else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 12. 判断是否要进行扩容还是转红黑树if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 13.统计元素个数addCount(1L, binCount);return null;}
2. 首次put操作
假设有3个线程t1,t2,t3此时同时put,t1和t2的key完全相同,与t3的是hash相同,key不同,初次进来时,进行表的初始化,table的初始化位一个CAS操作,因此只有一个线程会进行初始化,另外两个线程此时会不断的自旋等待。
精简主逻辑代码
java"> if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;// 首次自旋 初始化一个默认为16长度的tablefor (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();}
2.1 初始化table(initTable)
java"> private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;// 4.1只要table没有初始化,就不断循环直到初始化完成while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin//4.2 通过CAS来占用一个锁的标记 sizeCTL = -1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 4.3 说明当前线程抢到了锁try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2); // 4.4 保留扩容的阈值 12}} finally {sizeCtl = sc;}break;}}return tab;}
分析: sizeCtl作为一个ConcurrentHashMap的控制标签,贯穿全局 未初始化时默认为0 初始化中 U.compareAndSwapInt(this, SIZECTL, sc, -1) 采用CAS操作保证其原子性,将sizeCtl更新为-1,表示当前table正在初始化中,初始化的table默认长度为16,其扩容阈值sizeCtl = 12
2.2 casTabAt赋值操作
精简主逻辑源码:
java"> final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 线程竞争进行赋值if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}}addCount(1L, binCount);return null;}
此时线程1 线程2 线程3 会同时进入到赋值操作,赋值为CAS操作,假设线程1抢占资源成功,则线程1中的key,value赋值到该table索引处,退出循环,执行addCount 统计元素个数; 线程2,线程3 在此处CAS失败,则会再次自旋,等线程1赋值成功后,走入下面的if判断中。
主逻辑源码精简:
java"> final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {// a.代表该节点下是链表数据结构if (fh >= 0) { binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// b.进行值的覆盖if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// c.挂在链表后面if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}}}}}addCount(1L, binCount);return null;}
此时线程2和线程3 进入到a中,线程2的赋值进入到b的代码,线程3的赋值进入到c的代码,随后也进入到addCount中进行元素个数的统计
3.第N次put操作,链表下长度达到阈值TREEIFY_THRESHOLD
假设此时线程4,和线程5又同时进行put操作,且key值的hash计算出来的索引处链表已经达到了长度8,则此时,程4和线程5的值会先新增到原来的链表上,然后进入到代码treeifyBin(tab, i)中,判断是将table进行扩容还是转为红黑树
精简主逻辑代码
java"> if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 对table进行扩容或者将链表转为红黑树treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}
java"> private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// 判断当前table的长度是否达到了阈值64if ((n = tab.length) < MIN_TREEIFY_CAPACITY)// 如果没有,则进行扩容和数据转移操作tryPresize(n << 1);}}
此时table的长度为16,没有达到阈值64,则执行tryPresize,进行table的扩容和数据转移
3.1 table扩容和数据转移 tryPresize
java"> private final void tryPresize(int size) {//1. 计算出一个最大容量,判断table是否已经达到了最大容量int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;//2. 此时sizeCtl = 12(前面的阈值)while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;//3. 如果table没有初始化,则进行初始化if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}// 4.判断是否已经达到了最大阈值,如果已经达到,则不再允许扩容else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 5.计算出一个扩容戳,作为扩容标记,保证当前扩容的唯一性 此时n为16,则生成的扩容戳则为 1000 0000 0001 1001int rs = resizeStamp(n);// 6.判断当前是否已经有线程在进行扩容和数据转移操作了// 因为sc = sizeCtl,而sizeCtl如果有线程在进行transfer操作,则会变为负数if (sc < 0) {Node<K,V>[] nt;// 假设当前已经有一个线程在进行操作了,则此时 // 第一个条件: sc = 1000 0000 0001 1001 0000 0000 0000 0010 // rs = 1000 0000 0001 1001 sc右移 16位时与rs相等,故为false// 第二、三个条件也位为false 当有线程在操作时nextTable不为空, // transferIndex也不为空if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 当前transfer操作未结束,则加入协助,SIZECTL -> 1000 0000 0001 1001 // 0000 0000 0000 0011 在低16位上加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 首次执行该逻辑 将SIZECTL 12-> 1000 0000 0001 1001 0000 0000 0000 0010 // 其中高16位代表当前操作的唯一性,低16位表示当前正在参与transfer操作的线程数else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}}
分析:
3.2 多线程协助数据转移transfer
主要做两件事: 1.table的扩容;2.把旧数据从旧table转移到新table中
transfer代码拆解分析:
3.2.1 table扩容
新table的扩容,由原来的16扩容到32,并用 nextTable记录新Table,transferIndex 记录旧table的长度
java"> private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// 1.旧table长度nint n = tab.length, stride;// 2.计算线程进行数据操作的区间,默认为16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 3. 扩容后table的初始化 16 -> 32if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// 4. 记录新tablenextTable = nextTab;// 5. 记录旧table的长度,也可以说是线程转移的索引值transferIndex = n;}}
3.2.2 数据转移
java"> private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// 1. 记录新table的长度 32int nextn = nextTab.length;// 2. 一个占位符,表示当前节点正在迁移中, 该节点的hash值为MOVED -1ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTab// 3.循环迁移 迁移的区间为[bound,i]for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 4. 计算出迁移的区间 [bound,i]while (advance) {int nextIndex, nextBound;// 4. 表示迁移完成 退出循环if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 5.CAS操作, 计算迁移区间 此时stride = 16,nextIndex = 16,所以最后计算出来的 // 期间为[0,15]else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// 6. 判断数据是否迁移完成if (i < 0 || i >= n || i + n >= nextn) {int sc;// 7. 数据迁移完成,赋值新tableif (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}// 8. cas操作 更新 SIZECTL 减去一个线程 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 9.当所有线程都完成后,进行returnif ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}// 10.进行转移操作,当前索引处为null值时,标记为MOVEDelse if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 11. 如果当前节点的hash为MOVED,表示当前节点已经迁移了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 12. 进入同步代码块synchronized (f) {if (tabAt(tab, i) == f) {// 13.记录链表的高低位,进行高低位迁移Node<K,V> ln, hn;if (fh >= 0) {// 14. 记录当前数据是否需要迁移 若为1则表示需要迁移,若为0则不需要 int runBit = fh & n;Node<K,V> lastRun = f;// 15. 循环遍历整个链表,进行hash校验for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 16.记录下低位 lnif (runBit == 0) {ln = lastRun;hn = null;}// 17. 记录高位 hnelse {hn = lastRun;ln = null;}// 18. 计算出哪些需要迁移hn 哪些不需要迁移ln for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 19.将不需要迁移的仍挂在旧table的该处索引下setTabAt(nextTab, i, ln);//20. 需要迁移的放在新的table中,其间隔为16setTabAt(nextTab, i + n, hn);//21. 更新此处的标记 为MOVEDsetTabAt(tab, i, fwd);advance = true;}}}}}}
分析: 上述代码中第5处的CAS操作,若原数组有64个,则第一个线程进来的时候进行该CAS操作,获得的线程区间就是[47,63],第二个线程再次进来的时候获的数据转移区间就是[31,47]依次类推
确定好数据转移区间后,一种是链表数据的迁移,另一种是红黑树数据的迁移,红黑树数据的迁移在此不进行展开
链表的迁移主要采用的是高低链位迁移法.
具体是怎么做的呢? 为什么fh & n为1就是高位需要迁移了,为0就不需要迁移了。 以4和20举例,原数组长度16计算出来的索引为 4&15=4 20 &15= 4 而15变为二进制是为00001111,当数组长度变为32时, 计算的索引为 4& 31 = 4 20&31 =20 31为00011111,而此时fh&n n 为16,高位为1,4& 16为0则为低位链, 20& 16就为高位链,于是就区分开了低位链和高位链。
4.元素个数的统计
java"> private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 1. 个数的统计if ((as = counterCells) != null ||// 1.1 第一次竞争 竞争修改baseCount!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;// 1.2 baseCount竞争失败,a.CountCells未初始化时,进入fullAddCount进行初始化// b. CountCells[]已经初始化,且索引处没有值时,进入fullAddCount进行添加// c. CountCells处有值时,竞争修改值,竞争失败,进入fullAddCount中if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;// 1.3 baseCount竞争失败,就会统计元素个数s = sumCount();}// 2.多线程并发扩容table,竞争非常激烈的时候,s大于了阈值if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;if (sc < 0) {if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||(nt = nextTable) == null || transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc, rs + 2))transfer(tab, null);s = sumCount();}}}
分析: 1.该统计元素个数,采用的是baseCount计数和CountCells列表计数,保证在并发场景下计数的效率提升。但是该sumCount 没有加锁,不能保证实时一致性,只能保证最终一致性,因为baseCount和CountCells都是CAS操作。
当竞争非常激烈的时候
第一次竞争,竞争baseCount U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)竞争成功,直接在baseCount上加,baseCount竞争失败,则进入第二次竞争
第二次竞争,此时countCells未初始化,进入fullAddCount中,初始化一个长度为2的数组,并把元素个数放入数组中
java"> private final void fullAddCount(long x, boolean wasUncontended) {int h;// 1. 获得线程安全随机数if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonempty// 2.自旋,保证统计个数的成功for (;;) {CounterCell[] as; CounterCell a; int n; long v;// 3.初始化CountCells CAS保证初始化成功else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { // 初始化为一个长度为2的数组,并把value 元素个数 放入数组中if (counterCells == as) {CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}}}
第三次竞争:竞争baseCount失败,并且此时CountCells数组索引处没值的话,则fullAddCount进行添加:
java"> if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic create//1. CAS 竞争添加 假如一个线程添加成功,另一个线程竞争失败if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash// 2.若竞争空值失败 则直接在有值的地方进行添加,若这里仍有其它线程竞争,再次竞争失败else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break; else if (counterCells != as || n >= NCPU)collide = false; // At max size or staleelse if (!collide)collide = true;// 3. 索引处有值竞争失败,则继续竞争进行CAS操作进行 扩容countCellselse if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {if (counterCells == as) {// Expand table unless stale// 4.扩容为原来的两倍CounterCell[] rs = new CounterCell[n << 1];// 5.遍历原数组,直接放到新数组里面for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);
// 6.如果上面都竞争失败了,说明CountCells数组竞争已经相当激烈了,该线程回去竞争baseCount去
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}
第四次竞争: 在索引处有值的时候。如果竞争失败,则进行CountCells数组的扩容,扩容为CAS操作,如果此时也竞争失败,表明CountCells的竞争已经相当激烈了,再回去竞争baseCount去。
元素个数统计整体流程图如下:
5. ConcurrentHashMap总结
1. 存在线程竞争的table初始化、赋值以及扩容、数据转移处采用细粒度的CAS操作提升并发效率,整体主要是CAS机制和同步代码块结合使用
2. 多线程并发协助扩容和数据转移,在数据转移处,采用多线程并发协助转移各个不同的分段区间,来提升table扩容时数据迁移的效率
3.数据迁移时的链表采用高低位快速迁移的方法提升效率
4.元素个数的统计为了应对高并发场景下,单个变量的统计会进行阻塞,则采用baseCount与CountCells数组列表相结合的方式,使得高并发下的统计的元素个数有多个存储的地方,提升效率。