多线程-初阶(2)BlockingQueueThreadPoolExecutor

devtools/2024/10/10 23:54:21/

学习目标:

熟悉wait和notify的线程休眠和启动

熟悉多线程的基本案例

1.单例模式的两种设置模式:懒汉模式和饿汉模式

2.阻塞队列(生产者消费者模型)

3.线程池

4.定时器

1.wait和notify 

由于线程之间是抢占式执⾏的, 因此线程之间执⾏的先后顺序难以预知.
但是实际开发中有时候我们希望合理的协调多个线程之间的执⾏先后顺序.
完成这个协调⼯作, 主要涉及到三个⽅法:
wait() / wait(long timeout): 让当前线程进⼊等待状态
notify() / notifyAll(): 唤醒在当前对象上等待的线程

注意: wait, notify, notifyAll 都是 Object 类的⽅法。 

1.1 wait()⽅法 

wait方法的三步:

1.使当前执⾏代码的线程进⾏等待. (把线程放到等待队列中)
2.释放当前的锁
3.满⾜⼀定条件时被唤醒, 重新尝试获取这个锁.

wait 要搭配 synchronized 来使⽤. 脱离 synchronized 使⽤ wait 会直接抛出异常.  

wait 结束等待的条件:  

1.其他线程调⽤该对象的 notify ⽅法.
2.wait 等待时间超时 (wait ⽅法提供⼀个带有 timeout 参数的版本, 来指定等待时间).
3.其他线程调⽤该等待线程的 interrupted ⽅法, 导致 wait 抛出 InterruptedException 异常.
这一点跟sheep相似。

代码⽰例: 观察wait()⽅法使⽤  

1.2 notify()⽅法

notify ⽅法是唤醒等待的线程.
⽅法notify()也要在同步⽅法或同步块中调⽤,该⽅法是⽤来通知那些可能等待该对象的对象锁的其 它线程,对其发出通知notify,并使它们重新获取该对象的对象锁。
如果有多个线程等待,则有线程调度器随机挑选出⼀个呈 wait 状态的线程。(并没有 "先来后到")
在notify()⽅法后,当前线程不会⻢上释放该对象锁,要等到执⾏notify()⽅法的线程将程序执⾏
完,也就是退出同步代码块之后才会释放对象锁。
代码⽰例: 使⽤notify()⽅法唤醒线程
public static Object locker = new Object();public static void main(String[] args) {Thread t1 = new Thread(() ->  {synchronized(locker) {try {System.out.println("wait之前");locker.wait();//让线程进入等待System.out.println("wait之后");} catch (InterruptedException e) {e.printStackTrace();}}});Thread t2 = new Thread(() ->  {synchronized(locker) {try {Thread.sleep(1000);//保证t1线程进入wiat方法} catch (InterruptedException e) {e.printStackTrace();}System.out.println("notify之前");locker.notify();System.out.println("notify之后");}});t1.start();t2.start();}

注意:notify()方法也要放在synchronized里面

输出结果:

但是notify方法只能唤醒一个wiat方法。

代码如下:

public static Object locker = new Object();public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t1之前");locker.wait();//让线程进入等待System.out.println("t1之后");} catch (InterruptedException e) {e.printStackTrace();}}});Thread t2 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t2之前");locker.wait();//让线程进入等待System.out.println("t2之后");} catch (InterruptedException e) {e.printStackTrace();}}});Thread t3 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t3之前");locker.wait();//让线程进入等待System.out.println("t3之后");} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();t3.start();Thread.sleep(1000);synchronized (locker) {System.out.println("notify之前");locker.notify();System.out.println("notify之后");}

输出结果:

1.3 notifyAll()⽅法 

 notify⽅法只是唤醒某⼀个等待线程. 使⽤notifyAll⽅法可以⼀次唤醒所有的等待线程.

范例:使⽤notifyAll()⽅法唤醒所有等待线程, 在上⾯的代码基础上做出修改.  

public static Object locker = new Object();public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t1之前");locker.wait();//让线程进入等待System.out.println("t1之后");} catch (InterruptedException e) {e.printStackTrace();}}});Thread t2 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t2之前");locker.wait();//让线程进入等待System.out.println("t2之后");} catch (InterruptedException e) {e.printStackTrace();}}});Thread t3 = new Thread(() ->  {synchronized(locker) {try {System.out.println("t3之前");locker.wait();//让线程进入等待System.out.println("t3之后");} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();t3.start();Thread.sleep(1000);synchronized (locker) {System.out.println("notifyAll之前");locker.notifyAll();System.out.println("notifyAll之后");}}

输出结果:

wiat方法和notify方法总结:

 1.4 wait 和 sleep 的对⽐(⾯试题)

唯⼀的相同点就是都可以让线程放弃执⾏⼀段时间.

1. wait 需要搭配 synchronized 使⽤. sleep 不需要.

2.wait 是 Object 的⽅法 sleep 是 Thread 的静态⽅法.

2.多线程案例  

2.1 单例模式

单例模式是校招中最常考的设计模式之⼀.

啥是设计模式?

单例模式能保证某个类在程序中只存在唯⼀⼀份实例, ⽽不会创建出多个实例.

单例模式具体的实现⽅式有很多. 最常⻅的是 "饿汉" 和 "懒汉" 两种.

饿汉模式

类加载的同时, 创建实例.
代码:
class Singleton {private static Singleton instance = new Singleton();private Singleton() {}public static Singleton getInstance() {return instance;}
}

使用的时候SingLeton.getInstance()获取实例

懒汉模式-单线程版

类加载的时候不创建实例. 第⼀次使⽤的时候才创建实例.

代码:

 class Singleton {private static Singleton instance = null;private Singleton() {}//防止使用new出来实例所以设置为私有的public static Singleton getInstance() {if (instance == null) {instance = new Singleton();}return instance;}
}

但是是存在线程安全问题的

懒汉模式-多线程版

上⾯的懒汉模式的实现是线程不安全的.

线程安全问题发⽣在⾸次创建实例时. 如果在多个线程中同时调⽤ getInstance ⽅法, 就可能导致创建 出多个实例.
⼀旦实例已经创建好了, 后⾯再多线程环境调⽤ getInstance 就不再有线程安全问题了(不再修改instance 了)

这时候我们选择给写加锁

 

这个时候,我们选择在加一个if判断

 

这里我们引出了volatile的另一个用法:禁止指令重排序

3.阻塞队列

阻塞队列是什么?
阻塞队列是⼀种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列能是⼀种线程安全的数据结构, 并且具有以下特性:
当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素.
当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素.

 阻塞队列的⼀个典型应⽤场景就是 "⽣产者消费者模型". 这是⼀种⾮常典型的开发模型.

3.1⽣产者消费者模型  

两大作用:

1.阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)

 2.阻塞队列也能使⽣产者和消费者之间 解耦.

标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的即可.
  1. BlockingQueue 是⼀个接⼝. 真正实现的类是 LinkedBlockingQueue.
  2. put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列.
  3. BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性.

4个实例让你了解BlokingQueue的特性

实例1: 

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// ⼊队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take();

实例2:

    public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);queue.put(1);queue.put(2);queue.take();queue.take();queue.take();//出第三个的时候因为队列里面没有东西一直在堵塞}

输出结果:

实例3:

    public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);queue.put(1);queue.put(2);queue.put(3);}

 输出结果:

实例4:

public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);//生产者Thread t1 = new Thread(() -> {for(int i = 0; i <= 100; i++) {try {System.out.println("生产"+ i);queue.put(i);} catch (InterruptedException e) {e.printStackTrace();}}});//消费者Thread t2 = new Thread(() -> {while(true) {try {int value = queue.take();System.out.println("消费" + value);} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();t2.start();}

输出结果:

 ⽣产者消费者模型实现

代码:

public class MyBlockingQueue {private String[] data = null;private volatile int head = 0;private volatile int tail = 0;private volatile int size = 0;public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String s) throws InterruptedException {synchronized (this) {//满队列if(size == data.length) {this.wait();}data[tail] = s;tail++;if(tail == data.length) {tail = 0;}size++;this.notify();}}public String take() throws InterruptedException {String ret = "";synchronized (this) {if(size == 0) {this.wait();}ret = data[head];head++;if(head >= data.length) {head = 0;}size--;this.notify();return ret;}}
}

解释为什么可以使用相同的锁:

但是还有一个问题:

 怎么解决:

修改后的代码:

public class MyBlockingQueue {private String[] data = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String s) throws InterruptedException {synchronized (this) {//满队列while(size == data.length) {this.wait();}data[tail] = s;tail++;if(tail == data.length) {tail = 0;}size++;this.notify();}}public String take() throws InterruptedException {String ret = "";synchronized (this) {while(size == 0) {this.wait();}ret = data[head];head++;if(head >= data.length) {head = 0;}size--;this.notify();return ret;}}
}

4.线程池

纸面意思就是用来装线程的池。

 线程池最⼤的好处就是减少每次启动、销毁线程的损耗。

比如说:

标准库中的线程池
使⽤ Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
-
返回值类型为 ExecutorService
-
通过 ExecutorService.submit 可以注册⼀个任务到线程池中.
代码:
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {@Overridepublic void run() {System.out.println("hello");}
});
Executors 创建线程池的⼏种⽅式
  1. newFixedThreadPool: 创建固定线程数的线程池
  2. newCachedThreadPool: 创建线程数⽬动态增⻓的线程池.
  3. newSingleThreadExecutor: 创建只包含单个线程的线程池.
  4. newScheduledThreadPool: 设定 延迟时间后执⾏命令,或者定期执⾏命令. 是进阶版的 Timer. Executors 本质上是 ThreadPoolExecutor 类的封装.

 ThreadPoolExecutor 提供了更多的可选参数, 可以进⼀步细化线程池⾏为的设定.

这里我们学习最后一个,学会最后一个其他的都不成问题。

 

参数解释:

 

 4.1线程池的使用

 

全部代码:

public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(4);for (int i = 0; i < 100; i++) {int id = i;service.submit(new Runnable() {@Overridepublic void run() {Thread t = Thread.currentThread();//获取当前线程System.out.println("Runnable" + id + "  " + t.getName());}});}service.shutdown();}

4.2简单模拟实现

public class MyThreadPool {private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);public MyThreadPool(int n) {//创建n个线程for (int i = 0; i < n; i++) {Thread t = new Thread(() ->{while(true) {try {Runnable runnable = queue.take();//执行任务runnable.run();} catch (InterruptedException e) {e.printStackTrace();}}});t.start();}}public  void submit(Runnable runnable) {try {queue.put(runnable);//添加任务} catch (InterruptedException e) {e.printStackTrace();}}
}
class Demo4 {public static void main(String[] args) {MyThreadPool pool = new MyThreadPool(4);for (int i = 0; i < 1000; i++) {int id = i;pool.submit(() -> {Thread t = Thread.currentThread();//获取当前线程System.out.println("Runnable" + id + "  " + t.getName());});}}
}

 输出效果:

5.定时器

定时器的构成
  1. ⼀个带优先级队列(不要使⽤ PriorityBlockingQueue, 容易死锁!)
  2. 队列中的每个元素是⼀个 Task 对象.
  3. Task 中带有⼀个时间属性, 队⾸元素就是即将要执⾏的任务
  4. 同时有⼀个 worker 线程⼀直扫描队⾸元素, 看队⾸元素是否需要执⾏

基本使用:

public static void main(String[] args) {Timer timer = new Timer();System.out.println("开始执行");timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("执行结束");}}, 3000);//经过3s后面才执行run里面的内容}

 基本模拟代码:

import java.util.PriorityQueue;class MyTimerTask implements Comparable<MyTimerTask> {private Runnable runnable;private long time;//表示什么时候执行public MyTimerTask(Runnable runnable, Long delay) {this.runnable = runnable;this.time = System.currentTimeMillis() + delay;}public void run() {runnable.run();}public long getTime() {return time;}@Overridepublic int compareTo(MyTimerTask o) {return (int) (this.time - o.time);}
}class MyTimer {Object locker = new Object();private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();public MyTimer() {Thread t = new Thread(() -> {try {while (true) {synchronized (locker) {if (queue.isEmpty()) {continue;}MyTimerTask current = queue.peek();if (System.currentTimeMillis() >= current.getTime()) {current.run();queue.poll();//执行完删除} else {continue;}}}} catch (InterruptedException e) {e.printStackTrace();}});t.start();}public void schedule(Runnable runnable, long delay) {synchronized (locker) {MyTimerTask myTimerTask = new MyTimerTask(runnable, delay);queue.offer(myTimerTask);locker.notify();}}
}public class Demo6 {public static void main(String[] args) {MyTimer myTimer = new MyTimer();myTimer.schedule(() -> {System.out.println("hello3");}, 300);myTimer.schedule(() -> {System.out.println("hello2");}, 200);}
}

但是一些问题:

还有:

 最终代码:

import java.util.PriorityQueue;class MyTimerTask implements Comparable<MyTimerTask> {private Runnable runnable;private long time;//表示什么时候执行public MyTimerTask(Runnable runnable, Long delay) {this.runnable = runnable;this.time = System.currentTimeMillis() + delay;}public void run() {runnable.run();}public long getTime() {return time;}@Overridepublic int compareTo(MyTimerTask o) {return (int) (this.time - o.time);}
}class MyTimer {Object locker = new Object();private PriorityQueue<MyTimerTask> queue = new PriorityQueue<>();public MyTimer() {Thread t = new Thread(() -> {try {while (true) {synchronized (locker) {if (queue.isEmpty()) {locker.wait();}MyTimerTask current = queue.peek();if (System.currentTimeMillis() >= current.getTime()) {current.run();queue.poll();//执行完删除} else {//还有多久时间到执行堆头locker.wait(current.getTime() - System.currentTimeMillis());//Thread.sleep(current.getTime() - System.currentTimeMillis());//固定时间,如果在这一段时间添加时间更短的任务,就出bug了}}}} catch (InterruptedException e) {e.printStackTrace();}});t.start();}public void schedule(Runnable runnable, long delay) {synchronized (locker) {MyTimerTask myTimerTask = new MyTimerTask(runnable, delay);queue.offer(myTimerTask);locker.notify();}}
}

好了今天就到这里了。


http://www.ppmy.cn/devtools/123881.html

相关文章

4.STM32-中断

STM32-中断 需求&#xff1a;红灯每两秒进行闪烁&#xff0c;按键key1控制绿灯亮灭 简单的程序代码无法满足要求 如何让STM32既能执行HAL_DELAY这种耗时的任务&#xff0c;同时又能快速响应按键按下这种突发情况呢 设置中断步骤 1.接入中断 将KEY1输入模式由原先的GPIO_In…

实验 | 使用本地大模型从论文PDF中提取结构化信息

非结构文本、图片、视频等数据是待挖掘的数据矿藏&#xff0c; 在经管、社科等研究领域中谁拥有了_从非结构提取结构化信息的能力_&#xff0c;谁就拥有科研上的数据优势。正则表达式是一种强大的文档解析工具&#xff0c;但它们常常难以应对现实世界文档的复杂性和多变性。而随…

对于JS脚本加标签功能的一些小理解

在JS中加标签&#xff0c;最主要的应用场景就是结合循环代码使用。用标签标识循环或者代码块&#xff0c;以便使用break 和 continue语句来结束循环。个人觉得标签加循环的本质作用是为了增加性能&#xff0c;减少运行代码行&#xff0c;以便提速。示例如下&#xff1a; 打印输…

kotlin 委托

一、类委托 interface DB{fun insert() } class SqliteDB : DB {override fun insert() {println(" SqliteDB insert")} }class MySql : DB{override fun insert() {println(" MySql insert")} }class OracleDB : DB{override fun insert() {println(&quo…

C++面试速通宝典——16

268. 进程之间的通信方式有哪些&#xff1f; 管道&#xff08;Pipe&#xff09;信号&#xff08;Signal&#xff09;消息队列&#xff08;Message Queue&#xff09;共享内存&#xff08;Shared Memory&#xff09;信号量&#xff08;Semaphore&#xff09;套接字&#xff08;…

css优化的方法

CSS优化的方法多种多样&#xff0c;旨在提高网页的加载速度、渲染性能和可维护性。以下是一些常见的CSS优化方法&#xff1a; 一、减少文件体积和请求次数 压缩CSS&#xff1a;使用CSS压缩工具去除不必要的空格、注释和换行&#xff0c;可以显著减少CSS文件的体积。合并CSS文…

新版 Notepad++ 下载与安装教程

一、软件准备&#xff1a;麻烦点我 二、双击下载好的 notepad 软件进行安装&#xff0c;选择 “简体中文”。 三、默认 “下一步” 安装。 四、单击 “我接受” 按钮。 五、自定义安装位置&#xff0c;个人建议安装在 D 盘。 六、选择组件&#xff0c;默认 “下一步”。 七、勾…

SpringBoot开发——SpringSecurity安全框架17个业务场景案例(三)

文章目录 一、Spring Security 常用应用场景介绍二、Spring Security场景案例12 表达式支持(Expression-Based)12.1 Spring Security 配置12.2 业务逻辑代码12.3 控制器13、安全上下文(Security Context)13.1 Spring Security 配置13.2 业务逻辑代码13.3 控制器14、安全过滤…