Amino框架无锁算法实现并发线程安装组件(一)

news/2025/2/19 17:34:20/

 Amino是无锁并行框架,线程安装,该框架封装了无锁算法,提供了可用于线程安全的一些数据结构,同时还内置了一些多线程调度模式。使用Amino进行软件开发有以下的优势:

1.对死锁的问题免疫

2.确保系统并发的整体进度

3.降低高并发下无锁竞争带来的性能开销

4.可以轻松使用一些成熟的无锁结构,而无需执行研发

有一种多线程同步的机制cas(compare and swap),他是基于操作系统的cas指令进行判断。

原理步骤:

1、先取出临界资源的值

2、接着将此值作为期望值,和临界值的最新值对比,如果相同,说明没有其他线程修改,直接更新;如果不相同,则说明被其他线程修改过了,回到步骤1继续

jdk内部已经实现了部分数据结构的cas无锁算法。比如AtomicInteger、AtomicIntegerArray、AtomicLong、AtomicLongArray、AtomicDouble、AtomicDoubleArray、AtomicBoolean。

但是没有对list、set、tree、Grap的实现。

Amino就是这样一款基于cas(compare and swap)无锁算法的框架,高并发,高性能。

提供了

list(LockFreeList、LockFreeVector)、

set(LockFreeSet)、

tree(LockFreeBSTree)、

Grap(UndirectedGrap)

2、Amino如何引入

不提供maven依赖,所以需要把源码下载,然后自己编译,最后把jar放到私服上即可引用。 源码下载地址:Concurrent Building Block - Browse /cbbs at SourceForge.net 编译放到maven私服,然后在pom.xml文件中引入即可融入执行项目进行整合开发,如下图所示:

3、如何使用

直接new 对应的类即可实现

list(LockFreeList、LockFreeVector)

set(LockFreeSet)

tree(LockFreeBSTree)

Grap(UndirectedGrap)

============================================================================

一.List

Amino提供了一组List的实现方式,其中最为重要的两种是LockFreeList和LockFreeVector,他们都实现了java.util.List接口;LockFreeList使用链表的作为底层的数据结构,实现了线程安全的无锁List,而LockFreeVector使用连续的数据作为底层数据结构,实现了线程安全的无锁Vector,LockFreeList和LockFreeVector的关系,就如同LinkedList和ArrayList一样;

下面我们是用LockFreeVector,LockFreeList,Vector,和实现线程安全的LinkedList进行了在高并发环境的性能进行对比。每一个测试线程AccessListThread对每一种List分别作1000次添加和删除操作:

===========================================================================

package org.jd.amino.concurrent.data.chat;import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.amino.ds.lockfree.LockFreeSet;
import org.junit.Test;public class TestLockFreeSet {private static final int MAX_THREADS = 2000;private static final int TASK_COUNT = 4000;java.util.Random rand=new java.util.Random();Set set;public class AccessSetThread implements Runnable{protected String name;public AccessSetThread(){}public AccessSetThread(String name){this.name=name;}@Overridepublic void run() {try {for(int i=0;i<500;i++)handleSet(rand.nextInt(1000));Thread.sleep(rand.nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}}public class CounterPoolExecutor extends ThreadPoolExecutor{private AtomicInteger count =new AtomicInteger(0);public long startTime=0;public String funcname="";public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}protected void afterExecute(Runnable r, Throwable t) { int l=count.addAndGet(1);if(l==TASK_COUNT){System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));}}}public Object handleSet(int index){set.add(rand.nextInt(2000));if(set.size()>10000)set.clear();return null;}public void initSet(){set=Collections.synchronizedSet(new HashSet());}public void initFreeLockSet(){set=new LockFreeSet();}//@Testpublic void testSet() throws InterruptedException {initSet();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testSet";for(int i=0;i<TASK_COUNT;i++)exe.submit(new AccessSetThread());Thread.sleep(10000);}@Testpublic void testLockFreeSet() throws InterruptedException {initFreeLockSet();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testLockFreeSet";for(int i=0;i<TASK_COUNT;i++)exe.submit(new AccessSetThread());Thread.sleep(10000);}
}

由测试结果来看,在高并发的情况下,Amino提供的List性能要远超出JDK内置的基于锁的List性能要高出六七倍;

在高并发环境下,使用无锁的集合可以有效的提升系统的吞吐量。通过Amino框架,可以让开发人员轻松使用这种技术;

======================================================================

Amino CBB (Concurrent Building Blocks) 类库将提供优化后的并发线程组件,适用于JDK6.0 及其以后的版本。

Amino Java 类库将涉及下面四个方面的内容:

1) 数据结构 
该组件将提供一套免锁的集合类。因为这些数据结构采用免锁的运算法则来生成,所
以,它们将拥有基本的免锁组件的特性,如可以避免不同类型的死锁,不同类型的线程初始
化顺序等。 
2) 并行模式 
Amino 将为应用程序提供一个或几个大家熟知的并行计算模式。采用这些并行模式可
以使开发者起到事半功倍的效果,这些模式包括 Master-Worker、Map-reduce、Divide and 
conquer, Pipeline 等,线程调度程序可以与这些模式类协同工作,提供了开发效率。 
3) 并行计算中的一般功能 
Amino 将为应用程序提供并行计算中常用的方法,例如: 
a. String、Sequence  和Array  的处理方面。如Sort、Search、Merge、Rank、Compare、
Reverse、 Shuffle、Rotate 和Median 等 
4)原子和STM(软件事务内存模型) 

--------------------------------
在Amino 类库中,主要算法将使用锁无关的(Lock-Free)的数据结构。 

原语Compare-and-swap(CAS)  是实现锁无关数据结构的通用原语。CAS  可以原子
地比较一个内存位置的内容及一个期望值,如果两者相同,则用一个指定值取替这个内存位
罝里的内容,并且提供结果指示这个操作是否成功。

CAS 操作过程是:当处理器要更新一个内存位置的值的时候,它首
先将目前内存位置的值与它所知道的修改前的值进行对比(要知道在多处理的时候,你要更
新的内存位置上的值有可能被其他处理更新过,而你全然不知),如果内存位置目前的值与
期望的原值相同(说明没有被其他处理更新过),那么就将新的值写入内存位置;而如果不
同(说明有其他处理在我不知情的情况下改过这的值咯),那么就什么也不做,不写入新的
值(现在最新的做法是定义内存值的版本号,根据版本号的改变来判断内存值是否被修改,
一般情况下,比较内存值的做法已经满足要求了)。CAS 的价值所在就在于它是在硬件级别
实现的,速度那是相当的快。
————————————————下面提供多份测试代码——————————————

import java.util.Collection;
import java.util.List;
import java.util.Vector;import org.amino.pattern.internal.Doable;
import org.amino.pattern.internal.DynamicWorker;
import org.amino.pattern.internal.MasterWorker;
import org.amino.pattern.internal.MasterWorkerFactory;
import org.amino.pattern.internal.WorkQueue;
import org.junit.Test;public class TestMasterWorker {public class Pow3 implements Doable<Integer,Integer>{@Overridepublic Integer run(Integer input) {return input*input*input;}}public class Pow3Dyn implements DynamicWorker<Integer,Integer>{@Overridepublic Integer run(Integer w, WorkQueue<Integer> wq) {return w*w*w;}}@Testpublic void testStatic() {MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newStatic(new Pow3());List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();for(int i=0;i<100;i++){keyList.add(mw.submit(i));}mw.execute();int re=0;while(keyList.size()>0){					//不等待全部执行完成,就开始求和MasterWorker.ResultKey k=keyList.get(0);Integer i=mw.result(k);if(i!=null){re+=i;keyList.remove(0);}}System.out.println(re);}@Testpublic void testDynamic() {MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newDynamic(new Pow3Dyn());List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();for(int i=0;i<50;i++)keyList.add(mw.submit(i));mw.execute();								//在已经开始执行的情况下,继续添加任务for(int i=50;i<100;i++)keyList.add(mw.submit(i));int re=0;while(keyList.size()>0){					//不等待全部执行完成,就开始求和MasterWorker.ResultKey k=keyList.get(0);Integer i=mw.result(k);if(i!=null){re+=i;keyList.remove(0);}}System.out.println(re);}}

===========================测试Dictionary===================================


import java.util.HashMap;
import java.util.Set;
import java.util.TreeMap;import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;public class TestLockFreeDictionaryDemo {@Testpublic void test(){LockFreeDictionary<Integer ,Object> map=new LockFreeDictionary<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}@Testpublic void testTreeMap(){TreeMap<Integer ,Object> map=new TreeMap<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}//@Testpublic void testHashMap(){HashMap<Integer ,Object> map=new HashMap<Integer ,Object>();for(int i=0;i<100;i++)map.put(i, i);Set<Integer> keys=map.keySet();for(Integer i:keys)System.out.println(i);}
}

=================================测试Map====================================

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;public class TestLockFreeMap {private static final int MAX_THREADS = 20;private static final int TASK_COUNT = 40;java.util.Random rand=new java.util.Random();private static Object DUMMY=new Object();Map map;public class AccessMapThread implements Runnable{protected String name;public AccessMapThread(){}public AccessMapThread(String name){this.name=name;}@Overridepublic void run() {try {for(int i=0;i<50000;i++)handleMap(rand.nextInt(1000));Thread.sleep(rand.nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}}}public class CounterPoolExecutor extends ThreadPoolExecutor{private AtomicInteger count =new AtomicInteger(0);public long startTime=0;public String funcname="";public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}protected void afterExecute(Runnable r, Throwable t) { int l=count.addAndGet(1);if(l==TASK_COUNT){System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));}}}public Object handleMap(int index){map.put(rand.nextInt(2000), DUMMY);return map.get(index);}public void initLockFreeMap(){map=new LockFreeDictionary();for(int i=0;i<1000;i++)map.put(i, DUMMY);}public void initTreeMap(){map=Collections.synchronizedMap(new TreeMap());for(int i=0;i<1000;i++)map.put(i, DUMMY);}@Testpublic void testLockFreeMap() throws InterruptedException {initLockFreeMap();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testLockFreeMap";Runnable r=new AccessMapThread();for(int i=0;i<TASK_COUNT;i++)exe.submit(r);Thread.sleep(10000);}//@Testpublic void testTreeMap() throws InterruptedException {initTreeMap();CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());long starttime=System.currentTimeMillis();exe.startTime=starttime;exe.funcname="testTreeMap";Runnable r=new AccessMapThread();for(int i=0;i<TASK_COUNT;i++)exe.submit(r);Thread.sleep(10000);}
}

======================================================================


 


http://www.ppmy.cn/news/247338.html

相关文章

Nginx优化、Nginx+Tomcat实现负载均衡、动静分离集群部署

Nginx优化、NginxTomcat实现负载均衡、动静分离集群部署 一、Tomcat 优化二、Tomcat多实例部署1、安装好jdk2、安装tomcat3、配置tomcat环境变量4、修改tomcat2中的server.xml文件&#xff0c;要求各tomcat实例配置不能有重复的端口号5、修改各tomcat实例中的startup.sh和shutd…

基于lamp搭建Discuz论坛

Discuz!是腾讯&#xff08;Tencent&#xff09;旗下 Comsenz 公司推出的以社区为基础的专业建站平台&#xff0c;帮助网站实现一站式服务。让论坛&#xff08;BBS&#xff09;、个人空间&#xff08;SNS&#xff09;、门户&#xff08;Portal&#xff09;、群组&#xff08;Gro…

论坛php board,Crossday Discuz! Board 论坛系统Discuz!

Crossday Discuz! Board 论坛系统(简称 Discuz! 论坛&#xff0c;中国国家版权局著作权登记号 2006SR11895)是一个采用 PHP 和 MySQL 等其他多种数据库构建的高效论坛解决方案。作为商业软件产品&#xff0c; Discuz! 在代码质量&#xff0c;运行效率&#xff0c;负载能力&…

Discuz论坛附件下载权限绕过漏洞

近日&#xff0c;有网友在乌云上发布了一则Discuz论坛附件下载权限绕过漏洞&#xff0c;能够任意下载带有权限的附件并且无需扣除自身积分。目前Discuz正在处理中&#xff0c;但暂未放出漏洞补丁&#xff0c;有需要的朋友不妨趁漏洞修补之前到各论坛大肆搜刮一番。 漏洞重现步…

php绕过授权下载,Discuz附件下载权限绕过方法

越权下载含有“阅读权限”的插件、下载插件免扣币 重现步骤&#xff1a; 1、使用管理员账户&#xff0c;上传一个有高阅读权限的附件 2、使用低权限的用户账户&#xff0c;下载附件&#xff0c;这个时候&#xff0c;Discuz会提示无权下载 此时&#xff0c;浏览器中的附件地址形…

Discuz论坛超漂亮手机模板

介绍&#xff1a; 一、首页&#xff1a;门户首页波浪幻灯片显示&#xff08;可选开关&#xff09; 首页添加了“最新”“精华”“热帖”“回复”四种显示浏览方式&#xff1b; 美化帖子列表显示&#xff0c;添加圆弧边样式显示&#xff1b;等级样式&#xff0c;不同的等级有不…

免费discuz 白色简洁论坛类源码下载

网站模板库&#xff1a; 我们提供的模板下载大部分均为网盘下载方式&#xff0c;请打开相应网盘链接输入提取码即可下载&#xff01; 下载地址&#xff1a; http://www.bytepan.com/KHRv0PvXBmo 模板描述&#xff1a; 该模板网站很容易吸引访客点击&#xff0c;提升ip流量和…

discuz论坛安装流程

discuz论坛安装流程 作者&#xff1a;蓝眼泪 实验环境&#xff1a;xshell7&#xff0c;xftp7&#xff0c;centos7.9&#xff0c;win7. 本人用LAMP方式安装discuz论坛&#xff0c;并试验成功。原创不易&#xff0c;且行且珍惜。免费分享给大家。一起交流&#xff0c;共同进步…