Язык программирования Rust
Скачать 7.02 Mb.
|
Отправка запросов в потоки через каналы Теперь мы рассмотрим проблему, заключающуюся в том, что замыкания переданные в thread::spawn абсолютно ничего не делают. Вот мы получаем замыкание, которое хотим выполнить в методе execute . Но для запуска нам необходимо передать замыкание в метод thread::spawn , где на каждый ThreadPool создаётся один Worker Мы хотим, чтобы только что созданные структуры Worker извлекали код для запуска из очереди хранящейся в ThreadPool и отправляли этот код в свой поток для выполнения. В главе 16 вы узнали о каналах (channels) - простом способе связи между двумя потоками, который идеально подойдёт для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute отправит задание из ThreadPool экземплярам Worker , который отправит задание в свой поток. Вот план: 1. ThreadPool создаст канал и будет удерживать его передающую сторону. 2. Каждый Worker будет удерживать принимающую сторону канала. 3. Мы создадим новую структуру Job которая будет содержать замыкания, которые мы хотим отправить в канал. 4. Метод execute отправит задание, которое он хочет выполнить, в отправляющую сторону канала. 5. В своём потоке Worker будет выполнять цикл с принимающей стороной канала и выполнит замыкание любого получаемого задания. Давайте начнём с создания канала в ThreadPool::new и удержания отправляющей стороны в экземпляре ThreadPool , как показано в листинге 20-16. В структуре Job сейчас ничего не содержится, но это будет тип элемента который мы отправляем в канал. Файл: src/lib.rs Листинг 20-16: Модификация ThreadPool для хранения отправляющей части канала, который отправляет экземпляры Job В ThreadPool::new мы создаём наш новый канал и пул содержащий отправляющую сторону. Код успешно скомпилируется все ещё с предупреждениями. Давайте попробуем передавать принимающую сторону канала каждому "работнику" (структуре woker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке порождаемым "работником", поэтому мы будем ссылаться на параметр receiver в замыкании. Код 20-17 пока не компилируется. Файл: src/lib.rs use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec } struct Job ; impl ThreadPool { // --snip-- pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); let (sender, receiver) = mpsc::channel(); let mut workers = Vec ::with_capacity(size); for id in 0 ..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- } Листинг 20-17: Передача принимающей части канала "работнику" Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в Worker::new , а затем используем его внутри замыкания. При попытке проверить код, мы получаем ошибку: Код пытается передать receiver в несколько экземпляров Worker . Это не будет работать, как вы помните из главы 16: реализация канала предоставляемая Rust, impl ThreadPool { // --snip-- pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); let (sender, receiver) = mpsc::channel(); let mut workers = Vec ::with_capacity(size); for id in 0 ..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new (id: usize , receiver: mpsc::Receiver }); Worker { id, thread } } } $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver` --> src/lib.rs:26:42 | 21 | let (sender, receiver) = mpsc::channel(); | -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver 26 | workers.push(Worker::new(id, receiver)); | ^^^^^^^^ value moved here, in previous iteration of loop For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` due to previous error является моделью несколько производителей (multiple producer), один потребитель (single consumer). Это означает, что мы не можем просто клонировать принимающую часть канала для исправления этого кода. Даже если бы мы это могли, это не техника которую мы хотели бы использовать; вместо этого мы хотим распределить задачи среди потоков, разделяя один receiver среди всех "работников". Кроме того, удаление задачи из очереди канала включает изменение receiver , поэтому потокам необходим безопасный способ делиться и изменять receiver , в противном случае мы можем получить условия гонки (как описано в главе 16). Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и позволить потокам изменять значение, нам нужно использовать тип Arc . Тип Arc позволит нескольким "работникам" владеть получателем (receiver), а Mutex гарантирует что только один "работник" получит задание (job) от получателя в один момент времени. Листинг 20-18 показывает изменения, которые мы должны сделать. Файл: src/lib.rs use std::{ sync::{mpsc, Arc, Mutex}, thread, }; // --snip-- impl ThreadPool { // --snip-- pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec ::with_capacity(size); for id in 0 ..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new (id: usize , receiver: Arc // --snip-- } } Листинг 20-18: Совместное использование принимающей стороны канала среди "работников" используя Arc и Mutex В ThreadPool::new мы помещаем принимающую сторону канала внутрь Arc и Mutex Для каждого нового "работника" мы клонируем Arc , чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороны канала. С этими изменениями код компилируется! Мы подбираемся к цели! Реализация метода execute Давайте реализуем метод execute у структуры ThreadPool . Мы также изменим тип Job со структуры на псевдоним типа для типаж-объекта, который содержит тип замыкания принимаемый методом execute . Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа" главы 19, псевдонимы типов позволяют делать длинные типы короче. Посмотрите в листинг 20-19. Файл: src/lib.rs Листинг 20-19: Создание псевдонима типа Job для Box , содержащего каждое замыкание и затем отправляющее задание (job) в канал После создания нового экземпляра Job с помощью замыкания, получаемого в метод execute , мы отправляем это задание в отправляющую часть канала. Мы вызываем unwrap для send в случае неудачной отправки. Это может произойти, если например, мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получение новых сообщений. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap , заключается в том, что мы знаем, что сбоя не произойдёт, но компилятор этого не знает. // --snip-- type Job = Box < dyn FnOnce () + Send + 'static >; impl ThreadPool { // --snip-- pub fn execute self , f: F) where F: FnOnce () + Send + 'static , { let job = Box ::new(f); self .sender.send(job).unwrap(); } } // --snip-- Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри Worker::new Файл: src/lib.rs Листинг 20-20: Получение и выполнение заданий в потоке "работника" Здесь мы сначала вызываем lock у receiver , чтобы получить мьютекс, а затем вызываем unwrap для паники при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном state (poisoned state), что может произойти если какой-то другой поток запаниковал, удерживая блокировку, вместо снятия блокировки. В этой ситуации правильное действие - вызвать unwrap для паники потока. Не стесняйтесь заменить unwrap на expect с сообщением об ошибке, которое имеет для вас значение. Если мы получим блокировку мьютекса, мы вызываем recv для получения Job из канала. Окончательный вызов unwrap проходит мимо любых ошибок, которые могут произойти, если поток удерживающий отправляющую сторону канала, завершил работу подобно тому, как метод send возвращает Err , если принимающая сторона закрывается. Вызов recv блокирующий, поэтому если ещё нет задач (job), то текущий поток будет ждать, пока задача не станет доступной. Mutex гарантирует, что только один поток Worker пытается запросить задачу за раз. Наш пул потоков теперь находится в рабочем состоянии! Выполните cargo run и сделайте несколько запросов: // --snip-- impl Worker { fn new (id: usize , receiver: Arc move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println! ( "Worker {id} got a job; executing." ); job(); }); Worker { id, thread } } } Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно. Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получает много запросов. Если мы отправим запрос ресурса /sleep, сервер сможет обслуживать другие запросы, запустив их в другом потоке. Примечание: если вы запрашиваете /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному с интервалами в 5 секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Данное ограничение не вызвано нашим веб-сервером. После изучения цикла while let в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22. $ cargo run Compiling hello v0.1.0 (file:///projects/hello) warning: field is never read: `workers` --> src/lib.rs:7:5 | 7 | workers: Vec | ^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(dead_code)]` on by default warning: field is never read: `id` --> src/lib.rs:48:5 | 48 | id: usize, | ^^^^^^^^^ warning: field is never read: `thread` --> src/lib.rs:49:5 | 49 | thread: thread::JoinHandle<()>, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ warning: 3 warnings emitted Finished dev [unoptimized + debuginfo] target(s) in 1.40s Running `target/debug/main` Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Файл: src/lib.rs Листинг 20-22: Альтернативная реализация Worker::new с использованием while let Этот код компилируется и запускается, но не приводит к желаемому поведению : медленный запрос всё равно приведёт к тому, что другие запросы будут ждать обработки. Причина здесь несколько тоньше: Mutex не имеет общедоступного unlock потому что право собственности на блокировку зависит от времени жизни MutexGuard в LockResult которое возвращает метод lock . Во время компиляции анализатор заимствований может затем применить правило, согласно которому к ресурсу, охраняемому Mutex нельзя получить доступ, если мы не удерживаем блокировку. Но эта реализация также может привести к тому, что блокировка будет удерживаться дольше, чем предполагалось, если мы не будем тщательно продумывать время жизни MutexGuard Код в листинге 20-20, который использует let job = receiver.lock().unwrap().recv().unwrap(); работает, потому что с let любые временные значения, используемые в выражении справа от знака равенства, немедленно удаляются после завершения оператора let . Однако while let (и if let and match ) не удаляет временные значения до конца связанного блока. В листинге 20-21 блокировка сохраняется на время вызова job() , что означает, что другие исполнители не могут получать задания. // --snip-- impl Worker { fn new (id: usize , receiver: Arc move || { while let Ok (job) = receiver.lock().unwrap().recv() { println! ( "Worker {id} got a job; executing." ); job(); } }); Worker { id, thread } } } Изящное завершение и освобождение ресурсов Листинг 20-20 асинхронно отвечает на запросы с помощью использования пула потоков, как мы и хотели. Мы получаем некоторые предупреждения про workers , id и поля thread , которые мы не используем напрямую, что напоминает нам о том, что мы не освобождаем все ресурсы. Когда мы используем менее элегантный метод остановки основного потока клавишной комбинацией ctrl-c, все остальные потоки также немедленно останавливаются, даже если они находятся в середине обработки запроса. Теперь мы будем реализовывать типаж Drop для вызова join у каждого потока в пуле, чтобы они могли завершить запросы над которыми они работают до закрытия. Затем мы реализуем способ сообщить потокам, что они должны перестать принимать новые запросы и завершить работу. Чтобы увидеть этот код в действии, мы изменим наш сервер так, чтобы он принимал только два запроса, прежде чем корректно завершить работу его пула потоков. Реализация типажа Drop для ThreadPool Давайте начнём с реализации Drop у нашего пула потоков. Когда пул удаляется, все наши потоки должны объединиться (join), чтобы убедиться, что они завершают свою работу. В листинге 20-22 показана первая попытка реализации Drop , код пока не будет работать. Файл: src/lib.rs Листинг 20-22: Присоединение (Joining) каждого потока, когда пул потоков выходит из области видимости Во-первых, мы проходим циклом по каждому workers из пула потоков. Для этого мы используем &mut , потому что self является изменяемой ссылкой и нам также нужно иметь возможность изменять экземпляр worker . Для каждого "работника" мы печатаем сообщение о том, что этот конкретный "работник" завершается, затем вызываем join у потока этого "работника". Если вызов join происходит с ошибкой, мы используем unwrap , чтобы вызвать панику в Rust и завершить не совсем красиво. Ошибка получаемая при компиляции этого кода: impl Drop for ThreadPool { fn drop (& mut self ) { for worker in & mut self .workers { println! ( "Shutting down worker {}" , worker.id); worker.thread.join().unwrap(); } } } Ошибка говорит, что мы не можем вызвать join , потому что у нас есть только изменяемое заимствование каждого worker и что join забирает во владение его аргумент. Чтобы решить эту проблему, нужно переместить поток из экземпляра Worker , который владеет thread , чтобы join мог использовать внутренний поток. Мы сделали это в коде 17-15: если вместо этого Worker содержит тип Option , мы можем вызвать метод take у Option , чтобы переместить значение из варианта Some и оставить вариант None на его месте. Другими словами, работающий Worker будет содержать вариант Some внутри thread , и когда мы хотим очистить Worker , мы заменяем значение варианта Some на вариант None , чтобы у Worker не было потока для запуска. Итак, мы хотим обновить объявление Worker следующим образом: Файл: src/lib.rs Теперь давайте опираться на компилятор, чтобы найти другие места, которые нужно изменить. Проверяя код, мы получаем две ошибки: $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference --> src/lib.rs:52:13 | 52 | worker.thread.join().unwrap(); | ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call | | | move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait | note: this function takes ownership of the receiver `self`, which moves `worker.thread` For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` due to previous error struct Worker { id: usize , thread: Option } Давайте обратимся ко второй ошибке, которая указывает на код в конце Worker::new ; нам нужно обернуть значение thread в вариант Some при создании нового Worker Внесите следующие изменения, чтобы исправить эту ошибку: Файл: src/lib.rs Первая ошибка находится в нашей реализации Drop . Ранее мы упоминали, что намеревались вызвать take для параметра Option , чтобы забрать thread из процесса worker . Следующие изменения делают это: Файл: src/lib.rs $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `join` found for enum `Option` in the current scope --> src/lib.rs:52:27 | 52 | worker.thread.join().unwrap(); | ^^^^ method not found in `Option --> src/lib.rs:72:22 | 72 | Worker { id, thread } | ^^^^^^ expected enum `Option`, found struct `JoinHandle` | = note: expected enum `Option | 72 | Worker { id, thread: Some(thread) } | +++++++++++++ + Some errors have detailed explanations: E0308, E0599. For more information about an error, try `rustc --explain E0308`. error: could not compile `hello` due to 2 previous errors impl Worker { fn new (id: usize , receiver: Arc // --snip-- Worker { id, thread: Some (thread), } } } Как уже говорилось в главе 17, метод take у типа Option забирает значение из варианта Some и оставляет вариант None в этом месте. Мы используем if let , чтобы деструктурировать Some и получить поток; затем вызываем join у потока. Если поток "работника" уже None , мы знаем, что этот "работник" уже очистил свой поток, поэтому в этом случае ничего не происходит. |