数据结构与算法-阻塞队列

news/2024/11/25 4:34:07/

Gitee上开源的数据结构与算法代码库:数据结构与算法Gitee代码库

阻塞队列

  • 1. 概述
  • 2. 代码实现
    • a. 代码接口
    • b. 单锁实现
    • c. 双锁实现

1. 概述

之前的队列在很多场景下都不能很好地工作,例如

  1. 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
  2. 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
  3. 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试

因此我们需要解决的问题有

  1. 用锁保证线程安全
  2. 用条件变量让等待非空线程等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转

有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)

2. 代码实现

a. 代码接口

public interface BlockingQueue<E> { // 阻塞队列void offer(E e) throws InterruptedException;boolean offer(E e, long timeout) throws InterruptedException;E poll() throws InterruptedException;
}

b. 单锁实现

/*** 单锁实现* @param <E> 元素类型*/
@SuppressWarnings("all")
public class BlockingQueue1<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;private ReentrantLock lock = new ReentrantLock();private Condition headWaits = lock.newCondition();private Condition tailWaits = lock.newCondition();public BlockingQueue1(int capacity){array = (E[]) new Object[capacity];}private boolean isEmpty(){return size == 0;}private boolean isFull(){return size == array.length;}@Overridepublic void offer(E e) throws InterruptedException { // poll 等待队列非空lock.lockInterruptibly(); // 加锁try {while (isFull()){tailWaits.await(); // 线程阻塞}array[tail] = e;if (++tail == array.length){tail = 0;}size++;headWaits.signal(); // 唤醒等待非空的线程}finally {lock.unlock(); // 解锁}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly(); // 加锁try {long t = TimeUnit.MILLISECONDS.toNanos(timeout);while (isFull()){if (t <= 0){return false;}t = tailWaits.awaitNanos(t); // 最多等待多少纳秒  返回值代表剩余时间}array[tail] = e;if (++tail == array.length){tail = 0;}size++;headWaits.signal(); // 唤醒等待非空的线程}finally {lock.unlock(); // 解锁}return false;}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try{while (isEmpty()){headWaits.await();}E e = array[head];array[head] = null; // help GCif (++head == array.length){head = 0;}size--;tailWaits.signal();return e;}finally {lock.unlock();}}@Overridepublic String toString() {return Arrays.toString(array);}}

c. 双锁实现

单锁的缺点在于:

  • 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
  • 冲突的主要是生产者之间:多个 offer 线程修改 tail
  • 冲突的还有消费者之间:多个 poll 线程修改 head

如果希望进一步提高性能,可以用两把锁

  • 一把锁保护 tail
  • 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock();  // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合ReentrantLock tailLock = new ReentrantLock();  // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合
/*** 双锁实现* @param <E> 元素类型*/
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private AtomicInteger size = new AtomicInteger();private ReentrantLock tailLock = new ReentrantLock();private Condition tailWaits = tailLock.newCondition();private ReentrantLock headLock = new ReentrantLock();private Condition headWaits = headLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}private boolean isEmpty() {return size.get() == 0;}private boolean isFull() {return size.get() == array.length;}@Overridepublic String toString() {return Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {int c; // 添加前元素个数tailLock.lockInterruptibly();try {// 1. 队列满则等待while (isFull()) {tailWaits.await(); //  offer2}// 2. 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 3. 修改 size/*size = 6*/c = size.getAndIncrement();if (c + 1 < array.length) {tailWaits.signal();}/*1. 读取成员变量size的值  52. 自增 63. 结果写回成员变量size 6*/} finally {tailLock.unlock();}// 4. 如果从0变为非空,由offer这边唤醒等待非空的poll线程//                       0->1   1->2    2->3if(c == 0) {headLock.lock(); // offer_1 offer_2 offer_3try {headWaits.signal();} finally {headLock.unlock();}}}@Overridepublic E poll() throws InterruptedException {E e;int c; // 取走前的元素个数headLock.lockInterruptibly();try {// 1. 队列空则等待while (isEmpty()) {headWaits.await(); // poll_4}// 2. 非空则出队e = array[head];array[head] = null; // help GCif (++head == array.length) {head = 0;}// 3. 修改 sizec = size.getAndDecrement();// 3->2   2->1   1->0// poll_1 poll_2 poll_3if (c > 1) {headWaits.signal();}/*1. 读取成员变量size的值 52. 自减 43. 结果写回成员变量size 4*/} finally {headLock.unlock();}// 4. 队列从满->不满时 由poll唤醒等待不满的 offer 线程if(c == array.length) {tailLock.lock();try {tailWaits.signal(); // ctrl+alt+t} finally {tailLock.unlock();}}return e;}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {return false;}public static void main(String[] args) throws InterruptedException {BlockingQueue2<String> queue = new BlockingQueue2<>(3);queue.offer("元素1");queue.offer("元素2");new Thread(()->{try {queue.offer("元素3");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "offer").start();new Thread(()->{try {queue.poll();} catch (InterruptedException e) {throw new RuntimeException(e);}}, "poll").start();}
}

http://www.ppmy.cn/news/819397.html

相关文章

安装mysql报错error: command ‘gcc‘ failed with exit status 1

错误描述&#xff1a;error: command ‘gcc’ failed with exit status 1 [rootcloud-codec-qa-1d826557e-4 ~]# pip install MySQL-python1.2.5 DEPRECATION: Python 2.7 reached the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 is no…

什么才是好照片?购机A7M3两个月心得

购入相机已经有两个月&#xff0c;说说这这两个月来的心得。 我以前拍照一直在追求构图的精妙&#xff0c;拍人像的时候要求别人配合我&#xff0c;要求别人拍照的时候都要微笑&#xff0c;拍景色的时候我不肯随意按下快门&#xff0c;一定要等一个我觉得构图非常妙的场景才按…

华为开发者大会CodeArts专题论坛直播回放

华为开发者大会2023已圆满结束 没有去现场或者没有看直播的开发者们不要着急 小智为各位开发者整理了CodeArts专题论坛的直播回放 专题论坛&#xff1a;AI“邂逅”一站式软件开发&#xff0c;CodeArts以10倍效能“绘”企业应用远景 内容介绍&#xff1a; 华为云软件开发平台…

docker安装单机版nacos,并把数据保存到MySQL

1.下载镜像(请根据cloud版本选择) docker pull nacos/nacos-server:1.4.12.启动临时镜像并拷贝文件 docker run -p 8848:8848 -p 9848:9848 -p 9849:9849 --name nacos-temp \ -d nacos/nacos-server:1.4.1后面是需要修改的本机路径 docker cp nacos-temp:/home/nacos/logs/…

准备买笔记本电脑了.

来到学校也有一段时间了,我现在在准备补考,等到补考以后我准备买笔记本了,现在真的觉得很无奈呀,想买个笔记本好好的学习以下linux技术和c/c技术,现在没有自己的电脑,大家都用windows,也没有机会学习linux了. 我给爸爸说了&#xff0c;爸爸也同意买笔记本了,我现在想的是买一个…

购买笔记本电脑注意啊

购买笔记本电脑&#xff0c;如果钱足够&#xff0c;建议买品牌的&#xff0c;一次性购买高配置好&#xff1f; …………为什么呢&#xff1f;………… 升级硬件有限:所以在你购买之前考虑好&#xff0c;考虑⚠️清楚&#xff0c;考虑后期升级硬件。 ⚠️重点建议:&#x1f6…

QTcpServer::incomingConnection(qintptr)跨线程传递socket失败

有监听线程A和传输线程B&#xff0c;在线程A中开启监听并通过incomingConnection方法得到native socket句柄&#xff0c;然后通过信号槽传递给线程B&#xff0c;以实现B线程中创建QTcpSocket对象。但线程B中的槽一直不响应。 class TcpServer : public QTcpServer {Q_OBJECT p…

快讯|​2023 FOX Upfront 主打 Tubi

在每月一期的 Tubi 快讯中&#xff0c;你将全面及时地获取 Tubi 最新发展动态&#xff0c;欢迎关注【比图科技】&#xff0c;一起成长变强&#xff01; 2023 FOX Upfront 主打 Tubi 2023 年 5 月 15 日&#xff0c;FOX 在纽约曼哈顿中心举行一年一度的 FOX Upfront&#xff0c…