Gitee上开源的数据结构与算法代码库:数据结构与算法Gitee代码库
阻塞队列
- 1. 概述
- 2. 代码实现
- a. 代码接口
- b. 单锁实现
- c. 双锁实现
1. 概述
之前的队列在很多场景下都不能很好地工作,例如
- 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
- 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
- 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
因此我们需要解决的问题有
- 用锁保证线程安全
- 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转
有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)
2. 代码实现
a. 代码接口
public interface BlockingQueue<E> { // 阻塞队列void offer(E e) throws InterruptedException;boolean offer(E e, long timeout) throws InterruptedException;E poll() throws InterruptedException;
}
b. 单锁实现
/*** 单锁实现* @param <E> 元素类型*/
@SuppressWarnings("all")
public class BlockingQueue1<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;private ReentrantLock lock = new ReentrantLock();private Condition headWaits = lock.newCondition();private Condition tailWaits = lock.newCondition();public BlockingQueue1(int capacity){array = (E[]) new Object[capacity];}private boolean isEmpty(){return size == 0;}private boolean isFull(){return size == array.length;}@Overridepublic void offer(E e) throws InterruptedException { // poll 等待队列非空lock.lockInterruptibly(); // 加锁try {while (isFull()){tailWaits.await(); // 线程阻塞}array[tail] = e;if (++tail == array.length){tail = 0;}size++;headWaits.signal(); // 唤醒等待非空的线程}finally {lock.unlock(); // 解锁}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly(); // 加锁try {long t = TimeUnit.MILLISECONDS.toNanos(timeout);while (isFull()){if (t <= 0){return false;}t = tailWaits.awaitNanos(t); // 最多等待多少纳秒 返回值代表剩余时间}array[tail] = e;if (++tail == array.length){tail = 0;}size++;headWaits.signal(); // 唤醒等待非空的线程}finally {lock.unlock(); // 解锁}return false;}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try{while (isEmpty()){headWaits.await();}E e = array[head];array[head] = null; // help GCif (++head == array.length){head = 0;}size--;tailWaits.signal();return e;}finally {lock.unlock();}}@Overridepublic String toString() {return Arrays.toString(array);}}
c. 双锁实现
单锁的缺点在于:
- 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
- 冲突的主要是生产者之间:多个 offer 线程修改 tail
- 冲突的还有消费者之间:多个 poll 线程修改 head
如果希望进一步提高性能,可以用两把锁
- 一把锁保护 tail
- 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock(); // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合ReentrantLock tailLock = new ReentrantLock(); // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合
/*** 双锁实现* @param <E> 元素类型*/
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private AtomicInteger size = new AtomicInteger();private ReentrantLock tailLock = new ReentrantLock();private Condition tailWaits = tailLock.newCondition();private ReentrantLock headLock = new ReentrantLock();private Condition headWaits = headLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}private boolean isEmpty() {return size.get() == 0;}private boolean isFull() {return size.get() == array.length;}@Overridepublic String toString() {return Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {int c; // 添加前元素个数tailLock.lockInterruptibly();try {// 1. 队列满则等待while (isFull()) {tailWaits.await(); // offer2}// 2. 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 3. 修改 size/*size = 6*/c = size.getAndIncrement();if (c + 1 < array.length) {tailWaits.signal();}/*1. 读取成员变量size的值 52. 自增 63. 结果写回成员变量size 6*/} finally {tailLock.unlock();}// 4. 如果从0变为非空,由offer这边唤醒等待非空的poll线程// 0->1 1->2 2->3if(c == 0) {headLock.lock(); // offer_1 offer_2 offer_3try {headWaits.signal();} finally {headLock.unlock();}}}@Overridepublic E poll() throws InterruptedException {E e;int c; // 取走前的元素个数headLock.lockInterruptibly();try {// 1. 队列空则等待while (isEmpty()) {headWaits.await(); // poll_4}// 2. 非空则出队e = array[head];array[head] = null; // help GCif (++head == array.length) {head = 0;}// 3. 修改 sizec = size.getAndDecrement();// 3->2 2->1 1->0// poll_1 poll_2 poll_3if (c > 1) {headWaits.signal();}/*1. 读取成员变量size的值 52. 自减 43. 结果写回成员变量size 4*/} finally {headLock.unlock();}// 4. 队列从满->不满时 由poll唤醒等待不满的 offer 线程if(c == array.length) {tailLock.lock();try {tailWaits.signal(); // ctrl+alt+t} finally {tailLock.unlock();}}return e;}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {return false;}public static void main(String[] args) throws InterruptedException {BlockingQueue2<String> queue = new BlockingQueue2<>(3);queue.offer("元素1");queue.offer("元素2");new Thread(()->{try {queue.offer("元素3");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "offer").start();new Thread(()->{try {queue.poll();} catch (InterruptedException e) {throw new RuntimeException(e);}}, "poll").start();}
}