mirror of
https://github.com/KaiserY/trpl-zh-cn
synced 2024-11-14 04:41:49 +08:00
1024 lines
67 KiB
HTML
1024 lines
67 KiB
HTML
|
<!DOCTYPE HTML>
|
|||
|
<html lang="zh-CN" class="light" dir="ltr">
|
|||
|
<head>
|
|||
|
<!-- Book generated using mdBook -->
|
|||
|
<meta charset="UTF-8">
|
|||
|
<title>优雅停机与清理 - Rust 程序设计语言 简体中文版</title>
|
|||
|
|
|||
|
|
|||
|
<!-- Custom HTML head -->
|
|||
|
|
|||
|
<meta name="description" content="">
|
|||
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|||
|
<meta name="theme-color" content="#ffffff">
|
|||
|
|
|||
|
<link rel="icon" href="favicon.svg">
|
|||
|
<link rel="shortcut icon" href="favicon.png">
|
|||
|
<link rel="stylesheet" href="css/variables.css">
|
|||
|
<link rel="stylesheet" href="css/general.css">
|
|||
|
<link rel="stylesheet" href="css/chrome.css">
|
|||
|
<link rel="stylesheet" href="css/print.css" media="print">
|
|||
|
|
|||
|
<!-- Fonts -->
|
|||
|
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
|
|||
|
<link rel="stylesheet" href="fonts/fonts.css">
|
|||
|
|
|||
|
<!-- Highlight.js Stylesheets -->
|
|||
|
<link rel="stylesheet" href="highlight.css">
|
|||
|
<link rel="stylesheet" href="tomorrow-night.css">
|
|||
|
<link rel="stylesheet" href="ayu-highlight.css">
|
|||
|
|
|||
|
<!-- Custom theme stylesheets -->
|
|||
|
<link rel="stylesheet" href="ferris.css">
|
|||
|
<link rel="stylesheet" href="theme/2018-edition.css">
|
|||
|
<link rel="stylesheet" href="theme/semantic-notes.css">
|
|||
|
<link rel="stylesheet" href="theme/listing.css">
|
|||
|
|
|||
|
</head>
|
|||
|
<body class="sidebar-visible no-js">
|
|||
|
<div id="body-container">
|
|||
|
<!-- Provide site root to javascript -->
|
|||
|
<script>
|
|||
|
// Path from this page to the site root, exposed to the scripts that follow; empty on this page.
var path_to_root = "";
|
|||
|
// Fallback theme used when no theme is stored: "navy" if the browser reports a dark colour-scheme preference, otherwise "light".
var default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? "navy" : "light";
|
|||
|
</script>
|
|||
|
|
|||
|
<!-- Work around some values being stored in localStorage wrapped in quotes -->
|
|||
|
<script>
|
|||
|
// Some values may have been stored in localStorage wrapped in double quotes.
// Rewrite them as bare strings so the scripts below can compare them directly.
try {
    var theme = localStorage.getItem('mdbook-theme');
    var sidebar = localStorage.getItem('mdbook-sidebar');

    // getItem() returns null for a key that was never stored. Each key is
    // guarded on its own so that a missing theme no longer throws and
    // silently skips the sidebar clean-up (and vice versa).
    if (theme !== null && theme.startsWith('"') && theme.endsWith('"')) {
        localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
    }

    if (sidebar !== null && sidebar.startsWith('"') && sidebar.endsWith('"')) {
        localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
    }
} catch (e) {
    // Deliberately ignored: this clean-up is best effort only.
}
|
|||
|
</script>
|
|||
|
|
|||
|
<!-- Set the theme before any content is loaded, prevents flash -->
|
|||
|
<script>
|
|||
|
// Apply the stored theme (or the default) to <html> and flag <body> as script-enabled.
var theme;
|
|||
|
// Reading localStorage may throw; in that case theme keeps its previous value.
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
|
|||
|
// Nothing stored (or storage unreadable): fall back to default_theme.
if (theme === null || theme === undefined) { theme = default_theme; }
|
|||
|
var html = document.querySelector('html');
|
|||
|
// The markup ships with class "light"; swap it for the selected theme class.
html.classList.remove('light')
|
|||
|
html.classList.add(theme);
|
|||
|
var body = document.querySelector('body');
|
|||
|
// Replace the static "no-js" marker class with "js" now that a script has run.
body.classList.remove('no-js')
|
|||
|
body.classList.add('js');
|
|||
|
</script>
|
|||
|
|
|||
|
<input type="checkbox" id="sidebar-toggle-anchor" class="hidden">
|
|||
|
|
|||
|
<!-- Hide / unhide sidebar before it is displayed -->
|
|||
|
<script>
|
|||
|
// Decide the initial sidebar state and reflect it on the toggle checkbox and the <body> class.
var body = document.querySelector('body');
|
|||
|
var sidebar = null;
|
|||
|
var sidebar_toggle = document.getElementById("sidebar-toggle-anchor");
|
|||
|
// Body at least 1080px wide: honour the stored preference, defaulting to visible.
if (document.body.clientWidth >= 1080) {
|
|||
|
// Reading localStorage may throw; sidebar then stays null and the default below applies.
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
|
|||
|
sidebar = sidebar || 'visible';
|
|||
|
} else {
|
|||
|
// Narrower body: always start with the sidebar hidden, ignoring any stored value.
sidebar = 'hidden';
|
|||
|
}
|
|||
|
// The checkbox is checked exactly when the sidebar is visible.
sidebar_toggle.checked = sidebar === 'visible';
|
|||
|
// Replace the static "sidebar-visible" class from the markup with the computed "sidebar-<state>" class.
body.classList.remove('sidebar-visible');
|
|||
|
body.classList.add("sidebar-" + sidebar);
|
|||
|
</script>
|
|||
|
|
|||
|
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
|
|||
|
<div class="sidebar-scrollbox">
|
|||
|
<ol class="chapter"><li class="chapter-item expanded affix "><a href="title-page.html">Rust 程序设计语言</a></li><li class="chapter-item expanded affix "><a href="foreword.html">前言</a></li><li class="chapter-item expanded affix "><a href="ch00-00-introduction.html">简介</a></li><li class="chapter-item expanded "><a href="ch01-00-getting-started.html"><strong aria-hidden="true">1.</strong> 入门指南</a></li><li><ol class="section"><li class="chapter-item expanded "><a href="ch01-01-installation.html"><strong aria-hidden="true">1.1.</strong> 安装</a></li><li class="chapter-item expanded "><a href="ch01-02-hello-world.html"><strong aria-hidden="true">1.2.</strong> Hello, World!</a></li><li class="chapter-item expanded "><a href="ch01-03-hello-cargo.html"><strong aria-hidden="true">1.3.</strong> Hello, Cargo!</a></li></ol></li><li class="chapter-item expanded "><a href="ch02-00-guessing-game-tutorial.html"><strong aria-hidden="true">2.</strong> 写个猜数字游戏</a></li><li class="chapter-item expanded "><a href="ch03-00-common-programming-concepts.html"><strong aria-hidden="true">3.</strong> 常见编程概念</a></li><li><ol class="section"><li class="chapter-item expanded "><a href="ch03-01-variables-and-mutability.html"><strong aria-hidden="true">3.1.</strong> 变量与可变性</a></li><li class="chapter-item expanded "><a href="ch03-02-data-types.html"><strong aria-hidden="true">3.2.</strong> 数据类型</a></li><li class="chapter-item expanded "><a href="ch03-03-how-functions-work.html"><strong aria-hidden="true">3.3.</strong> 函数</a></li><li class="chapter-item expanded "><a href="ch03-04-comments.html"><strong aria-hidden="true">3.4.</strong> 注释</a></li><li class="chapter-item expanded "><a href="ch03-05-control-flow.html"><strong aria-hidden="true">3.5.</strong> 控制流</a></li></ol></li><li class="chapter-item expanded "><a href="ch04-00-understanding-ownership.html"><strong aria-hidden="true">4.</strong> 认识所有权</a></li><li><ol class="section"><li class="chapter-item expanded "><a 
href="ch04-01-what-is-ownership.html"><strong aria-hidden="true">4.1.</strong> 什么是所有权?</a></li><li class="chapter-item expanded "><a href="ch04-02-references-and-borrowing.html"><strong aria-hidden="true">4.2.</strong> 引用与借用</a></li><li class="chapter-item expanded "><a href="ch04-03-slices.html"><strong aria-hidden="true">4.3.</strong> Slice 类型</a></li></ol></li><li class="chapter-item expanded "><a href="ch05-00-structs.html"><strong aria-hidden="true">5.</strong> 使用结构体组织相关联的数据</a></li><li><ol class="section"><li class="chapter-item expanded "><a href="ch05-01-defining-structs.html"><strong aria-hidden="true">5.1.</strong> 结构体的定义和实例化</a></li><li class="chapter-item expanded "><a href="ch05-02-example-structs.html"><strong aria-hidden="true">5.2.</strong> 结构体示例程序</a></li><li class="chapter-item expanded "><a href="ch05-03-method-syntax.html"><strong aria-hidden="true">5.3.</strong> 方法语法</a></li></ol></li><li class="chapter-item expanded "><a href="ch06-00-enums.html"><strong aria-hidden="true">6.</strong> 枚举和模式匹配</a></li><li><ol class="section"><li class="chapter-item expanded "><a href="ch06-01-defining-an-enum.html"><strong aria-hidden="true">6.1.</strong> 枚举的定义</a></li><li class="chapter-item expanded "><a href="ch06-02-match.html"><strong aria-hidden="true">6.2.</strong> match 控制流结构</a></li><li class="chapter-item expanded "><a href="ch06-03-if-let.html"><strong aria-hidden="true">6.3.</strong> if let 简洁控制流</a></li></ol></li><li class="chapter-item expanded "><a href="ch07-00-managing-growing-projects-with-packages-crates-and-modules.html"><strong aria-hidden="true">7.</strong> 使用包、Crate 和模块管理不断增长的项目</a></li><li><ol class="section"><li class="chapter-item expanded "><a href="ch07-01-packages-and-crates.html"><strong aria-hidden="true">7.1.</strong> 包和 Crate</a></li><li class="chapter-item expanded "><a h
|
|||
|
</div>
|
|||
|
<div id="sidebar-resize-handle" class="sidebar-resize-handle">
|
|||
|
<div class="sidebar-resize-indicator"></div>
|
|||
|
</div>
|
|||
|
</nav>
|
|||
|
|
|||
|
<!-- Track and set sidebar scroll position -->
|
|||
|
<script>
|
|||
|
// Remember the sidebar scroll offset across sidebar-link navigation and restore it on load.
var sidebarScrollbox = document.querySelector('#sidebar .sidebar-scrollbox');
|
|||
|
// When an <a> inside the scrollbox is clicked, save the current scroll offset for the next page load.
sidebarScrollbox.addEventListener('click', function(e) {
|
|||
|
if (e.target.tagName === 'A') {
|
|||
|
sessionStorage.setItem('sidebar-scroll', sidebarScrollbox.scrollTop);
|
|||
|
}
|
|||
|
}, { passive: true });
|
|||
|
// Read the saved offset, then remove it so it is applied only once.
var sidebarScrollTop = sessionStorage.getItem('sidebar-scroll');
|
|||
|
sessionStorage.removeItem('sidebar-scroll');
|
|||
|
if (sidebarScrollTop) {
|
|||
|
// preserve sidebar scroll position when navigating via links within sidebar
|
|||
|
sidebarScrollbox.scrollTop = sidebarScrollTop;
|
|||
|
} else {
|
|||
|
// scroll sidebar to current active section when navigating via "next/previous chapter" buttons
|
|||
|
var activeSection = document.querySelector('#sidebar .active');
|
|||
|
// No element matching '#sidebar .active' means there is nothing to scroll to.
if (activeSection) {
|
|||
|
activeSection.scrollIntoView({ block: 'center' });
|
|||
|
}
|
|||
|
}
|
|||
|
</script>
|
|||
|
|
|||
|
<div id="page-wrapper" class="page-wrapper">
|
|||
|
|
|||
|
<div class="page">
|
|||
|
<div id="menu-bar-hover-placeholder"></div>
|
|||
|
<div id="menu-bar" class="menu-bar sticky">
|
|||
|
<div class="left-buttons">
|
|||
|
<label id="sidebar-toggle" class="icon-button" for="sidebar-toggle-anchor" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
|
|||
|
<i class="fa fa-bars"></i>
|
|||
|
</label>
|
|||
|
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
|
|||
|
<i class="fa fa-paint-brush"></i>
|
|||
|
</button>
|
|||
|
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
|
|||
|
<li role="none"><button role="menuitem" class="theme" id="light">Light</button></li>
|
|||
|
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
|
|||
|
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
|
|||
|
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
|
|||
|
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
|
|||
|
</ul>
|
|||
|
<button id="search-toggle" class="icon-button" type="button" title="Search. (Shortkey: s)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="S" aria-controls="searchbar">
|
|||
|
<i class="fa fa-search"></i>
|
|||
|
</button>
|
|||
|
</div>
|
|||
|
|
|||
|
<h1 class="menu-title">Rust 程序设计语言 简体中文版</h1>
|
|||
|
|
|||
|
<div class="right-buttons">
|
|||
|
<a href="print.html" title="Print this book" aria-label="Print this book">
|
|||
|
<i id="print-button" class="fa fa-print"></i>
|
|||
|
</a>
|
|||
|
<a href="https://github.com/KaiserY/trpl-zh-cn/tree/main" title="Git repository" aria-label="Git repository">
|
|||
|
<i id="git-repository-button" class="fa fa-github"></i>
|
|||
|
</a>
|
|||
|
<a href="https://github.com/KaiserY/trpl-zh-cn/edit/main/src/ch21-03-graceful-shutdown-and-cleanup.md" title="Suggest an edit" aria-label="Suggest an edit">
|
|||
|
<i id="git-edit-button" class="fa fa-edit"></i>
|
|||
|
</a>
|
|||
|
|
|||
|
</div>
|
|||
|
</div>
|
|||
|
|
|||
|
<div id="search-wrapper" class="hidden">
|
|||
|
<form id="searchbar-outer" class="searchbar-outer">
|
|||
|
<input type="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
|
|||
|
</form>
|
|||
|
<div id="searchresults-outer" class="searchresults-outer hidden">
|
|||
|
<div id="searchresults-header" class="searchresults-header"></div>
|
|||
|
<ul id="searchresults">
|
|||
|
</ul>
|
|||
|
</div>
|
|||
|
</div>
|
|||
|
|
|||
|
<!-- Apply ARIA attributes after the sidebar and the sidebar toggle button are added to the DOM -->
|
|||
|
<script>
|
|||
|
// Mirror the computed sidebar state into ARIA attributes and link focusability.
(function () {
    var isVisible = sidebar === 'visible';

    // The toggle reports whether the sidebar is expanded; the sidebar itself
    // is hidden from assistive technology whenever it is not visible.
    document.getElementById('sidebar-toggle').setAttribute('aria-expanded', isVisible);
    document.getElementById('sidebar').setAttribute('aria-hidden', !isVisible);

    // Sidebar links are tabbable only while the sidebar is visible.
    var sidebarLinks = document.querySelectorAll('#sidebar a');
    for (var i = 0; i < sidebarLinks.length; i++) {
        sidebarLinks[i].setAttribute('tabIndex', isVisible ? 0 : -1);
    }
})();
|
|||
|
</script>
|
|||
|
|
|||
|
<div id="content" class="content">
|
|||
|
<main>
|
|||
|
<h2 id="优雅停机与清理"><a class="header" href="#优雅停机与清理">优雅停机与清理</a></h2>
|
|||
|
<blockquote>
|
|||
|
<p><a href="https://github.com/rust-lang/book/blob/main/src/ch21-03-graceful-shutdown-and-cleanup.md">ch21-03-graceful-shutdown-and-cleanup.md</a>
|
|||
|
<br>
|
|||
|
commit 3e5105b52f7e8d3d95def07ffade4dcb1cfdee27</p>
|
|||
|
</blockquote>
|
|||
|
<p>示例 20-20 中的代码如期通过使用线程池异步地响应请求。这里有一些警告说 <code>workers</code>、<code>id</code> 和 <code>thread</code> 字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 <span class="keystroke">ctrl-c</span> 终止主线程时,所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。</p>
|
|||
|
<p>现在我们要为 <code>ThreadPool</code> 实现 <code>Drop</code> trait,以便对线程池中的每一个线程调用 <code>join</code>,这样这些线程将会执行完它们的请求。接着会为 <code>ThreadPool</code> 实现一个告诉线程它们应该停止接收新请求并结束的方式。为了实践这些代码,我们会修改 server,使其在优雅停机(graceful shutdown)之前只接受两个请求。</p>
|
|||
|
<h3 id="为-threadpool-实现-drop-trait"><a class="header" href="#为-threadpool-实现-drop-trait">为 <code>ThreadPool</code> 实现 <code>Drop</code> Trait</a></h3>
|
|||
|
<p>现在开始为线程池实现 <code>Drop</code>。当线程池被丢弃时,应该 join 所有线程以确保它们完成其操作。示例 20-22 展示了 <code>Drop</code> 实现的第一次尝试;这些代码还不能够编译:</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore does_not_compile"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">pub struct ThreadPool {
|
|||
|
</span><span class="boring"> workers: Vec<Worker>,
|
|||
|
</span><span class="boring"> sender: mpsc::Sender<Job>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl ThreadPool {
|
|||
|
</span><span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span><span class="boring"> pub fn new(size: usize) -> ThreadPool {
|
|||
|
</span><span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> ThreadPool { workers, sender }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> pub fn execute<F>(&self, f: F)
|
|||
|
</span><span class="boring"> where
|
|||
|
</span><span class="boring"> F: FnOnce() + Send + 'static,
|
|||
|
</span><span class="boring"> {
|
|||
|
</span><span class="boring"> let job = Box::new(f);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> self.sender.send(job).unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span>impl Drop for ThreadPool {
|
|||
|
fn drop(&mut self) {
|
|||
|
for worker in &mut self.workers {
|
|||
|
println!("Shutting down worker {}", worker.id);
|
|||
|
|
|||
|
worker.thread.join().unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">struct Worker {
|
|||
|
</span><span class="boring"> id: usize,
|
|||
|
</span><span class="boring"> thread: thread::JoinHandle<()>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Worker {
|
|||
|
</span><span class="boring"> fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
</span><span class="boring"> let thread = thread::spawn(move || loop {
|
|||
|
</span><span class="boring"> let job = receiver.lock().unwrap().recv().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> println!("Worker {id} got a job; executing.");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> job();
|
|||
|
</span><span class="boring"> });
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> Worker { id, thread }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}</span></code></pre>
|
|||
|
<p><span class="caption">示例 20-22: 当线程池离开作用域时 join 每个线程</span></p>
|
|||
|
<p>这里首先遍历线程池中的每个 <code>workers</code>。这里使用了 <code>&mut</code>,因为 <code>self</code> 本身是一个可变引用,而且也需要能够修改 <code>worker</code>。对于每一个线程,会打印出说明信息表明此特定 worker 正在关闭,接着在 worker 线程上调用 <code>join</code>。如果 <code>join</code> 调用失败,则通过 <code>unwrap</code> 使 Rust panic 并进行不优雅的关闭。</p>
|
|||
|
<p>如下是尝试编译代码时得到的错误:</p>
|
|||
|
<pre><code class="language-console">$ cargo check
|
|||
|
Checking hello v0.1.0 (file:///projects/hello)
|
|||
|
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
|
|||
|
--> src/lib.rs:52:13
|
|||
|
|
|
|||
|
52 | worker.thread.join().unwrap();
|
|||
|
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
|
|||
|
| |
|
|||
|
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
|||
|
|
|
|||
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
|
|||
|
--> /rustc/eeb90cda1969383f56a2637cbd3037bdf598841c/library/std/src/thread/mod.rs:1778:17
|
|||
|
|
|||
|
For more information about this error, try `rustc --explain E0507`.
|
|||
|
error: could not compile `hello` (lib) due to 1 previous error
|
|||
|
</code></pre>
|
|||
|
<p>这里的错误告诉我们并不能调用 <code>join</code>,因为我们只有每一个 <code>worker</code> 的可变借用,而 <code>join</code> 需要获取其参数的所有权。为了解决这个问题,需要一个方法将 <code>thread</code> 移动出拥有其所有权的 <code>Worker</code> 实例以便 <code>join</code> 可以消费这个线程。示例 17-15 中我们曾见过这么做的方法:如果 <code>Worker</code> 存放的是 <code>Option<thread::JoinHandle<()>></code>,就可以在 <code>Option</code> 上调用 <code>take</code> 方法将值从 <code>Some</code> 成员中移动出来而对 <code>None</code> 成员不做处理。换句话说,正在运行的 <code>Worker</code> 的 <code>thread</code> 将是 <code>Some</code> 成员值,而当需要清理 worker 时,将 <code>Some</code> 替换为 <code>None</code>,这样 worker 就没有可以运行的线程了。</p>
|
|||
|
<p>为此需要更新 <code>Worker</code> 的定义为如下:</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore does_not_compile"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">pub struct ThreadPool {
|
|||
|
</span><span class="boring"> workers: Vec<Worker>,
|
|||
|
</span><span class="boring"> sender: mpsc::Sender<Job>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl ThreadPool {
|
|||
|
</span><span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span><span class="boring"> pub fn new(size: usize) -> ThreadPool {
|
|||
|
</span><span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> ThreadPool { workers, sender }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> pub fn execute<F>(&self, f: F)
|
|||
|
</span><span class="boring"> where
|
|||
|
</span><span class="boring"> F: FnOnce() + Send + 'static,
|
|||
|
</span><span class="boring"> {
|
|||
|
</span><span class="boring"> let job = Box::new(f);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> self.sender.send(job).unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Drop for ThreadPool {
|
|||
|
</span><span class="boring"> fn drop(&mut self) {
|
|||
|
</span><span class="boring"> for worker in &mut self.workers {
|
|||
|
</span><span class="boring"> println!("Shutting down worker {}", worker.id);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> worker.thread.join().unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span>struct Worker {
|
|||
|
id: usize,
|
|||
|
thread: Option<thread::JoinHandle<()>>,
|
|||
|
}
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">impl Worker {
|
|||
|
</span><span class="boring"> fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
</span><span class="boring"> let thread = thread::spawn(move || loop {
|
|||
|
</span><span class="boring"> let job = receiver.lock().unwrap().recv().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> println!("Worker {id} got a job; executing.");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> job();
|
|||
|
</span><span class="boring"> });
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> Worker { id, thread }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}</span></code></pre>
|
|||
|
<p>现在依靠编译器来找出其他需要修改的地方。check 代码会得到两个错误:</p>
|
|||
|
<pre><code class="language-console">$ cargo check
|
|||
|
Checking hello v0.1.0 (file:///projects/hello)
|
|||
|
error[E0599]: no method named `join` found for enum `Option` in the current scope
|
|||
|
--> src/lib.rs:52:27
|
|||
|
|
|
|||
|
52 | worker.thread.join().unwrap();
|
|||
|
| ^^^^ method not found in `Option<JoinHandle<()>>`
|
|||
|
|
|
|||
|
note: the method `join` exists on the type `JoinHandle<()>`
|
|||
|
--> /rustc/eeb90cda1969383f56a2637cbd3037bdf598841c/library/std/src/thread/mod.rs:1778:5
|
|||
|
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
|
|||
|
|
|
|||
|
52 | worker.thread.expect("REASON").join().unwrap();
|
|||
|
| +++++++++++++++++
|
|||
|
|
|||
|
error[E0308]: mismatched types
|
|||
|
--> src/lib.rs:72:22
|
|||
|
|
|
|||
|
72 | Worker { id, thread }
|
|||
|
| ^^^^^^ expected `Option<JoinHandle<()>>`, found `JoinHandle<_>`
|
|||
|
|
|
|||
|
= note: expected enum `Option<JoinHandle<()>>`
|
|||
|
found struct `JoinHandle<_>`
|
|||
|
help: try wrapping the expression in `Some`
|
|||
|
|
|
|||
|
72 | Worker { id, thread: Some(thread) }
|
|||
|
| +++++++++++++ +
|
|||
|
|
|||
|
Some errors have detailed explanations: E0308, E0599.
|
|||
|
For more information about an error, try `rustc --explain E0308`.
|
|||
|
error: could not compile `hello` (lib) due to 2 previous errors
|
|||
|
</code></pre>
|
|||
|
<p>让我们修复第二个错误,它指向 <code>Worker::new</code> 结尾的代码;当新建 <code>Worker</code> 时需要将 <code>thread</code> 值封装进 <code>Some</code>。做出如下改变以修复问题:</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore does_not_compile"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">pub struct ThreadPool {
|
|||
|
</span><span class="boring"> workers: Vec<Worker>,
|
|||
|
</span><span class="boring"> sender: mpsc::Sender<Job>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl ThreadPool {
|
|||
|
</span><span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span><span class="boring"> pub fn new(size: usize) -> ThreadPool {
|
|||
|
</span><span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> ThreadPool { workers, sender }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> pub fn execute<F>(&self, f: F)
|
|||
|
</span><span class="boring"> where
|
|||
|
</span><span class="boring"> F: FnOnce() + Send + 'static,
|
|||
|
</span><span class="boring"> {
|
|||
|
</span><span class="boring"> let job = Box::new(f);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> self.sender.send(job).unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Drop for ThreadPool {
|
|||
|
</span><span class="boring"> fn drop(&mut self) {
|
|||
|
</span><span class="boring"> for worker in &mut self.workers {
|
|||
|
</span><span class="boring"> println!("Shutting down worker {}", worker.id);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> worker.thread.join().unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">struct Worker {
|
|||
|
</span><span class="boring"> id: usize,
|
|||
|
</span><span class="boring"> thread: Option<thread::JoinHandle<()>>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span>impl Worker {
|
|||
|
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
// --snip--
|
|||
|
|
|||
|
<span class="boring"> let thread = thread::spawn(move || loop {
|
|||
|
</span><span class="boring"> let job = receiver.lock().unwrap().recv().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> println!("Worker {id} got a job; executing.");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> job();
|
|||
|
</span><span class="boring"> });
|
|||
|
</span><span class="boring">
|
|||
|
</span> Worker {
|
|||
|
id,
|
|||
|
thread: Some(thread),
|
|||
|
}
|
|||
|
}
|
|||
|
}</code></pre>
|
|||
|
<p>第一个错误位于 <code>Drop</code> 实现中。之前提到过要调用 <code>Option</code> 上的 <code>take</code> 将 <code>thread</code> 移动出 <code>worker</code>。如下改变会修复问题:</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore not_desired_behavior"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">pub struct ThreadPool {
|
|||
|
</span><span class="boring"> workers: Vec<Worker>,
|
|||
|
</span><span class="boring"> sender: mpsc::Sender<Job>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl ThreadPool {
|
|||
|
</span><span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span><span class="boring"> pub fn new(size: usize) -> ThreadPool {
|
|||
|
</span><span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> ThreadPool { workers, sender }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> pub fn execute<F>(&self, f: F)
|
|||
|
</span><span class="boring"> where
|
|||
|
</span><span class="boring"> F: FnOnce() + Send + 'static,
|
|||
|
</span><span class="boring"> {
|
|||
|
</span><span class="boring"> let job = Box::new(f);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> self.sender.send(job).unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span>impl Drop for ThreadPool {
|
|||
|
fn drop(&mut self) {
|
|||
|
for worker in &mut self.workers {
|
|||
|
println!("Shutting down worker {}", worker.id);
|
|||
|
|
|||
|
if let Some(thread) = worker.thread.take() {
|
|||
|
thread.join().unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">struct Worker {
|
|||
|
</span><span class="boring"> id: usize,
|
|||
|
</span><span class="boring"> thread: Option<thread::JoinHandle<()>>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Worker {
|
|||
|
</span><span class="boring"> fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
</span><span class="boring"> let thread = thread::spawn(move || loop {
|
|||
|
</span><span class="boring"> let job = receiver.lock().unwrap().recv().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> println!("Worker {id} got a job; executing.");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> job();
|
|||
|
</span><span class="boring"> });
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> Worker {
|
|||
|
</span><span class="boring"> id,
|
|||
|
</span><span class="boring"> thread: Some(thread),
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}</span></code></pre>
|
|||
|
<p>如第十八章我们见过的,<code>Option</code> 上的 <code>take</code> 方法会取出 <code>Some</code> 而留下 <code>None</code>。使用 <code>if let</code> 解构 <code>Some</code> 并得到线程,接着在线程上调用 <code>join</code>。如果 worker 的线程已然是 <code>None</code>,就知道此时这个 worker 已经清理了其线程所以无需做任何操作。</p>
|
|||
|
<h3 id="向线程发送信号使其停止接收任务"><a class="header" href="#向线程发送信号使其停止接收任务">向线程发送信号使其停止接收任务</a></h3>
|
|||
|
<p>有了所有这些修改,代码就能编译且没有任何警告。不过也有坏消息,这些代码还不能以我们期望的方式运行。问题的关键在于 <code>Worker</code> 中分配的线程所运行的闭包中的逻辑:调用 <code>join</code> 并不会关闭线程,因为它们一直 <code>loop</code> 来寻找任务。如果采用这个实现来尝试丢弃 <code>ThreadPool</code>,则主线程会永远阻塞在等待第一个线程结束上。</p>
|
|||
|
<p>为了修复这个问题,我们将修改 <code>ThreadPool</code> 的 <code>drop</code> 实现并修改 <code>Worker</code> 循环。</p>
|
|||
|
<p>首先修改 <code>ThreadPool</code> 的 <code>drop</code> 实现,在等待线程结束前显式丢弃 <code>sender</code>。示例 20-23 展示了 <code>ThreadPool</code> 显式丢弃 <code>sender</code> 所做的修改。我们使用了与之前处理线程时相同的 <code>Option</code> 和 <code>take</code> 技术以便能从 <code>ThreadPool</code> 中移动 <code>sender</code>:</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust noplayground not_desired_behavior"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span>pub struct ThreadPool {
|
|||
|
workers: Vec<Worker>,
|
|||
|
sender: Option<mpsc::Sender<Job>>,
|
|||
|
}
|
|||
|
// --snip--
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span>impl ThreadPool {
|
|||
|
<span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span> pub fn new(size: usize) -> ThreadPool {
|
|||
|
// --snip--
|
|||
|
|
|||
|
<span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span> ThreadPool {
|
|||
|
workers,
|
|||
|
sender: Some(sender),
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
pub fn execute<F>(&self, f: F)
|
|||
|
where
|
|||
|
F: FnOnce() + Send + 'static,
|
|||
|
{
|
|||
|
let job = Box::new(f);
|
|||
|
|
|||
|
self.sender.as_ref().unwrap().send(job).unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
impl Drop for ThreadPool {
|
|||
|
fn drop(&mut self) {
|
|||
|
drop(self.sender.take());
|
|||
|
|
|||
|
for worker in &mut self.workers {
|
|||
|
println!("Shutting down worker {}", worker.id);
|
|||
|
|
|||
|
if let Some(thread) = worker.thread.take() {
|
|||
|
thread.join().unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">struct Worker {
|
|||
|
</span><span class="boring"> id: usize,
|
|||
|
</span><span class="boring"> thread: Option<thread::JoinHandle<()>>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Worker {
|
|||
|
</span><span class="boring"> fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
</span><span class="boring"> let thread = thread::spawn(move || loop {
|
|||
|
</span><span class="boring"> let job = receiver.lock().unwrap().recv().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> println!("Worker {id} got a job; executing.");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> job();
|
|||
|
</span><span class="boring"> });
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> Worker {
|
|||
|
</span><span class="boring"> id,
|
|||
|
</span><span class="boring"> thread: Some(thread),
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}</span></code></pre>
|
|||
|
<p><span class="caption">示例 20-23:在 join worker 线程之前显式丢弃 <code>sender</code></span></p>
|
|||
|
<p>丢弃 <code>sender</code> 会关闭信道,这表明不会有更多的消息被发送。这时 worker 中的无限循环中的所有 <code>recv</code> 调用都会返回错误。在示例 20-24 中,我们修改 <code>Worker</code> 循环在这种情况下优雅地退出,这意味着当 <code>ThreadPool</code> 的 <code>drop</code> 实现调用 <code>join</code> 时线程会结束。</p>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust noplayground"><span class="boring">use std::{
|
|||
|
</span><span class="boring"> sync::{mpsc, Arc, Mutex},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">pub struct ThreadPool {
|
|||
|
</span><span class="boring"> workers: Vec<Worker>,
|
|||
|
</span><span class="boring"> sender: Option<mpsc::Sender<Job>>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl ThreadPool {
|
|||
|
</span><span class="boring"> /// Create a new ThreadPool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The size is the number of threads in the pool.
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// # Panics
|
|||
|
</span><span class="boring"> ///
|
|||
|
</span><span class="boring"> /// The `new` function will panic if the size is zero.
|
|||
|
</span><span class="boring"> pub fn new(size: usize) -> ThreadPool {
|
|||
|
</span><span class="boring"> assert!(size > 0);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for id in 0..size {
|
|||
|
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> ThreadPool {
|
|||
|
</span><span class="boring"> workers,
|
|||
|
</span><span class="boring"> sender: Some(sender),
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> pub fn execute<F>(&self, f: F)
|
|||
|
</span><span class="boring"> where
|
|||
|
</span><span class="boring"> F: FnOnce() + Send + 'static,
|
|||
|
</span><span class="boring"> {
|
|||
|
</span><span class="boring"> let job = Box::new(f);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> self.sender.as_ref().unwrap().send(job).unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">impl Drop for ThreadPool {
|
|||
|
</span><span class="boring"> fn drop(&mut self) {
|
|||
|
</span><span class="boring"> drop(self.sender.take());
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> for worker in &mut self.workers {
|
|||
|
</span><span class="boring"> println!("Shutting down worker {}", worker.id);
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> if let Some(thread) = worker.thread.take() {
|
|||
|
</span><span class="boring"> thread.join().unwrap();
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring">struct Worker {
|
|||
|
</span><span class="boring"> id: usize,
|
|||
|
</span><span class="boring"> thread: Option<thread::JoinHandle<()>>,
|
|||
|
</span><span class="boring">}
|
|||
|
</span><span class="boring">
|
|||
|
</span>impl Worker {
|
|||
|
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
let thread = thread::spawn(move || loop {
|
|||
|
let message = receiver.lock().unwrap().recv();
|
|||
|
|
|||
|
match message {
|
|||
|
Ok(job) => {
|
|||
|
println!("Worker {id} got a job; executing.");
|
|||
|
|
|||
|
job();
|
|||
|
}
|
|||
|
Err(_) => {
|
|||
|
println!("Worker {id} disconnected; shutting down.");
|
|||
|
break;
|
|||
|
}
|
|||
|
}
|
|||
|
});
|
|||
|
|
|||
|
Worker {
|
|||
|
id,
|
|||
|
thread: Some(thread),
|
|||
|
}
|
|||
|
}
|
|||
|
}</code></pre>
|
|||
|
<p><span class="caption">示例 20-24:当 <code>recv</code> 返回错误时显式退出循环</span></p>
|
|||
|
<p>为了实践这些代码,如示例 20-25 所示修改 <code>main</code> 在优雅停机 server 之前只接受两个请求:</p>
|
|||
|
<p><span class="filename">文件名:src/main.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore"><span class="boring">use hello::ThreadPool;
|
|||
|
</span><span class="boring">use std::{
|
|||
|
</span><span class="boring"> fs,
|
|||
|
</span><span class="boring"> io::{prelude::*, BufReader},
|
|||
|
</span><span class="boring"> net::{TcpListener, TcpStream},
|
|||
|
</span><span class="boring"> thread,
|
|||
|
</span><span class="boring"> time::Duration,
|
|||
|
</span><span class="boring">};
|
|||
|
</span><span class="boring">
|
|||
|
</span>fn main() {
|
|||
|
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
|
|||
|
let pool = ThreadPool::new(4);
|
|||
|
|
|||
|
for stream in listener.incoming().take(2) {
|
|||
|
let stream = stream.unwrap();
|
|||
|
|
|||
|
pool.execute(|| {
|
|||
|
handle_connection(stream);
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
println!("Shutting down.");
|
|||
|
}
|
|||
|
<span class="boring">
|
|||
|
</span><span class="boring">fn handle_connection(mut stream: TcpStream) {
|
|||
|
</span><span class="boring"> let buf_reader = BufReader::new(&stream);
|
|||
|
</span><span class="boring"> let request_line = buf_reader.lines().next().unwrap().unwrap();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let (status_line, filename) = match &request_line[..] {
|
|||
|
</span><span class="boring"> "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
|
|||
|
</span><span class="boring"> "GET /sleep HTTP/1.1" => {
|
|||
|
</span><span class="boring"> thread::sleep(Duration::from_secs(5));
|
|||
|
</span><span class="boring"> ("HTTP/1.1 200 OK", "hello.html")
|
|||
|
</span><span class="boring"> }
|
|||
|
</span><span class="boring"> _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
|
|||
|
</span><span class="boring"> };
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let contents = fs::read_to_string(filename).unwrap();
|
|||
|
</span><span class="boring"> let length = contents.len();
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> let response =
|
|||
|
</span><span class="boring"> format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
|
|||
|
</span><span class="boring">
|
|||
|
</span><span class="boring"> stream.write_all(response.as_bytes()).unwrap();
|
|||
|
</span><span class="boring">}</span></code></pre>
|
|||
|
<p><span class="caption">示例 20-25:在处理两个请求之后通过退出循环来停止 server</span></p>
|
|||
|
<p>你不会希望真实世界的 web server 只处理两次请求就停机,这段代码只是为了展示优雅停机和清理能够正常工作。</p>
|
|||
|
<p><code>take</code> 方法定义于 <code>Iterator</code> trait,这里将迭代限制为最多前两项。<code>ThreadPool</code> 会在 <code>main</code> 的结尾离开作用域,此时 <code>drop</code> 实现就会运行。</p>
|
|||
|
<p>使用 <code>cargo run</code> 启动 server,并发起三个请求。第三个请求应该会失败,而终端的输出应该看起来像这样:</p>
|
|||
|
<pre><code class="language-console">$ cargo run
|
|||
|
Compiling hello v0.1.0 (file:///projects/hello)
|
|||
|
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
|
|||
|
Running `target/debug/hello`
|
|||
|
Worker 0 got a job; executing.
|
|||
|
Shutting down.
|
|||
|
Shutting down worker 0
|
|||
|
Worker 3 got a job; executing.
|
|||
|
Worker 1 disconnected; shutting down.
|
|||
|
Worker 2 disconnected; shutting down.
|
|||
|
Worker 3 disconnected; shutting down.
|
|||
|
Worker 0 disconnected; shutting down.
|
|||
|
Shutting down worker 1
|
|||
|
Shutting down worker 2
|
|||
|
Shutting down worker 3
|
|||
|
</code></pre>
|
|||
|
<p>可能会出现不同顺序的 worker 和信息输出。可以从信息中看到服务是如何运行的:worker 0 和 worker 3 获取了前两个请求。server 会在第二个请求之后停止接受请求,<code>ThreadPool</code> 的 <code>Drop</code> 实现甚至会在 worker 3 开始工作之前就开始执行。丢弃 <code>sender</code> 会断开所有 worker 的连接并让它们关闭。每个 worker 在断开时会打印出一个信息,接着线程池调用 <code>join</code> 来等待每一个 worker 线程结束。</p>
|
|||
|
<p>这个特定的运行过程中一个有趣的地方在于:<code>ThreadPool</code> 丢弃 <code>sender</code>,而在任何 worker 收到错误之前,就尝试 join worker 0 了。worker 0 还没有从 <code>recv</code> 获得一个错误,所以主线程阻塞直到 worker 0 结束。与此同时,worker 3 接收到一个任务,接着所有线程都会收到一个错误。一旦 worker 0 结束,主线程就等待余下其他 worker 结束。此时它们都退出了循环并停止。</p>
|
|||
|
<p>恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web server。我们能对 server 执行优雅停机,它会清理线程池中的所有线程。</p>
|
|||
|
<p>如下是完整的代码参考:</p>
|
|||
|
<p><span class="filename">文件名:src/main.rs</span></p>
|
|||
|
<pre><code class="language-rust ignore">use hello::ThreadPool;
|
|||
|
use std::{
|
|||
|
fs,
|
|||
|
io::{prelude::*, BufReader},
|
|||
|
net::{TcpListener, TcpStream},
|
|||
|
thread,
|
|||
|
time::Duration,
|
|||
|
};
|
|||
|
|
|||
|
fn main() {
|
|||
|
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
|
|||
|
let pool = ThreadPool::new(4);
|
|||
|
|
|||
|
for stream in listener.incoming().take(2) {
|
|||
|
let stream = stream.unwrap();
|
|||
|
|
|||
|
pool.execute(|| {
|
|||
|
handle_connection(stream);
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
println!("Shutting down.");
|
|||
|
}
|
|||
|
|
|||
|
fn handle_connection(mut stream: TcpStream) {
|
|||
|
let buf_reader = BufReader::new(&stream);
|
|||
|
let request_line = buf_reader.lines().next().unwrap().unwrap();
|
|||
|
|
|||
|
let (status_line, filename) = match &request_line[..] {
|
|||
|
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
|
|||
|
"GET /sleep HTTP/1.1" => {
|
|||
|
thread::sleep(Duration::from_secs(5));
|
|||
|
("HTTP/1.1 200 OK", "hello.html")
|
|||
|
}
|
|||
|
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
|
|||
|
};
|
|||
|
|
|||
|
let contents = fs::read_to_string(filename).unwrap();
|
|||
|
let length = contents.len();
|
|||
|
|
|||
|
let response =
|
|||
|
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
|
|||
|
|
|||
|
stream.write_all(response.as_bytes()).unwrap();
|
|||
|
}</code></pre>
|
|||
|
<p><span class="filename">文件名:src/lib.rs</span></p>
|
|||
|
<pre><code class="language-rust noplayground">use std::{
|
|||
|
sync::{mpsc, Arc, Mutex},
|
|||
|
thread,
|
|||
|
};
|
|||
|
|
|||
|
pub struct ThreadPool {
|
|||
|
workers: Vec<Worker>,
|
|||
|
sender: Option<mpsc::Sender<Job>>,
|
|||
|
}
|
|||
|
|
|||
|
type Job = Box<dyn FnOnce() + Send + 'static>;
|
|||
|
|
|||
|
impl ThreadPool {
|
|||
|
/// Create a new ThreadPool.
|
|||
|
///
|
|||
|
/// The size is the number of threads in the pool.
|
|||
|
///
|
|||
|
/// # Panics
|
|||
|
///
|
|||
|
/// The `new` function will panic if the size is zero.
|
|||
|
pub fn new(size: usize) -> ThreadPool {
|
|||
|
assert!(size > 0);
|
|||
|
|
|||
|
let (sender, receiver) = mpsc::channel();
|
|||
|
|
|||
|
let receiver = Arc::new(Mutex::new(receiver));
|
|||
|
|
|||
|
let mut workers = Vec::with_capacity(size);
|
|||
|
|
|||
|
for id in 0..size {
|
|||
|
workers.push(Worker::new(id, Arc::clone(&receiver)));
|
|||
|
}
|
|||
|
|
|||
|
ThreadPool {
|
|||
|
workers,
|
|||
|
sender: Some(sender),
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
pub fn execute<F>(&self, f: F)
|
|||
|
where
|
|||
|
F: FnOnce() + Send + 'static,
|
|||
|
{
|
|||
|
let job = Box::new(f);
|
|||
|
|
|||
|
self.sender.as_ref().unwrap().send(job).unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
impl Drop for ThreadPool {
|
|||
|
fn drop(&mut self) {
|
|||
|
drop(self.sender.take());
|
|||
|
|
|||
|
for worker in &mut self.workers {
|
|||
|
println!("Shutting down worker {}", worker.id);
|
|||
|
|
|||
|
if let Some(thread) = worker.thread.take() {
|
|||
|
thread.join().unwrap();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
struct Worker {
|
|||
|
id: usize,
|
|||
|
thread: Option<thread::JoinHandle<()>>,
|
|||
|
}
|
|||
|
|
|||
|
impl Worker {
|
|||
|
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
|
|||
|
let thread = thread::spawn(move || loop {
|
|||
|
let message = receiver.lock().unwrap().recv();
|
|||
|
|
|||
|
match message {
|
|||
|
Ok(job) => {
|
|||
|
println!("Worker {id} got a job; executing.");
|
|||
|
|
|||
|
job();
|
|||
|
}
|
|||
|
Err(_) => {
|
|||
|
println!("Worker {id} disconnected; shutting down.");
|
|||
|
break;
|
|||
|
}
|
|||
|
}
|
|||
|
});
|
|||
|
|
|||
|
Worker {
|
|||
|
id,
|
|||
|
thread: Some(thread),
|
|||
|
}
|
|||
|
}
|
|||
|
}</code></pre>
|
|||
|
<p>这里还有很多可以做的事!如果你希望继续增强这个项目,如下是一些点子:</p>
|
|||
|
<ul>
|
|||
|
<li>为 <code>ThreadPool</code> 和其公有方法增加更多文档</li>
|
|||
|
<li>为库的功能增加测试</li>
|
|||
|
<li>将 <code>unwrap</code> 调用改为更健壮的错误处理</li>
|
|||
|
<li>使用 <code>ThreadPool</code> 进行其他不同于处理网络请求的任务</li>
|
|||
|
<li>在 <a href="https://crates.io/">crates.io</a> 上寻找一个线程池 crate 并使用它实现一个类似的 web server,将其 API 和鲁棒性与我们的实现做对比</li>
|
|||
|
</ul>
|
|||
|
<h2 id="总结"><a class="header" href="#总结">总结</a></h2>
|
|||
|
<p>好极了!你结束了本书的学习!由衷感谢你同我们一道加入这次 Rust 之旅。现在你已经准备好出发并实现自己的 Rust 项目并帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。</p>
|
|||
|
|
|||
|
</main>
|
|||
|
|
|||
|
<nav class="nav-wrapper" aria-label="Page navigation">
|
|||
|
<!-- Mobile navigation buttons -->
|
|||
|
<a rel="prev" href="ch21-02-multithreaded.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
|
|||
|
<i class="fa fa-angle-left"></i>
|
|||
|
</a>
|
|||
|
|
|||
|
<a rel="next prefetch" href="appendix-00.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
|
|||
|
<i class="fa fa-angle-right"></i>
|
|||
|
</a>
|
|||
|
|
|||
|
<div style="clear: both"></div>
|
|||
|
</nav>
|
|||
|
</div>
|
|||
|
</div>
|
|||
|
|
|||
|
<nav class="nav-wide-wrapper" aria-label="Page navigation">
|
|||
|
<a rel="prev" href="ch21-02-multithreaded.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
|
|||
|
<i class="fa fa-angle-left"></i>
|
|||
|
</a>
|
|||
|
|
|||
|
<a rel="next prefetch" href="appendix-00.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
|
|||
|
<i class="fa fa-angle-right"></i>
|
|||
|
</a>
|
|||
|
</nav>
|
|||
|
|
|||
|
</div>
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|
|||
|
<script>
|
|||
|
window.playground_copyable = true;
|
|||
|
</script>
|
|||
|
|
|||
|
|
|||
|
<script src="elasticlunr.min.js"></script>
|
|||
|
<script src="mark.min.js"></script>
|
|||
|
<script src="searcher.js"></script>
|
|||
|
|
|||
|
<script src="clipboard.min.js"></script>
|
|||
|
<script src="highlight.js"></script>
|
|||
|
<script src="book.js"></script>
|
|||
|
|
|||
|
<!-- Custom JS scripts -->
|
|||
|
<script src="ferris.js"></script>
|
|||
|
|
|||
|
|
|||
|
</div>
|
|||
|
</body>
|
|||
|
</html>
|