目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
在当前 server 实现中模拟慢请求
让我们看看一个慢请求如何影响当前 server 实现中的其他请求。示例 20-10 通过模拟慢响应实现了 /sleep 请求处理,它会使 server 在响应之前休眠五秒。
文件名: src/main.rs
use std::thread; use std::time::Duration; # use std::io::prelude::*; # use std::net::TcpStream; # use std::fs::File; // --snip-- fn handle_connection(mut stream: TcpStream) { # let mut buffer = [0; 512]; # stream.read(&mut buffer).unwrap(); // --snip-- let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; // --snip-- }
这段代码有些凌乱,不过对于模拟的目的来说已经足够。这里创建了第二个请求 sleep
,我们会识别其数据。在 if
块之后增加了一个 else if
来检查 /sleep 请求,当接收到这个请求时,在渲染成功 HTML 页面之前会先休眠五秒。
现在就可以真切的看出我们的 server 有多么的原始;真实的库将会以更简洁的方式处理多请求识别问题!
使用 cargo run
启动 server,并接着打开两个浏览器窗口:一个请求 http://127.0.0.1:7878/ 而另一个请求 http://127.0.0.1:7878/sleep。如果像之前一样多次请求 /*,会发现响应的比较快速。不过如果请求 */sleep 之后在请求 /*,就会看到 */ 会等待直到 sleep
休眠完五秒之后才出现。
这里有多种办法来改变我们的 web server 使其避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。
使用线程池改善吞吐量
线程池(thread pool)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配任务,这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,增加 server 的吞吐量。
我们会将池中线程限制为较少的数量,以防拒绝服务(Denial of Service, DoS)攻击;如果程序为每一个接收的请求都新建一个线程,某人向 server 发起千万级的请求时会耗尽服务器的资源并导致所有请求的处理都被终止。
不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向对队列索取另一个请求。通过这种设计,则可以并发处理 N
个请求,其中 N
为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和单线程异步 I/O 模型。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试用 Rust 实现他们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。
在开始之前,让我们讨论一下线程池应用看起来怎样。当尝试设计代码时,首先编写客户端接口确实有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。
类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发(compiler-driven development)。我们将编写调用所期望的函数的代码,接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。
为每一个请求分配线程的代码结构
首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个开始。示例 20-11 展示了 main
的改变,它在 for
循环中为每一个流分配了一个新线程进行处理:
文件名: src/main.rs
# use std::thread; # use std::io::prelude::*; # use std::net::TcpListener; # use std::net::TcpStream; # fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } # fn handle_connection(mut stream: TcpStream) {}
正如第十六章讲到的,thread::spawn
会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 /sleep,接着在另两个浏览器标签页中加载 /*,确实会发现 */ 请求不必等待 /sleep 结束。不过正如之前提到的,这最终会使系统崩溃因为我们无限制的创建新线程。
为有限数量的线程创建一个类似的接口
我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出较大的修改。示例 20-12 展示我们希望用来替换 thread::spawn
的 ThreadPool
结构体的假想接口:
文件名: src/main.rs
# use std::thread; # use std::io::prelude::*; # use std::net::TcpListener; # use std::net::TcpStream; # struct ThreadPool; # impl ThreadPool { # fn new(size: u32) -> ThreadPool { ThreadPool } # fn execute<F>(&self, f: F) # where F: FnOnce() + Send + 'static {} # } # fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } } # fn handle_connection(mut stream: TcpStream) {}
这里使用 ThreadPool::new
来创建一个新的线程池,它有一个可配置的线程数的参数,在这里是四。这样在 for
循环中,pool.execute
有着类似 thread::spawn
的接口,它获取一个线程池运行于每一个流的闭包。pool.execute
需要实现为获取闭包并传递给池中的线程运行。这段代码还不能编译,不过通过尝试编译器会指导我们如何修复它。
采用编译器驱动构建 ThreadPool
结构体
继续并对示例 20-12 中的 src/main.rs 做出修改,并利用来自 cargo check
的编译器错误来驱动开发。下面是我们得到的第一个错误:
$ cargo check Compiling hello v0.1.0 (file:///projects/hello) error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool` --> src\main.rs:10:16 | 10 | let pool = ThreadPool::new(4); | ^^^^^^^^^^^^^^^ Use of undeclared type or module `ThreadPool` error: aborting due to previous error
好的,这告诉我们需要一个 ThreadPool
类型或模块,所以我们将构建一个。ThreadPool
的实现会与 web server 的特定工作相独立,所以让我们从 hello
crate 切换到存放 ThreadPool
实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库,而不仅仅是处理网络请求。
创建 src/lib.rs 文件,它包含了目前可用的最简单的 ThreadPool
定义:
文件名: src/lib.rs
pub struct ThreadPool;
接着创建一个新目录,src/bin,并将二进制 crate 根文件从 src/main.rs 移动到 src/bin/main.rs。这使得库 crate 成为 hello 目录的主要 crate;不过仍然可以使用 cargo run
运行 src/bin/main.rs 二进制文件。移动了 main.rs 文件之后,修改 src/bin/main.rs 文件开头加入如下代码来引入库 crate 并将 ThreadPool
引入作用域:
文件名: src/bin/main.rs
use hello::ThreadPool;
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
$ cargo check Compiling hello v0.1.0 (file:///projects/hello) error[E0599]: no function or associated item named `new` found for type `hello::ThreadPool` in the current scope --> src/bin/main.rs:13:16 | 13 | let pool = ThreadPool::new(4); | ^^^^^^^^^^^^^^^ function or associated item not found in `hello::ThreadPool`
这告诉我们下一步是为 ThreadPool
创建一个叫做 new
的关联函数。我们还知道 new
需要有一个参数可以接受 4
,而且 new
应该返回 ThreadPool
实例。让我们实现拥有此特征的最小化 new
函数:
文件夹: src/lib.rs
pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }
这里选择 usize
作为 size
参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 usize
类型的原因,如第三章 “整数类型” 部分所讲。
再次编译检查这段代码:
$ cargo check Compiling hello v0.1.0 (file:///projects/hello) warning: unused variable: `size` --> src/lib.rs:4:16 | 4 | pub fn new(size: usize) -> ThreadPool { | ^^^^ | = note: #[warn(unused_variables)] on by default = note: to avoid this warning, consider using `_size` instead error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope --> src/bin/main.rs:18:14 | 18 | pool.execute(|| { | ^^^^^^^
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 ThreadPool
上的 execute
方法。回忆 “为有限数量的线程创建一个类似的接口” 部分我们决定线程池应该有与 thread::spawn
类似的接口,同时我们将实现 execute
函数来获取传递的闭包并将其传递给池中的空闲线程执行。
我们会在 ThreadPool
上定义 execute
函数来获取一个闭包参数。回忆第十三章的 “使用带有泛型和 Fn
trait 的闭包” 部分,闭包作为参数时可以使用三个不同的 trait:Fn
、FnMut
和 FnOnce
。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 thread::spawn
,所以我们可以观察 thread::spawn
的签名在其参数中使用了何种 bound。查看文档会发现:
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static
F
是这里我们关心的参数;T
与返回值有关所以我们并不关心。考虑到 spawn
使用 FnOnce
作为 F
的 trait bound,这可能也是我们需要的,因为最终会将传递给 execute
的参数传给 spawn
。因为处理请求的线程只会执行闭包一次,这也进一步确认了 FnOnce
是我们需要的 trait,这里符合 FnOnce
中 Once
的意思。
F
还有 trait bound Send
和生命周期绑定 'static
,这对我们的情况也是有意义的:需要 Send
来将闭包从一个线程转移到另一个线程,而 'static
是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 F
的 ThreadPool
的 execute
方法:
文件名: src/lib.rs
# pub struct ThreadPool; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { } }
FnOnce
trait 仍然需要之后的 ()
,因为这里的 FnOnce
代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。
这里再一次增加了 execute
方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:
$ cargo check Compiling hello v0.1.0 (file:///projects/hello) warning: unused variable: `size` --> src/lib.rs:4:16 | 4 | pub fn new(size: usize) -> ThreadPool { | ^^^^ | = note: #[warn(unused_variables)] on by default = note: to avoid this warning, consider using `_size` instead warning: unused variable: `f` --> src/lib.rs:8:30 | 8 | pub fn execute<F>(&self, f: F) | ^ | = note: to avoid this warning, consider using `_f` instead
现在就只有警告了!这意味着能够编译了!注意如果尝试 cargo run
运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 execute
的闭包!
一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。这是一个提醒大家的好时机,实际上这并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 并且 拥有期望的行为。
在 new
中验证池中线程数量
这里仍然存在警告是因为其并没有对 new
和 execute
的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 new
作为开始。之前选择使用无符号类型作为 size
参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 u32
值。让我们增加在返回 ThreadPool
实例之前检查 size
是否大于零的代码,并使用 assert!
宏在得到零时 panic,如示例 20-13 所示:
文件名: src/lib.rs
# pub struct ThreadPool; impl ThreadPool { /// 创建线程池。 /// /// 线程池中线程的数量。 /// /// # Panics /// /// `new` 函数在 size 为 0 时会 panic。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- }
这里用文档注释为 ThreadPool
增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 cargo doc --open
并点击 ThreadPool
结构体来查看生成的 new
的文档看起来如何!
相比像这里使用 assert!
宏,也可以让 new
像之前 I/O 项目中示例 12-9 中 Config::new
那样返回一个 Result
,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的 new
版本来感受一下两者的区别:
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
分配空间以储存线程
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回之前将他们储存在 ThreadPool
结构体中。不过如何 “储存” 一个线程?让我们再看看 thread::spawn
的签名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static
spawn
返回 JoinHandle<T>
,其中 T
是闭包返回的类型。尝试使用 JoinHandle
来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 T
将会是单元类型 ()
。
示例 20-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 ThreadPool
的定义来存放一个 thread::JoinHandle<()>
的 vector 实例,使用 size
容量来初始化,并设置一个 for
循环了来运行创建线程的代码,并返回包含这些线程的 ThreadPool
实例:
文件名: src/lib.rs
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } } // --snip-- }
这里将 std::thread
引入库 crate 的作用域,因为使用了 thread::JoinHandle
作为 ThreadPool
中 vector 元素的类型。
在得到了有效的数量之后,ThreadPool
新建一个存放 size
个元素的 vector。本书还未使用过 with_capacity
,它与 Vec::new
做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 size
个元素,预先进行分配比仅仅 Vec::new
要稍微有效率一些,因为 Vec::new
随着插入元素而重新改变大小。
如果再次运行 cargo check
,会看到一些警告,不过应该可以编译成功。
Worker
结构体负责从 ThreadPool
中将代码传递给线程
示例 20-14 的 for
循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。标准库提供的创建线程的方法,thread::spawn
,它期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须自己实现。
我们将要实现的行为是创建线程并稍后发送代码,这会在 ThreadPool
和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 Worker
:这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。
不同于在线程池中储存一个 JoinHandle<()>
实例的 vector,我们会储存 Worker
结构体的实例。每一个 Worker
会储存一个单独的 JoinHandle<()>
实例。接着会在 Worker
上实现一个方法,它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker id
,这样就可以在日志和调试中区别线程池中的不同 worker。
首先,让我们做出如此创建 ThreadPool
时所需的修改。在通过如下方式设置完 Worker
之后,我们会实现向线程发送闭包的代码:
- 定义
Worker
结构体存放id
和JoinHandle<()>
- 修改
ThreadPool
存放一个Worker
实例的 vector - 定义
Worker::new
函数,它获取一个id
数字并返回一个带有id
和用空闭包分配的线程的Worker
实例 - 在
ThreadPool::new
中,使用for
循环计数生成id
,使用这个id
新建Worker
,并储存进 vector 中
如果你渴望挑战,在查示例 20-15 中的代码之前尝试自己实现这些修改。
准备好了吗?示例 20-15 就是一个做出了这些修改的例子:
文件名: src/lib.rs
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread, } } }
这里将 ThreadPool
中字段名从 threads
改为 workers
,因为它现在储存 Worker
而不是 JoinHandle<()>
。使用 for
循环中的计数作为 Worker::new
的参数,并将每一个新建的 Worker
储存在叫做 workers
的 vector 中。
Worker
结构体和其 new
函数是私有的,因为外部代码(比如 src/bin/main.rs 中的 server)并不需要知道关于 ThreadPool
中使用 Worker
结构体的实现细节。Worker::new
函数使用 id
参数并储存了使用一个空闭包创建的 JoinHandle<()>
。
这段代码能够编译并用指定给 ThreadPool::new
的参数创建储存了一系列的 Worker
实例,不过 仍然 没有处理 execute
中得到的闭包。让我们聊聊接下来怎么做。
使用通道向线程发送请求
下一个需要解决的问题是传递给 thread::spawn
的闭包完全没有做任何工作。目前,我们在 execute
方法中获得期望执行的闭包,不过在创建 ThreadPool
的过程中创建每一个 Worker
时需要向 thread::spawn
传递一个闭包。
我们希望刚创建的 Worker
结构体能够从 ThreadPool
的队列中获取需要执行的代码,并发送到线程中执行他们。
在第十六章,我们学习了 通道 —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的。这里通道将充当任务队列的作用,execute
将通过 ThreadPool
向其中线程正在寻找工作的 Worker
实例发送任务。如下是这个计划:
-
ThreadPool
会创建一个通道并充当发送端。 - 每个
Worker
将会充当通道的接收端。 - 新建一个
Job
结构体来存放用于向通道中发送的闭包。 -
execute
方法会在通道发送端发出期望执行的任务。 - 在线程中,
Worker
会遍历通道的接收端并执行任何接收到的任务。
让我们以在 ThreadPool::new
中创建通道并让 ThreadPool
实例充当发送端开始,如示例 20-16 所示。Job
是将在通道中发出的类型,目前它是一个没有任何内容的结构体:
文件名: src/lib.rs
# use std::thread; // --snip-- use std::sync::mpsc; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender, } } // --snip-- } # # struct Worker { # id: usize, # thread: thread::JoinHandle<()>, # } # # impl Worker { # fn new(id: usize) -> Worker { # let thread = thread::spawn(|| {}); # # Worker { # id, # thread, # } # } # }
在 ThreadPool::new
中,新建了一个通道,并接着让线程池在接收端等待。这段代码能够编译,不过仍有警告。
让我们尝试在线程池创建每个 worker 时将通道的接收端传递给他们。须知我们希望在 worker 所分配的线程中使用通道的接收端,所以将在闭包中引用 receiver
参数。示例 20-17 中展示的代码还不能编译:
文件名: src/lib.rs
impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender, } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread, } } }
这是一些小而直观的修改:将通道的接收端传递进了 Worker::new
,并接着在闭包中使用它。
如果尝试 check 代码,会得到这个错误:
$ cargo check Compiling hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver` --> src/lib.rs:27:42 | 27 | workers.push(Worker::new(id, receiver)); | ^^^^^^^^ value moved here in previous iteration of loop | = note: move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
这段代码尝试将 receiver
传递给多个 Worker
实例。这是不行的,回忆第十六章:Rust 所提供的通道实现是多 生产者,单 消费者 的。这意味着不能简单的克隆通道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 receiver
,在线程间分发任务。
另外,从通道队列中取出任务涉及到修改 receiver
,所以这些线程需要一个能安全的共享和修改 receiver
的方式,否则可能导致竞争状态(参考第十六章)。
回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 Arc<Mutex<T>>
。Arc
使得多个 worker 拥有接收端,而 Mutex
则确保一次只有一个 worker 能从接收端得到任务。示例 20-18 展示了所需的修改:
文件名: src/lib.rs
# use std::thread; # use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; // --snip-- # pub struct ThreadPool { # workers: Vec<Worker>, # sender: mpsc::Sender<Job>, # } # struct Job; # impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } // --snip-- } # struct Worker { # id: usize, # thread: thread::JoinHandle<()>, # } # impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- # let thread = thread::spawn(|| { # receiver; # }); # # Worker { # id, # thread, # } } }
在 ThreadPool::new
中,将通道的接收端放入一个 Arc
和一个 Mutex
中。对于每一个新 worker,克隆 Arc
来增加引用计数,如此这些 worker 就可以共享接收端的所有权了。
通过这些修改,代码可以编译了!我们做到了!
实现 execute
方法
最后让我们实现 ThreadPool
上的 execute
方法。同时也要修改 Job
结构体:它将不再是结构体,Job
将是一个有着 execute
接收到的闭包类型的 trait 对象的类型别名。第十九章 “类型别名用来创建类型同义词” 部分提到过,类型别名允许将长的类型变短。观察示例 20-19:
文件名: src/lib.rs
// --snip-- # pub struct ThreadPool { # workers: Vec<Worker>, # sender: mpsc::Sender<Job>, # } # use std::sync::mpsc; # struct Worker {} type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --snip--
在使用 execute
得到的闭包新建 Job
实例之后,将这些任务从通道的发送端发出。这里调用 send
上的 unwrap
,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在他们就会一直执行。使用 unwrap
是因为我们知道失败不可能发生,即便编译器不这么认为。
不过到此事情还没有结束!在 worker 中,传递给 thread::spawn
的闭包仍然还只是 引用 了通道的接收端。相反我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们。如示例 20-20 对 Worker::new
做出修改:
文件名: src/lib.rs
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); (*job)(); } }); Worker { id, thread, } } }
这里,首先在 receiver
上调用了 lock
来获取互斥器,接着 unwrap
在出现任何错误时 panic。如果互斥器处于一种叫做 被污染(poisoned)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 unwrap
使其 panic 是正确的行为。请随意将 unwrap
改为包含有意义错误信息的 expect
。
如果锁定了互斥器,接着调用 recv
从通道中接收 Job
。最后的 unwrap
也绕过了一些错误,这可能发生于持有通道发送端的线程停止的情况,类似于如果接收端关闭时 send
方法如何返回 Err
一样。
调用 recv
会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。Mutex<T>
确保一次只有一个 Worker
线程尝试请求任务。
理论上这段代码应该能够编译。不幸的是,Rust 编译器仍不够完美,会给出如下错误:
error[E0161]: cannot move a value of type std::ops::FnOnce() + std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be statically determined --> src/lib.rs:63:17 | 63 | (*job)(); | ^^^^^^
这个错误非常的神秘,因为这个问题本身就很神秘。为了调用储存在 Box<T>
(这正是 Job
别名的类型)中的 FnOnce
闭包,该闭包需要能将自己移动 出 Box<T>
,因为当调用这个闭包时,它获取 self
的所有权。通常来说,将值移动出 Box<T>
是不被允许的,因为 Rust 不知道 Box<T>
中的值将会有多大;回忆第十五章能够正常使用 Box<T>
是因为我们将未知大小的值储存进 Box<T>
从而得到已知大小的值。
第十七章曾见过,示例 17-15 中有使用了 self: Box<Self>
语法的方法,它允许方法获取储存在 Box<T>
中的 Self
值的所有权。这正是我们希望做的,然而不幸的是 Rust 不允许我们这么做:Rust 当闭包被调用时行为的那部分并没有使用 self: Box<Self>
实现。所以这里 Rust 也不知道它可以使用 self: Box<Self>
来获取闭包的所有权并将闭包移动出 Box<T>
。
Rust 仍在努力改进提升编译器的过程中,不过将来示例 20-20 中的代码应该能够正常工作。有很多像你一样的人正在修复这个以及其他问题!当你结束了本书的阅读,我们希望看到你也成为他们中的一员。
不过目前让我们通过一个小技巧来绕过这个问题。可以显式的告诉 Rust 在这里我们可以使用 self: Box<Self>
来获取 Box<T>
中值的所有权,而一旦获取了闭包的所有权就可以调用它了。这涉及到定义一个新 trait,它带有一个在签名中使用 self: Box<Self>
的方法 call_box
,为任何实现了 FnOnce()
的类型定义这个 trait,修改类型别名来使用这个新 trait,并修改 Worker
使用 call_box
方法。这些修改如示例 20-21 所示:
文件名: src/lib.rs
trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<FnBox + Send + 'static>; // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread, } } }
首先,新建了一个叫做 FnBox
的 trait。这个 trait 有一个方法 call_box
,它类似于其他 Fn*
trait 中的 call
方法,除了它获取 self: Box<Self>
以便获取 self
的所有权并将值从 Box<T>
中移动出来。
接下来,为任何实现了 FnOnce()
trait 的类型 F
实现 FnBox
trait。这实际上意味着任何 FnOnce()
闭包都可以使用 call_box
方法。call_box
的实现使用 (*self)()
将闭包移动出 Box<T>
并调用此闭包。
现在我们需要 Job
类型别名是任何实现了新 trait FnBox
的 Box
。这允许我们在得到 Job
值时使用 Worker
中的 call_box
。为任何 FnOnce()
闭包都实现了 FnBox
trait 意味着无需对实际在通道中发出的值做任何修改。现在 Rust 就能够理解我们的行为是正确的了。
这是非常狡猾且复杂的手段。无需过分担心他们并不是非常有道理;总有一天,这一切将是毫无必要的。
通过这个技巧,线程池处于可以运行的状态了!执行 cargo run
并发起一些请求:
$ cargo run Compiling hello v0.1.0 (file:///projects/hello) warning: field is never used: `workers` --> src/lib.rs:7:5 | 7 | workers: Vec<Worker>, | ^^^^^^^^^^^^^^^^^^^^ | = note: #[warn(dead_code)] on by default warning: field is never used: `id` --> src/lib.rs:61:5 | 61 | id: usize, | ^^^^^^^^^ | = note: #[warn(dead_code)] on by default warning: field is never used: `thread` --> src/lib.rs:62:5 | 62 | thread: thread::JoinHandle<()>, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: #[warn(dead_code)] on by default Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs Running `target/debug/hello` Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing.
成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当 server 收到大量请求时系统也不会负担过重。如果请求 /sleep,server 也能够通过另外一个线程处理其他请求。
注意如果同时在多个浏览器窗口打开 /sleep,它们可能会彼此间隔地加载 5 秒,因为一些浏览器处于缓存的原因会顺序执行相同请求的多个实例。这些限制并不是由于我们的 web server 造成的。
在学习了第十八章的 while let
循环之后,你可能会好奇为何不能如此编写 worker 线程:
文件名: src/lib.rs
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread, } } }
这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:Mutex
结构体没有公有 unlock
方法,因为锁的所有权依赖 lock
方法返回的 LockResult<MutexGuard<T>>
中 MutexGuard<T>
的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 Mutex
守护的资源,不过如果没有认真的思考 MutexGuard<T>
的生命周期的话,也可能会导致比预期更久的持有锁。因为 while
表达式中的值在整个块一直处于作用域中,job.call_box()
调用的过程中其仍然持有锁,这意味着其他 worker 不能接收任务。
相反通过使用 loop
并在循环块之内而不是之外获取锁和任务,lock
方法返回的 MutexGuard
在 let job
语句结束之后立刻就被丢弃了。这确保了 recv
调用过程中持有锁,而在 job.call_box()
调用前锁就被释放了,这就允许并发处理多个请求了。