使用rust写一个Web服务器——多线程版本

devtools/2024/10/18 13:27:06/

文章目录

    • 模拟慢请求
    • 多线程Web服务器实现
      • 为每个请求单独生成一个线程
      • 限制创建线程的数量
      • ThreadPool的初始化
      • ThreadPool的存储
      • ThreadPool的设计
    • 关闭和资源清理
      • 为ThreadPool实现Drop
      • 停止工作线程
      • 测试

仓库地址: 1037827920/web-server: 使用rust编写的简单web服务器 (github.com)

模拟慢请求

一个单线程版本的web服务器只能一次处理一个请求,可是如果一个请求持续的时间太长,就会导致其他请求有可能饥饿,下面使用sleep方式让每次请求持续5s,模拟真实的慢请求:

rust">use std::{fs,io::{prelude::*, BufReader},net::{TcpListener, TcpStream},thread,time::Duration,
};fn main() {// 监听本地8080端口let listener = TcpListener::bind("localhost:8080").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();// 处理连接handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&mut stream);// 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法let request_line = buf_reader.lines().next().unwrap().unwrap();// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源"GET /sleep HTTP/1.1" => { // 请求 /sleep 资源thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// 读取文件内容let contents = fs::read_to_string(filename).unwrap();let length = contents.len();// 格式化HTTP Responselet response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 将response写入streamstream.write_all(response.as_bytes()).unwrap();
}

运行代码后访问localhost:8080/sleep,然后紧接着继续运行localhost:8080,会发现后者的请求必须等待前者完成后才能被处理,下面使用线程池改善吞吐量

多线程Web服务器实现

线程池: 包含一组已经生成的线程,它们时刻等待着接收并处理新的任务,当程序接收到新任务时,它会将线程池中的一个线程指派给该任务,在该线程忙着处理时,新来的任务交给池中剩余的线程进行处理,最终,当执行任务的线程处理完后,它会被重新放入到线程池中,准备处理新任务。注意: 需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。

因此,需要对线程池进行一定的架构设计,首先是设定最大线程数的上限,其次是维护一个请求队列。池中的线程去队列中依次弹出请求并处理。

为每个请求单独生成一个线程

修改main函数,每次处理一个任务就创建一个新的线程并执行任务

rust">use std::{fs,io::{prelude::*, BufReader},net::{TcpListener, TcpStream},thread,time::Duration,
};fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();thread::spawn(|| {handle_connection(stream);});}
}fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&mut stream);// 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法let request_line = buf_reader.lines().next().unwrap().unwrap();// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源"GET /sleep HTTP/1.1" => { // 请求 /sleep 资源thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// 读取文件内容let contents = fs::read_to_string(filename).unwrap();let length = contents.len();// 格式化HTTP Responselet response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 将response写入streamstream.write_all(response.as_bytes()).unwrap();
}

这样简单粗暴就能实现多线程的Web服务器,但是这样不能达到限制线程池中线程数的效果

限制创建线程的数量

利用线程池,继续修改main函数

rust">fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();// 首先创建一个包含4个线程的线程池let pool = ThreadPool::new(4);for stream in listener.incoming() {let stream = stream.unwrap();// 分发执行请求pool.execute(|| {handle_connection(stream)});}
}

可以看出,我们至少要实现ThreadPool这个结构体和execute方法

ThreadPool的初始化

首先要确定使用new还是build来初始化ThreadPool实例,new往往用于简单初始化一个实例,而build往往会完成更加复杂的构建工作,我们并不需要在初始化线程池的同时创建相应的线程,因此new更合适。

在src/lib.rs写入以下代码:

rust">pub struct ThreadPool;impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);ThreadPool}/// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static{todo!();}
}

在src/main.rs中导入lib.rs的ThreadPool:

rust">use <project_name>::ThreadPool;

ThreadPool的存储

ThreadPool作为一个线程池,肯定是要能够存储线程的对吧,继续修改ThreadPool,添加threads字段,使其能够存储线程

rust">use std::thread::{self, Thread};pub struct ThreadPool {threads: Vec<thread::JoinHandle<()>>,
}impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 使用with_capacity可以提前分配好内存空间,比Vec::new的性能好let mut threads = Vec::with_capacity(size);for _ in 0..size {// 创建线程并将其存储在vector中todo!();}ThreadPool { threads }}/// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static{todo!();}
}

ThreadPool的设计

使用thread::spawn是生成线程的最好方式,但是它会立即执行传入的任务,我们需要的是创建线程和执行任务是要分离的。也就是说,我们可以先创建线程后这个线程就进入loop循环等待,直到有执行任务的信号过来这个线程才会执行任务。

可以考虑创建一个Worker结构体,存放id和对应的线程。作为ThreadPool和任务线程联系的桥梁,通过channel,ThreadPool持有Sender,通过execute方法将任务发送给Worker,而Worker持有Receiver,在loop循环中接收ThreadPool发送过来的任务。

ThreadPool结构体:

rust">use std::{sync::{mpsc, Arc, Mutex},thread,
};pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 获得Sender和Receiverlet (sender, receiver) = mpsc::channel();// receiver会在多线程中移动,因此要保证线程安全,需要使用Arc和Mutex。Arc可以允许多个Worker同时持有Receiver,而Mutex可以确保一次只有一个Worker能从Receiver中获取任务,防止任务被多次执行let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}    /// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F) whereF: FnOnce() + Send + 'static{let job = Box::new(f);// Sender往通道中发送任务self.sender.send(job).unwrap();}
}

Worker结构体:

rust">// 闭包的大小编译是未知的,使用Box可以在堆上动态分配内存,从而存储闭包
type Job = Box<dyn FnOnce() + Send + 'static>;struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {// Receiver会阻塞直到有任务let job = receiver.lock().unwrap().recv().unwrap();println!("Workder {id} got a job; executing");// 执行任务job();});// 让每个Worker都拥有自己的唯一idWorker { id, thread }}
}

关闭和资源清理

为ThreadPool实现Drop

当线程池被Drop时,需要等待所有的子线程完成它们的工作,然后再退出:

rust">struct Worker {id: usize,// 因为Worker中的thread字段的JoinHandle类型没有实现copy trait,可以修改Worker的thread字段,使用Option,然后通过take可以拿走内部值的所有权,同时留下一个Nonethread: Option<thread::JoinHandle<()>>,
}
impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("Workder {id} got a job; executing");job();});// 让每个Worker都拥有自己的唯一idWorker { id, thread: Some(thread)}}
}impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shuting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}

停止工作线程

虽然调用了join,但是目标线程依然不会停止,原因在于它们在无限地loop循环等待,需要channel的drop机制:释放sender后,receiver会收到错误,然后再退出

rust">pub struct ThreadPool {workers: Vec<Worker>,// 增加Option封装,这样可以用take拿走所有权sender: Option<mpsc::Sender<Job>>,
}impl ThreadPool {pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender: Some(sender)}}pub fn execute<F>(&self, f: F) whereF: FnOnce() + Send + 'static{let job = Box::new(f);self.sender.as_ref().unwrap().send(job).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {// 主动调用drop关闭senderdrop(self.sender.take());for worker in &mut self.workers {println!("Shuting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}

当sender被关闭后,将关闭对应的channel,所以loop的receiver就会收到一个错误,根据错误再进一步的错误:

rust">impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let message = receiver.lock().unwrap().recv();match message {Ok(job) => {println!("Worker {id} got a job; executing");job();}Err(_) => {println!("Worker {id} disconnected; shutting down.");break;}}});Worker {id,thread: Some(thread),}}
}

测试

为了验证代码的正确性,修改main:

rust">fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();let pool = ThreadPool::new(4);for stream in listener.incoming().take(2) {let stream = stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");
}

http://www.ppmy.cn/devtools/121942.html

相关文章

拓扑排序简介

拓扑排序(Topological Sort)是一种重要的图算法,用于对有向无环图(DAG, Directed Acyclic Graph)中的节点进行排序。拓扑排序的结果是一种线性序列,使得对于图中的任意一条有向边(u, v),顶点u都在顶点v之前。这种排序常用于任务调度、编译器依赖关系分析等领域。 拓…

程序计数器(学习笔记)

程序计数器是一块较小的内存空间&#xff0c;它的作用可以看做是当前线程所执行的字节码的信号指示器&#xff08;偏移地址&#xff09;&#xff0c;Java编译过程中产生的字节码有点类似编译原理的指令&#xff0c;程序计数器的内存空间存储的是当前执行的字节码的偏移地址 因为…

linux网络编程实战

前言 之前找工作的之后写了一些网络编程的笔记和代码&#xff0c;然后现在放到csdn上保存一下。有几个版本的&#xff0c;看看就好。就是简单的实现一下服务端和客户端之间的交互的&#xff0c;还没有我之前上linux编程课写的代码复杂。 哦对了&#xff0c;这个网络编程的代码对…

Debezium日常分享系列之:Debezium 3.0.0.Final发布

Debezium日常分享系列之&#xff1a;Debezium 3.0.0.Final发布 Debezium 核心的变化需要 Java 17基于Kafka 3.8 构建废弃的增量信号字段的删除每个表的详细指标 MariaDB连接器的更改版本 11.4.3 支持 MongoDB连接器的更改MongoDB sink connector MySQL连接器的改变MySQL 9MySQL…

软考系统分析师知识点二:经济管理

前言 今年报考了11月份的软考高级&#xff1a;系统分析师。 考试时间为&#xff1a;11月9日。 倒计时&#xff1a;35天。 目标&#xff1a;优先应试&#xff0c;其次学习&#xff0c;再次实践。 复习计划第一阶段&#xff1a;扫平基础知识点&#xff0c;仅抽取有用信息&am…

企业必备:搭建大模型应用平台实操教程

最近AI智能体很火&#xff0c;AI智能体平台化产品肯定属于大公司的。但在一些场景下&#xff0c;尤其是对业务数据要求很高的公司&#xff0c;那就只能用私有大模型。不一定完全是为了对外提供服务&#xff0c;对内改造工作流也是需要的。所以 我感觉未来大部分企业都会搞一个…

结合大语言模型的机械臂抓取操作简单介绍

一、大语言模型与机械臂抓取的基本操作 1. 大语言模型简介 大语言模型是基于深度学习技术构建的自然语言处理模型&#xff0c;能够生成、理解和处理文本信息。这些模型通过训练大量的文本数据&#xff0c;学习语法、上下文和常识&#xff0c;能够执行多种任务&#xff0c;如文…

<Rust>iced库(0.13.1)学习之部件(二十九):button部件新增方法on_press_with,可传入闭包函数

前言 本专栏是学习Rust的GUI库iced的合集,将介绍iced涉及的各个小部件分别介绍,最后会汇总为一个总的程序。 iced是RustGUI中比较强大的一个,目前处于发展中(即版本可能会改变),本专栏基于版本0.12.1. 注:新版本已更新为0.13 概述 这是本专栏的第二十九篇,在新版本中…