【Java】多线程和高并发编程(四):阻塞队列(上)基础概念、ArrayBlockingQueue

devtools/2025/3/7 6:01:02/

文章目录

  • 四、阻塞队列
    • 1、基础概念
      • 1.1 生产者消费者概念
      • 1.2 JUC阻塞队列的存取方法
    • 2、ArrayBlockingQueue
      • 2.1 ArrayBlockingQueue的基本使用
      • 2.2 生产者方法实现原理
        • 2.2.1 ArrayBlockingQueue的常见属性
        • 2.2.2 add方法实现
        • 2.2.3 offer方法实现
        • 2.2.4 offer(time,unit)方法
        • 2.2.5 put方法
      • 2.3 消费者方法实现原理
        • 2.3.1 remove方法
        • 2.4.2 poll方法
        • 2.4.3 poll(time,unit)方法
        • 2.4.4 take方法
        • 2.4.5 虚假唤醒

在这里插入图片描述
个人主页:道友老李
欢迎加入社区:道友老李的学习社区

四、阻塞队列

1、基础概念

1.1 生产者消费者概念

生产者消费者是设计模式的一种。让生产者和消费者基于一个容器来解决强耦合问题。

生产者 消费者彼此之间不会直接通讯的,而是通过一个容器(队列)进行通讯。

所以生产者生产完数据后扔到容器中,不通用等待消费者来处理。

消费者不需要去找生产者要数据,直接从容器中获取即可。

而这种容器最常用的结构就是队列。

1.2 JUC阻塞队列的存取方法

常用的存取方法都是来自于JUC包下的BlockingQueue

生产者存储方法

java">add(E)     	// 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E)    // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit)   // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E)      // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!

消费者取数据方法

java">remove()    // 从队列中移除数据,如果队列为空,抛出异常
poll()      // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit)   // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take()     // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取

2、ArrayBlockingQueue

2.1 ArrayBlockingQueue的基本使用

ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。

因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。

java">public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {// 必须设置队列的长度ArrayBlockingQueue queue = new ArrayBlockingQueue(4);// 生产者扔数据queue.add("1");queue.offer("2");queue.offer("3",2,TimeUnit.SECONDS);queue.put("2");// 消费者取数据System.out.println(queue.remove());System.out.println(queue.poll());System.out.println(queue.poll(2,TimeUnit.SECONDS));System.out.println(queue.take());
}

2.2 生产者方法实现原理

生产者添加数据到队列的方法比较多,需要一个一个查看

2.2.1 ArrayBlockingQueue的常见属性

ArrayBlockingQueue中的成员变量

lock = 就是一个ReentrantLock
count = 就是当前数组中元素的个数
iterms = 就是数组本身
# 基于putIndex和takeIndex将数组结构实现为了队列结构
putIndex = 存储数据时的下标
takeIndex = 去数据时的下标
notEmpty = 消费者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify)
notFull = 生产者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify)
2.2.2 add方法实现

add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常

java">public boolean add(E e) {if (offer(e))return true;else// 抛出的异常throw new IllegalStateException("Queue full");
}
2.2.3 offer方法实现
java">public boolean offer(E e) {// 要求存储的数据不允许为null,为null就抛出空指针checkNotNull(e);// 当前阻塞队列的lock锁final ReentrantLock lock = this.lock;// 为了保证线程安全,加锁lock.lock();try {// 如果队列中的元素已经存满了,if (count == items.length)// 返回falsereturn false;else {// 队列没满,执行enqueue将元素添加到队列中enqueue(e);// 返回truereturn true;}} finally {// 操作完释放锁lock.unlock();}
}//==========================================================
private void enqueue(E x) {// 拿到数组的引用final Object[] items = this.items;// 将元素放到指定位置items[putIndex] = x;// 对inputIndex进行++操作,并且判断是否已经等于数组长度,需要归位if (++putIndex == items.length)// 将索引设置为0putIndex = 0;// 元素添加成功,进行++操作。count++;// 将一个Condition中阻塞的线程唤醒。notEmpty.signal();
}
2.2.4 offer(time,unit)方法

生产者在添加数据时,如果队列已经满了,阻塞一会。

  • 阻塞到消费者消费了消息,然后唤醒当前阻塞线程
  • 阻塞到了time时间,再次判断是否可以添加,不能,直接告辞。
java">// 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {// 非空检验checkNotNull(e);// 将时间单位转换为纳秒long nanos = unit.toNanos(timeout);// 加锁final ReentrantLock lock = this.lock;// 允许线程中断并排除异常的加锁方式lock.lockInterruptibly();try {// 为什么是while(虚假唤醒)// 如果元素个数和数组长度一致,队列慢了while (count == items.length) {// 判断等待的时间是否还充裕if (nanos <= 0)// 不充裕,直接添加失败return false;// 挂起等待,会同时释放锁资源(对标sync的wait方法)// awaitNanos会挂起线程,并且返回剩余的阻塞时间// 恢复执行时,需要重新获取锁资源nanos = notFull.awaitNanos(nanos);}// 说明队列有空间了,enqueue将数据扔到阻塞队列中enqueue(e);return true;} finally {// 释放锁资源lock.unlock();}
}
2.2.5 put方法

如果队列是满的, 就一直挂起,直到被唤醒,或者被中断

java">public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)// await方法一直阻塞,直到被唤醒或者中断标记位notFull.await();enqueue(e);} finally {lock.unlock();}
}

2.3 消费者方法实现原理

2.3.1 remove方法
java">// remove方法就是调用了poll
public E remove() {E x = poll();// 如果有数据,直接返回if (x != null)return x;// 没数据抛出异常elsethrow new NoSuchElementException();
}
2.4.2 poll方法
java">// 拉取数据
public E poll() {// 加锁操作final ReentrantLock lock = this.lock;lock.lock();try {// 如果没有数据,直接返回null,如果有数据,执行dequeue,取出数据并返回return (count == 0) ? null : dequeue();} finally {lock.unlock();}
}//==========================================================
// 取出数据
private E dequeue() {// 将成员变量引用到局部变量final Object[] items = this.items;// 直接获取指定索引位置的数据E x = (E) items[takeIndex];// 将数组上指定索引位置设置为nullitems[takeIndex] = null;// 设置下次取数据时的索引位置if (++takeIndex == items.length)takeIndex = 0;// 对count进行--操作count--;// 迭代器内容,先跳过if (itrs != null)itrs.elementDequeued();// signal方法,会唤醒当前Condition中排队的一个Node。// signalAll方法,会将Condition中所有的Node,全都唤醒notFull.signal();// 返回数据。return x;
}
2.4.3 poll(time,unit)方法
java">public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 转换时间单位long nanos = unit.toNanos(timeout);// 竞争锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果没有数据while (count == 0) {if (nanos <= 0)// 没数据,也无法阻塞了,返回nullreturn null;// 没数据,挂起消费者线程nanos = notEmpty.awaitNanos(nanos);}// 取数据return dequeue();} finally {lock.unlock();}
}
2.4.4 take方法
java">public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 虚假唤醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
2.4.5 虚假唤醒

阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环 ,为什么不能换成if

肯定是不能换成if逻辑判断

线程A,线程B,线程E,线程C。 其中ABE生产者,C属于消费者

假如线程的队列是满的

java">// E,拿到锁资源,还没有走while判断
while (count == items.length)// A醒了// B挂起notFull.await();
enqueue(e)

C此时消费一条数据,执行notFull.signal()唤醒一个线程,A线程被唤醒

E走判断,发现有空余位置,可以添加数据到队列,E添加数据,走enqueue

如果判断是if,A在E释放锁资源后,拿到锁资源,直接走enqueue方法。

此时A线程就是在putIndex的位置,覆盖掉之前的数据,造成数据安全问题


http://www.ppmy.cn/devtools/164842.html

相关文章

网络原理 初识[Java EE]

目录 网络发展史 独立模式 网络互联 局域网 LAN 1. 基于网络直连 2. 基于集线器(Hub)组建 3. 基于交换机(Switch)组建 4. 基于交换机和路由器(Router)组建 广域网 WAN 网络通信基础 IP 地址 1. 概念 2. 格式 端口号 1. 概念 2.格式 认识协议 1. 概念 2. 作用…

CSS—元素水平居中:2分钟掌握常用的水平居中

个人博客&#xff1a;haichenyi.com。感谢关注 1. 目录 1–目录2–行内元素水平居中3–块级元素水平居中 2. 行内元素水平居中 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" …

pytorch 模型测试

在使用 PyTorch 进行模型测试时,一般包含加载测试数据、加载训练好的模型、进行推理以及评估模型性能等步骤。以下为你详细介绍每个步骤及对应的代码示例。 1. 导入必要的库 import torch import torch.nn as nn import torchvision import torchvision.transforms as trans…

一、Prometheus架构

Prometheus 云原生十二要素是一套最佳实践和规范,旨在帮助开发人员更好地构建云原生应用 这十二个要素分别是: 单一职责独立部署无状态声明式API服务发现容错处理自适应算法自动化运维响应式编程通信协议服务注册与发现数据持久化一、Prometheus 是什么 Prometheus 是一个…

RabbitMQ 最新版:安装、配置 与Java 接入详细教程

目录 一、RabbitMQ 简介二、RabbitMQ 的安装1. 安装 Erlang下载 Erlang安装 Erlang2. 安装 RabbitMQ下载 RabbitMQ安装 RabbitMQ3. 配置环境变量4. 启用管理插件三、RabbitMQ 的配置1. 创建用户和设置权限2. 配置文件四、Java 接入 RabbitMQ1. 添加依赖2. 创建连接3. 创建通道4…

简说spring 的设计模式

spring 的设计模式&#xff08;23种…) &#xff08;面试题&#xff09;说说BeanFactory和FactoryBean的实现原理和区别&#xff1f; spring 中你还知道哪些设计模式&#xff1f;&#xff1f; 1.简单工厂模式 实质&#xff1a; 由一个工厂类根据传入的参数&#xff0c;动态决…

基于vue3和spring boot实现大文件上传

前端&#xff08;Vue 3&#xff09; 1. 文件选择与分片上传 首先&#xff0c;我们需要一个文件选择器来选择要上传的文件&#xff0c;并将其分割成多个小块&#xff08;分片&#xff09;进行上传。我们还需要记录每个分片的上传状态以便支持断点续传。 <template><…

linux常见操作命令

查看目录和文件 ls&#xff1a;列出目录内容。 常用选项&#xff1a; -l&#xff1a;以长格式显示&#xff0c;显示文件的权限、所有者、大小、修改时间等详细信息。-a&#xff1a;显示所有文件和目录&#xff0c;包括隐藏文件&#xff08;以 . 开头的文件&#xff09;。-h&…