同步容器类
同步容器类的问题
当通过复合操作并发修改容器时,可能出现线程不安全的问题
class VectorTest {public static Object getLast(Vector list) {int lastIndex = list.size() - 1;return list.get(lastIndex);}public static Object removeLast(Vector list) {int lastIndex = list.size() - 1;return list.remove(lastIndex);}
}
A/B两个线程运行情况可能如下
创建新操作时,应该和容器的其他操作一样都是原子操作,使用容器自身的锁保护方法
class VectorTest {public static Object getLast(Vector list) {synchronized (list) {int lastIndex = list.size() - 1;return list.get(lastIndex);}}public static Object removeLast(Vector list) {synchronized (list){int lastIndex = list.size() - 1;return list.remove(lastIndex);}}
}
同理,在遍历Vector时,size可能被其他线程修改,同样需要加锁
public static void print(Vector list) {synchronized (list) {for (int i = 0; i < list.size(); i++) {System.out.println(list.get(i));}}
}
加锁会影响性能,可以通过克隆容器(克隆时需要加锁),并在副本上进行迭代
隐藏迭代器
如下对set添加删除都加锁,但在打印的时候会调用其toString()方法,内部会迭代容器打印元素,故可能抛出并发修改异常,同理还有hashCode()/equals()等方法
class HiddenIterator {@GuardedBy("this")private final Set<Integer> set = new HashSet<>();public synchronized void add(Integer i) {set.add(i);}public synchronized void remove(Integer i) {set.remove(i);}public void addTenThings() {Random r = new Random();for (int i = 0; i < 10; i++) {add(r.nextInt());}System.out.println("addTenThings:" + set);}}
这里应该用synchronizedSet封装HashSet确保实现同步策略
并发容器
ConcurrentHashMap
ConcurrentHashMap并非将每个方法都在同一个锁上同步,而是使用分段锁,在并发环境下有更高的吞吐量,单线程环境中只损失较小的性能
- 任意数量的读取线程可以并发地访问Map
- 执行读取操作的线程和执行写入操作的线程可以并发地访问Map
- 一定数量的写入线程可以并发地修改Map
只有当应用需要加锁Map就行独占访问时,才使用Hashtable和synchronizedMap,否则应该使用ConcurrentHashMap
CopyOnWriteArrayList
CopyOnWriteArrayList用于替代同步List,并且在迭代期间不需要对容器进行加锁或复制
- 每次修改时都会创建并重新发布一个新的容器副本实现可变性
- 迭代器返回当前底层数组的快照,不必考虑修改操作所带来的影响
仅当迭代操作远远多于修改操作时,才使用CopyOnWriteArrayList,一般用于存储监听器
阻塞队列和生产者-消费者模式
阻塞队列的put/take方法将阻塞直到队列有空间/元素,若队列是无界的,则put永远不会阻塞
在基于阻塞队列构建的生产者-消费者设计中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据
- LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似
- PriorityBlockingQueue是一个按优先级排序的队列
- SynchronousQueue没有存储功能,而是维护一组线程,这些线程在等待着把元素加入或移出队列
双端队列和工作密取
如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作,具体实现包括ArrayDeque和LinkedBlockingDeque
阻塞方法和中断方法
阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待I/O操作完成,等待某个锁变成可用
Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停止下来
当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应
- 传递InterruptedException
- 调用当前线程上的interrupt方法恢复中断状态
同步工具类
闭锁
闭锁可以延迟线程的进度直到其到达终止状态,用来确保某些活动直到其他活动都完成后继续执行
CountDownLatch
CountDownLatch可以使一个或多个线程等待一组事件发生,如下测试n个线程并发执行某个任务时所需要的时间,需要所有线程创建后一起启动,所有线程执行完之后再计算时间
class TestHarness {public long timeTasks(int nThreads, final Runnable task)throws InterruptedException {final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThreads);for (int i = 0; i < nThreads; i++) {Thread t = new Thread() {public void run() {try {startGate.await();try {task.run();} finally {endGate.countDown();}} catch (InterruptedException ignored) {}}};t.start();}long start = System.nanoTime();startGate.countDown();endGate.await();long end = System.nanoTime();return end - start;}
}
FutureTask
FutureTask的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,其有3个状态
- 等待运行
- 正在运行
- 运行完成
通过get获取任务的状态,若已完成则返回结果,否则阻塞直到任务完成或抛出异常
Semaphore
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量
信号量中管理一组许可,其数量可通过构造函数指定,再执行操作时先通过acquire()获得许可(若没有则阻塞),使用完后通过release()释放许可
信号量可用于实现资源池,如线程池,此外还可以将任何一种容器变成有界阻塞容器
class BoundedHashSet<T> {private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound) {set = Collections.synchronizedSet(new HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException {sem.acquire();boolean wasAdded = false;try {wasAdded = set.add(o);return wasAdded;} finally {if (!wasAdded)sem.release();}}public boolean remove(Object o) {boolean wasRemoved = set.remove(o);if (wasRemoved)sem.release();return wasRemoved;}}
Barrier
栅栏类似于闭锁,能阻塞一组线程直到某个事件发生。区别在于所有线程必须同时到达栅栏位置才能继续执行
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,当线程到达栅栏位置时调用await(),方法将阻塞,直到所有线程都达到栅栏位置。如果await()超时或中断,所有阻塞的await()调用都将终止并抛出异常
Exchanger是一种两方栅栏,各方在栅栏位置上交换数据