Amino是无锁并行框架,线程安装,该框架封装了无锁算法,提供了可用于线程安全的一些数据结构,同时还内置了一些多线程调度模式。使用Amino进行软件开发有以下的优势:
1.对死锁的问题免疫
2.确保系统并发的整体进度
3.降低高并发下无锁竞争带来的性能开销
4.可以轻松使用一些成熟的无锁结构,而无需执行研发
有一种多线程同步的机制cas(compare and swap),他是基于操作系统的cas指令进行判断。
原理步骤:
1、先取出临界资源的值
2、接着将此值作为期望值,和临界值的最新值对比,如果相同,说明没有其他线程修改,直接更新;如果不相同,则说明被其他线程修改过了,回到步骤1继续
jdk内部已经实现了部分数据结构的cas无锁算法。比如AtomicInteger、AtomicIntegerArray、AtomicLong、AtomicLongArray、AtomicDouble、AtomicDoubleArray、AtomicBoolean。
但是没有对list、set、tree、Grap的实现。
Amino就是这样一款基于cas(compare and swap)无锁算法的框架,高并发,高性能。
提供了
list(LockFreeList、LockFreeVector)、
set(LockFreeSet)、
tree(LockFreeBSTree)、
Grap(UndirectedGrap)
2、Amino如何引入
不提供maven依赖,所以需要把源码下载,然后自己编译,最后把jar放到私服上即可引用。 源码下载地址:Concurrent Building Block - Browse /cbbs at SourceForge.net 编译放到maven私服,然后在pom.xml文件中引入即可融入执行项目进行整合开发,如下图所示:
3、如何使用
直接new 对应的类即可实现
list(LockFreeList、LockFreeVector)
set(LockFreeSet)
tree(LockFreeBSTree)
Grap(UndirectedGrap)
============================================================================
一.List
Amino提供了一组List的实现方式,其中最为重要的两种是LockFreeList和LockFreeVector,他们都实现了java.util.List接口;LockFreeList使用链表的作为底层的数据结构,实现了线程安全的无锁List,而LockFreeVector使用连续的数据作为底层数据结构,实现了线程安全的无锁Vector,LockFreeList和LockFreeVector的关系,就如同LinkedList和ArrayList一样;
下面我们是用LockFreeVector,LockFreeList,Vector,和实现线程安全的LinkedList进行了在高并发环境的性能进行对比。每一个测试线程AccessListThread对每一种List分别作1000次添加和删除操作:
===========================================================================
package org.jd.amino.concurrent.data.chat;import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.amino.ds.lockfree.LockFreeSet;
import org.junit.Test;public class TestLockFreeSet {private static final int MAX_THREADS = 2000;private static final int TASK_COUNT = 4000;java.util.Random rand=new java.util.Random();Set set;public class AccessSetThread implements Runnable{protected String name;public AccessSetThread(){}public AccessSetThread(String name){this.name=name;}@Overridepublic void run() {try {for(int i=0;i<500;i++)handleSet(rand.nextInt(1000));Thread.sleep(rand.nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}}public class CounterPoolExecutor extends ThreadPoolExecutor{private AtomicInteger count =new AtomicInteger(0);public long startTime=0;public String funcname="";public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}protected void afterExecute(Runnable r, Throwable t) { int l=count.addAndGet(1);if(l==TASK_COUNT){System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));}}}public Object handleSet(int index){set.add(rand.nextInt(2000));if(set.size()>10000)set.clear();return null;}public void initSet(){set=Collections.synchronizedSet(new HashSet());}public void initFreeLockSet(){set=new LockFreeSet();}//@Testpublic void testSet() throws InterruptedException {initSet();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testSet";for(int i=0;i<TASK_COUNT;i++)exe.submit(new AccessSetThread());Thread.sleep(10000);}@Testpublic void testLockFreeSet() throws InterruptedException {initFreeLockSet();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testLockFreeSet";for(int i=0;i<TASK_COUNT;i++)exe.submit(new AccessSetThread());Thread.sleep(10000);}
}
由测试结果来看,在高并发的情况下,Amino提供的List性能要远超出JDK内置的基于锁的List性能要高出六七倍;
在高并发环境下,使用无锁的集合可以有效的提升系统的吞吐量。通过Amino框架,可以让开发人员轻松使用这种技术;
======================================================================
Amino CBB (Concurrent Building Blocks) 类库将提供优化后的并发线程组件,适用于JDK6.0 及其以后的版本。
Amino Java 类库将涉及下面四个方面的内容:
1) 数据结构
该组件将提供一套免锁的集合类。因为这些数据结构采用免锁的运算法则来生成,所
以,它们将拥有基本的免锁组件的特性,如可以避免不同类型的死锁,不同类型的线程初始
化顺序等。
2) 并行模式
Amino 将为应用程序提供一个或几个大家熟知的并行计算模式。采用这些并行模式可
以使开发者起到事半功倍的效果,这些模式包括 Master-Worker、Map-reduce、Divide and
conquer, Pipeline 等,线程调度程序可以与这些模式类协同工作,提供了开发效率。
3) 并行计算中的一般功能
Amino 将为应用程序提供并行计算中常用的方法,例如:
a. String、Sequence 和Array 的处理方面。如Sort、Search、Merge、Rank、Compare、
Reverse、 Shuffle、Rotate 和Median 等
4)原子和STM(软件事务内存模型)
--------------------------------
在Amino 类库中,主要算法将使用锁无关的(Lock-Free)的数据结构。
原语Compare-and-swap(CAS) 是实现锁无关数据结构的通用原语。CAS 可以原子
地比较一个内存位置的内容及一个期望值,如果两者相同,则用一个指定值取替这个内存位
罝里的内容,并且提供结果指示这个操作是否成功。
CAS 操作过程是:当处理器要更新一个内存位置的值的时候,它首
先将目前内存位置的值与它所知道的修改前的值进行对比(要知道在多处理的时候,你要更
新的内存位置上的值有可能被其他处理更新过,而你全然不知),如果内存位置目前的值与
期望的原值相同(说明没有被其他处理更新过),那么就将新的值写入内存位置;而如果不
同(说明有其他处理在我不知情的情况下改过这的值咯),那么就什么也不做,不写入新的
值(现在最新的做法是定义内存值的版本号,根据版本号的改变来判断内存值是否被修改,
一般情况下,比较内存值的做法已经满足要求了)。CAS 的价值所在就在于它是在硬件级别
实现的,速度那是相当的快。
————————————————下面提供多份测试代码——————————————
import java.util.Collection;
import java.util.List;
import java.util.Vector;import org.amino.pattern.internal.Doable;
import org.amino.pattern.internal.DynamicWorker;
import org.amino.pattern.internal.MasterWorker;
import org.amino.pattern.internal.MasterWorkerFactory;
import org.amino.pattern.internal.WorkQueue;
import org.junit.Test;public class TestMasterWorker {public class Pow3 implements Doable<Integer,Integer>{@Overridepublic Integer run(Integer input) {return input*input*input;}}public class Pow3Dyn implements DynamicWorker<Integer,Integer>{@Overridepublic Integer run(Integer w, WorkQueue<Integer> wq) {return w*w*w;}}@Testpublic void testStatic() {MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newStatic(new Pow3());List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();for(int i=0;i<100;i++){keyList.add(mw.submit(i));}mw.execute();int re=0;while(keyList.size()>0){ //不等待全部执行完成,就开始求和MasterWorker.ResultKey k=keyList.get(0);Integer i=mw.result(k);if(i!=null){re+=i;keyList.remove(0);}}System.out.println(re);}@Testpublic void testDynamic() {MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newDynamic(new Pow3Dyn());List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();for(int i=0;i<50;i++)keyList.add(mw.submit(i));mw.execute(); //在已经开始执行的情况下,继续添加任务for(int i=50;i<100;i++)keyList.add(mw.submit(i));int re=0;while(keyList.size()>0){ //不等待全部执行完成,就开始求和MasterWorker.ResultKey k=keyList.get(0);Integer i=mw.result(k);if(i!=null){re+=i;keyList.remove(0);}}System.out.println(re);}}
===========================测试Dictionary===================================
import java.util.HashMap;
import java.util.Set;
import java.util.TreeMap;import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;public class TestLockFreeDictionaryDemo {@Testpublic void test(){LockFreeDictionary<Integer ,Object> map=new LockFreeDictionary<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}@Testpublic void testTreeMap(){TreeMap<Integer ,Object> map=new TreeMap<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}//@Testpublic void testHashMap(){HashMap<Integer ,Object> map=new HashMap<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}
}
=================================测试Map====================================
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;public class TestLockFreeMap {private static final int MAX_THREADS = 20;private static final int TASK_COUNT = 40;java.util.Random rand=new java.util.Random();private static Object DUMMY=new Object();Map map;public class AccessMapThread implements Runnable{protected String name;public AccessMapThread(){}public AccessMapThread(String name){this.name=name;}@Overridepublic void run() {try {for(int i=0;i<50000;i++)handleMap(rand.nextInt(1000));Thread.sleep(rand.nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}}public class CounterPoolExecutor extends ThreadPoolExecutor{private AtomicInteger count =new AtomicInteger(0);public long startTime=0;public String funcname="";public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}protected void afterExecute(Runnable r, Throwable t) { int l=count.addAndGet(1);if(l==TASK_COUNT){System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));}}}public Object handleMap(int index){map.put(rand.nextInt(2000), DUMMY);return map.get(index);}public void initLockFreeMap(){map=new LockFreeDictionary();for(int i=0;i<1000;i++)map.put(i, DUMMY);}public void initTreeMap(){map=Collections.synchronizedMap(new TreeMap());for(int i=0;i<1000;i++)map.put(i, DUMMY);}@Testpublic void testLockFreeMap() throws InterruptedException {initLockFreeMap();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testLockFreeMap";Runnable r=new AccessMapThread();for(int i=0;i<TASK_COUNT;i++)exe.submit(r);Thread.sleep(10000);}//@Testpublic void testTreeMap() throws InterruptedException {initTreeMap();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testTreeMap";Runnable r=new AccessMapThread();for(int i=0;i<TASK_COUNT;i++)exe.submit(r);Thread.sleep(10000);}
}
======================================================================