专栏简介: JavaEE从入门到进阶
题目来源: leetcode,牛客,剑指offer.
创作目标: 记录学习JavaEE学习历程
希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长.
学历代表过去,能力代表现在,学习能力代表未来!
目录:
1.阻塞队列的概念
2.标准库中的阻塞队列
3.生产者消费者模型
3.1 生产者消费者模型的意义:
3.2 生产者消费者模型的作用:
3.2 生产者消费者模型:
4.阻塞队列实现
1.阻塞队列的概念
阻塞队列是一种特殊的队列 , 也遵循"先进先出"的原则.
阻塞队列是一种线程安全的数据结构 , 具体特性如下:
- 当队列满时 , 继续向队列中添加元素就会产生阻塞 , 直到其他线程从队列中取走元素.
- 当队列空时 , 继续取队列中的元素就会产生阻塞 , 直到其他线程向队列中添加元素.
阻塞队列的一个典型应用场景就是"生产者消费者模型" , 是一种典型的开发方式.
2.标准库中的阻塞队列
Java标准库中内置了阻塞队列 , 如果我们需要在一些程序中使用阻塞队列 , 直接调用库即可.
阻塞队列队列是一个接口 , 具体的实现类是优先级阻塞队列 , 顺序表阻塞队列 , 链表阻塞队列.
public interface BlockingQueue<E> extends Queue<E>
- put方法用于阻塞式的入队列 , take方法用于阻塞式的出队列
- 阻塞队列也有 poll() , offer() , peek()这些方法 , 但不具有阻塞性
BlockingQueue queue = new LinkedBlockingQueue();queue.put("hello");queue.take();
3.生产者消费者模型
3.1 生产者消费者模型的意义:
生产者消费者模型 , 本质上就是通过一个容器来解决生产者和消费者之间强耦合的问题.
通常意义下的生产者和消费者之间的耦合程度是很高的 , 如果生产者和消费者直接相关联 , 那么二者中任意一个出现错误就会导致另一个也出现错误. 如果要修改其中一个的代码 , 另一个也要修改不少代码 , 还要重新测试 , 重新发布 , 重新部署....非常麻烦.
那么如果我们使用"生产者消费者模型" , 生产者和消费者彼此之间不进行通信 , 而是通过一个阻塞队列来通信 , 所以生成者生成完数据不用直接给消费者 , 而是直接扔给阻塞队列.消费者也不再向生产者所要数据 , 而是直接从阻塞队列中取. 这样生产者和消费者之间的耦合程度就大大降低了.
3.2 生产者消费者模型的作用:
- 1."削峰填谷" , 阻塞队列相当于一个缓冲区 , 平衡了生产者和消费者的处理能力.
在"秒杀"场景下 , 服务器同一时刻可能收到大量的支付请求 , 如果直接处理这些请求 , 服务器可能会崩溃 , 这时如果把这些请求放入阻塞队列中 , 然后再由服务器来慢慢处理一个个请求 , 由此达到"削峰"的效果.
- 2.阻塞队列也能使生成者和消费者之间"解耦".
比如一家人过年包饺子 , 这个流程需要由明确的分工 , 擀饺子皮的人是"生产者" , 包饺子的人是"消费者" , 擀饺子皮的人不必关心谁包饺子 , 包饺子的人也不必关心谁擀饺子皮.
3.2 生产者消费者模型:
创建一个阻塞队列 , 一个生产者线程和一个消费者线程. 为了达到"生产一个 消费一个的效果" , 需要让生产者每生产一个就停顿一下.
public static void main(String[] args) {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();Thread customer = new Thread(()->{try {while (true){int ret = blockingQueue.take();System.out.println("消费"+ret);}} catch (InterruptedException e) {throw new RuntimeException(e);}});Thread producer = new Thread(()->{int count = 0;while (true){try {blockingQueue.put(count);System.out.println("生产"+count);count++;Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});customer.start();producer.start();}
4.阻塞队列实现
实现阻塞队列首先要实现一个队列 , 本文采用"循环队列的实现方式".
1) 实现循环队列
class MyBlockingQueue{public int head;public int tail;public int size;public int[] queue = new int[1000];//入队public void put(int value){if(size == queue.length){return;}queue[tail] = value;tail++;if(tail>=queue.length){tail = 0;}size++;}//出队public Integer take(){if(size == 0){return null;}int ret = queue[head];head++;if(head>=queue.length){head = 0;}size--;return ret;}
}
2) 改进为阻塞队列
在循环队列的基础上 , 入队时发现队伍满了 , 就使用 wait 阻塞. 出队时发现队伍空了 , 也用 wait 阻塞. 当入队操作阻塞时 , 如果出队操作执行完毕就可以通知入队操作解除阻塞. 当出队操作阻塞时 , 如果入队操作执行完毕就可以通知出队操作解除阻塞.
- 那么是否会出现入队和出队操作同时陷入阻塞的情况?
答案是不会 , 因为入队和出队被同步代码块限定为同一个队列 , 同一个队列不可能出现即空又满的情况 , 否则就会成为薛定谔的队列.
- 那么是否会出现入队的 wait 被唤醒 , 队伍还是满或出队的 wait 被唤醒队伍还是空的情况?
理论上我们实现的代码不会产生这个问题 , 但为了保险起见 , 我们要将返回的 wait 再判定一次 , 看此时的条件是否具备.因此我们可以参考 Java 标准库中的 wait , 使用循环判定.
class MyBlockingQueue{public int head = 0;public int tail = 0;public int size = 0;//表示元素个数public int[] arrQ = new int[1000];//入队 putpublic void put(int value) throws InterruptedException {synchronized (this) {while (size == arrQ.length ){//为了防止出现wait被唤醒还是满的情况.符合标准库的规范//队列满了 , 产生阻塞this.wait();}arrQ[tail] = value;tail++;if(tail>=arrQ.length){tail=0;}size++;//这个notify take()中的waitthis.notify();}}//出队 takepublic Integer take() throws InterruptedException {int ret;synchronized (this) {while (size == 0){this.wait();}ret = arrQ[head];head++;if(head>=arrQ.length){head=0;}size--;//唤醒 put() 中的waitthis.notify();}return ret;}
}
测试自定义阻塞队列:
public class ThreadDemo6 {public static void main(String[] args) throws InterruptedException {MyBlockingQueue myBlockingQueue = new MyBlockingQueue();//创建两个线程来作为生产者和消费者Thread customer = new Thread(()->{while (true){try {int result = myBlockingQueue.take();System.out.println("消费"+result);} catch (InterruptedException e) {throw new RuntimeException(e);}}});customer.start();Thread producer = new Thread(()->{int count = 0;while (true){try {System.out.println("生产"+count);myBlockingQueue.put(count);count++;Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();}
}