Язык программирования Rust
Скачать 7.02 Mb.
|
Структура кода, если мы могли бы создавать поток для каждого запроса Сначала давайте рассмотрим, как мог бы выглядеть код, если он создавал бы новый поток для каждого соединения. Как упоминалось ранее, это не окончательный план, а это отправная точка из-за проблем с возможным порождением неограниченного количества потоков. В листинге 20-11 показаны изменения, которые нужно внести в main , чтобы запускать новый поток для обработки каждого входящего потока соединения в цикле for Файл: src/main.rs Листинг 20-11: Порождение нового потока для каждого потока соединения Как вы изучили в главе 16, thread::spawn создаст новый поток и затем запустит код замыкания в этом новом потоке. Если вы запустите этот код и загрузите /sleep в своём браузере, в затем загрузите / в двух других вкладках браузера, вы действительно увидите, fn main () { let listener = TcpListener::bind( "127.0.0.1:7878" ).unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } что запросы к / не должны ждать завершения /sleep. Но, как мы уже упоминали, это в конечном счёте перегрузит систему, потому что вы будете создавать новые потоки без каких-либо ограничений. Создание аналогичного интерфейса для конечного числа потоков Мы хотим, чтобы наш пул потоков работал аналогичным, знакомым образом, чтобы переключение с потоков на пул потоков не требовало больших изменений в коде использующем наш API. В листинге 20-12 показан гипотетический интерфейс для структуры ThreadPool , который мы хотим использовать вместо thread::spawn Файл: src/main.rs Листинг 20-12: Наш идеальный интерфейс ThreadPool Мы используем ThreadPool::new , чтобы создать новый пул потоков с конфигурируемым количеством потоков, в данном случае четыре. Затем в цикле for выполняем pool.execute имеющий интерфейс, аналогичный интерфейсу thread::spawn , в котором выполняется замыкание, которое пул должен выполнить для каждого потока соединения. Нам нужно реализовать pool.execute , чтобы он принимал замыкание и передавал его потоку из пула для выполнения. Этот код не компилируется, но мы постараемся, чтобы компилятор в его исправлении. Создание структуры ThreadPool использованием разработки, управляемой компилятором Внесите изменения листинга 20-12 в файл src/main.rs, а затем давайте воспользуемся ошибками компилятора из команды cargo check для управления нашей разработкой. Вот первая ошибка, которую мы получаем: fn main () { let listener = TcpListener::bind( "127.0.0.1:7878" ).unwrap(); let pool = ThreadPool::new( 4 ); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } } Замечательно! Ошибка говорит о том, что нам нужен тип или модуль ThreadPool , поэтому мы создадим его сейчас. Наша реализация ThreadPool будет зависеть от того, какую работу выполняет наш веб-сервер. Итак, давайте переделаем крейт hello из бинарного в библиотечный для хранения реализации ThreadPool . После того, как поменяем в библиотечный крейт, мы также сможем использовать отдельную библиотеку пула потоков для любой работы, которую мы хотим выполнить с его использованием, а не только для обслуживания веб-запросов. Создайте файл src/lib.rs, который содержит следующее, что является простейшим определением структуры ThreadPool , которую мы можем иметь в данный момент: Файл: src/lib.rs Затем создайте новый каталог src/bin и переместите двоичный крейт с корнем в src/main.rs в src/bin/main.rs. Это сделает библиотечный крейт основным крейтом в каталоге hello; мы все ещё можем запустить двоичный файл из src/bin/main.rs, используя cargo run . Переместив файл main.rs, отредактируйте его, чтобы подключить крейт библиотеки и добавить тип ThreadPool в область видимости, добавив следующий код в начало src/bin/main.rs: Файл: src/bin/main.rs Этот код по-прежнему не будет работать, но давайте проверим его ещё раз, чтобы получить следующую ошибку, которую нам нужно устранить: $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0433]: failed to resolve: use of undeclared type `ThreadPool` --> src/main.rs:11:16 | 11 | let pool = ThreadPool::new(4); | ^^^^^^^^^^ use of undeclared type `ThreadPool` For more information about this error, try `rustc --explain E0433`. error: could not compile `hello` due to previous error pub struct ThreadPool ; {{#rustdoc_include ../listings/ch20-web-server/no-listing- 01 -define-threadpool- struct / src /bin/main.rs:here}} Эта ошибка указывает, что далее нам нужно создать ассоциированную функцию с именем new для ThreadPool . Мы также знаем, что new должен иметь один параметр, который может принимать 4 в качестве аргумента и должен возвращать экземпляр ThreadPool . Давайте реализуем простейшую функцию new , которая будет иметь эти характеристики: Файл: src/lib.rs Мы выбираем usize в качестве типа параметра size , потому что мы знаем, что отрицательное число потоков не имеет никакого смысла. Мы также знаем, что мы будем использовать число 4 в качестве количества элементов в коллекции потоков, для чего предназначен тип usize , как обсуждалось в разделе "Целочисленные типы" главы 3. Давайте проверим код ещё раз: Теперь мы получаем предупреждение и ошибку. Игнорируем предупреждение не надолго, ошибка происходит потому что у нас нет метода execute в структуре ThreadPool . Вспомните раздел "Создание подобного интерфейса для конечного числа $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope --> src/main.rs:12:28 | 12 | let pool = ThreadPool::new(4); | ^^^ function or associated item not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error pub struct ThreadPool ; impl ThreadPool { pub fn new (size: usize ) -> ThreadPool { ThreadPool } } $ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope --> src/main.rs:17:14 | 17 | pool.execute(|| { | ^^^^^^^ method not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error потоков" , в котором мы решили, что наш пул потоков должен иметь интерфейс, похожий на thread::spawn . Кроме того, мы реализуем функцию execute , чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска. Мы определим метод execute у ThreadPool для приёма замыкания в качестве параметра. Вспомните раздел "Хранение замыканий с использованием общих параметров и типажей Fn " главы 13 и о том, что мы можем принимать замыкания в качестве параметров с тремя различными типажами: Fn , FnMut и FnOnce . Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn , поэтому мы можем посмотреть, какие ограничения накладывает на его параметр в сигнатуре thread::spawn . Документация показывает следующее: Параметр типа F - это тот, который нас интересует; параметр типа T относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn использует FnOnce в качестве ограничения типажа у F . Это, вероятно то, чего мы хотим, потому что мы в конечном итоге передадим получаемый аргумент в execute для spawn . Мы также можем быть ещё более уверены, что FnOnce - это тот типаж, который мы хотим использовать, поскольку поток для выполнения запроса будет выполнять этот запрос только один раз, что соответствует параметру Once в типаже FnOnce Параметр типа F также имеет ограничение типажа Send и ограничение времени жизни 'static , которые полезны в нашей ситуации: нам нужен Send для передачи замыкания из одного потока в другой и 'static , потому что мы не знаем, сколько времени займёт выполнение потока. Давайте создадим метод execute для ThreadPool , который будет принимать обобщённый параметр типа F со следующими ограничениями: Файл: src/lib.rs Мы по-прежнему используем () после FnOnce потому что типаж FnOnce представляет замыкание, которое не принимает параметров и возвращает единичный тип () . Также как при определении функций, тип возвращаемого значения может быть опущен в сигнатуре, но даже если у нас нет параметров, нам все равно нужны скобки. pub fn spawn F: FnOnce () -> T, F: Send + 'static , T: Send + 'static , impl ThreadPool { // --snip-- pub fn execute self , f: F) where F: FnOnce () + Send + 'static , { } } Опять же, это самая простая реализация метода execute : она ничего не делает, мы только пытаемся сделать код компилируемым. Давайте проверим снова: Сейчас мы получаем только предупреждения, что означает, что код компилируется! Но обратите внимание, если вы попробуете cargo run и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute ! Примечание: вы возможно слышали высказывание о языках со строгими компиляторами, таких как Haskell и Rust, которое звучит так: «Если код компилируется, то он работает». Но это высказывание не всегда верно. Наш проект компилируется, но абсолютно ничего не делает! Если бы мы создавали реальный, законченный проект, это был бы хороший момент начать писать модульные тесты, чтобы проверять, что код компилируется и имеет желаемое поведение. Проверка количества потоков в new Мы продолжим получать предупреждения, потому что мы ничего не делаем с параметрами для new и execute . Давайте реализуем тела этих функций в соответствии с желаемым поведением. Для начала давайте подумаем о new . Ранее мы выбирали без знаковый тип для параметра size , потому что пул с отрицательным числом потоков не имеет смысла. Тем не менее, пул с нулевым значением для потоков также не имеет смысла, но ноль является совершенно корректным для типа usize . Мы добавим код, чтобы проверить, что size больше нуля, перед возвращением экземпляра ThreadPool и будем паниковать, если программа получит ноль, используя макрос assert! , как показано в листинге 20-13. Файл: src/lib.rs $ cargo check Checking hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.24s Листинг 20-13: Реализация ThreadPool::new с паникой, если size равен нулю Мы добавили документации в ThreadPool с помощью комментариев. Обратите внимание, мы следовали хорошим практикам документирования, добавив раздел, в котором указывается ситуация при которой функция может паниковать как обсуждалось в главе 14. Попробуйте запустить cargo doc --open и кликнуть структуру ThreadPool , чтобы увидеть как выглядит сгенерированная документация для new ! Вместо добавления макроса assert! , как мы здесь сделали, мы могли бы описать у new возвращать Result как мы делали в Config::new проекта ввода/вывода в коде 12-9. Но сейчас мы решили, что попытка создания пула потоков без любого указания количества потоков должно быть не восстанавливаемой ошибкой. Если вы чувствуете себя честолюбивым, попробуйте написать версию new со следующей сигнатурой, чтобы сравнить обе версии: Создание места для хранения потоков Теперь у нас есть способ узнать, что задано допустимое число потоков для хранения в пуле и мы можем создать эти потоки и сохранить их в структуре ThreadPool перед её возвратом. Но как мы "храним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn : Функция spawn возвращает тип JoinHandle , где T является типом, который возвращает замыкание. Давайте попробуем использовать JoinHandle и посмотрим, что impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); ThreadPool } // --snip-- } pub fn new (size: usize ) -> Result F: FnOnce () -> T, F: Send + 'static , T: Send + 'static , произойдёт. В нашем случае замыкания, которые мы передаём пулу потоков, будут обрабатывать соединение и ничего не будут возвращать, поэтому T будет единичным (unit) типом () Листинг 20-14 скомпилируется, но пока не создаёт потоков. Мы изменили объявление ThreadPool , чтобы оно содержало вектор экземпляров thread::JoinHandle<()> , инициализировали вектор с размером size , установили цикл for , который будет запускать некоторый код для создания потоков и вернули экземпляр ThreadPool содержащий потоки. Файл: src/lib.rs Листинг 20-14: Создание вектора в ThreadPool для хранения потоков Мы добавили std::thread в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle в качестве типа элементов вектора в ThreadPool После получения корректного значения size, наш ThreadPool создаёт новый вектор, который может содержать size элементов. В этой книге мы ещё не использовали функцию with_capacity , которая выполняет ту же задачу что и Vec::new , но с важным отличием: она заранее выделяет указанную память в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, выполнение этого выделения немного более эффективно, чем использование Vec::new , который изменяет размеры при вставке элементов. Когда вы снова запустите cargo check , вы получите ещё несколько предупреждений, но все должно завершится успехом. use std::thread; pub struct ThreadPool { threads: Vec } impl ThreadPool { // --snip-- pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); let mut threads = Vec ::with_capacity(size); for _ in 0 ..size { // create some threads and store them in the vector } ThreadPool { threads } } // --snip-- } Структура Worker ответственная за отправку кода из ThreadPool в поток Мы оставили комментарий относительно создания потоков в цикле for кода 20-14. Здесь мы рассмотрим, как мы на самом деле создаём потоки. Стандартная библиотека предоставляет thread::spawn как способ создания потоков, а thread::spawn ожидает получить некоторый код, который поток должен запустить как только поток создан. Однако в нашем случае мы хотим создать потоки и заставить их ждать код, который мы отправим им позже. Реализация потоков в стандартной библиотеке не имеет какого то способа это сделать, мы должны реализовать это вручную. Мы будем реализовывать это поведение с помощью новой структуры данных между ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру данных Worker , что является общим термином в реализации пулов. Подумайте о людях, работающих на кухне в ресторане: рабочие ждут пока не поступят заказы от клиентов, а затем они несут ответственность за принятие этих заказов и их выполнение. Вместо хранения вектора JoinHandle<()> в пуле потоков, мы будем сохранять экземпляры структуры Worker . Каждый Worker будет хранить один экземпляр JoinHandle<()> . Затем мы реализуем метод у Worker , который берет код замыкания для запуска и отправляет его в уже запущенный поток для выполнения. Мы также назначим каждому работнику id , чтобы мы могли различать разных работников в пуле при ведении журнала или отладке. Давайте внесём изменения в последовательность действий, которая выполняется при создании ThreadPool . Мы реализуем код, который отправляет замыкание в поток после того, как мы настроили Worker следующим образом: 1. Определим структуру Worker (работник), которая содержит id и JoinHandle<()> 2. Изменим ThreadPool , чтобы он содержал вектор экземпляров Worker 3. Определим функцию Worker::new , которая принимает номер id и возвращает экземпляр Worker , который содержит id и поток, порождённый пустым замыканием. 4. В ThreadPool::new используем счётчик цикла for для генерации id , создаём новый Worker с этим id и сохраняем экземпляр "работника" в вектор. Если вы готовы принять вызов, попробуйте реализовать эти изменения самостоятельно, прежде чем смотреть код листинге 20-15. Готовы? Вот листинг 20-15 с одним из способов сделать предыдущие модификации. Файл: src/lib.rs Листинг 20-15: Изменение ThreadPool для хранения экземпляров Worker вместо непосредственного хранения потоков Мы изменили имя поля в ThreadPool с threads на workers , потому что теперь оно содержит экземпляры Worker вместо экземпляров JoinHandle<()> . Мы используем счётчик в цикле for в качестве аргумента для Worker::new и сохраняем каждый новый Worker в векторе с именем workers Внешний код (вроде нашего сервера в src/bin/main.rs) не должен знать подробности реализации касательно использования структуры Worker внутри ThreadPool , поэтому мы делаем структуру Worker и её новую функцию new приватными. Функция Worker::new использует заданный нами id и сохраняет экземпляр JoinHandle<()> , который создаётся путём порождение нового потока с пустым замыканием. Этот код скомпилируется и будет хранить количество экземпляров Worker , которое мы указали в качестве аргумента функции ThreadPool::new . Но мы все ещё не обрабатываем use std::thread; pub struct ThreadPool { workers: Vec } impl ThreadPool { // --snip-- pub fn new (size: usize ) -> ThreadPool { assert! (size > 0 ); let mut workers = Vec ::with_capacity(size); for id in 0 ..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize , thread: thread::JoinHandle<()>, } impl Worker { fn new (id: usize ) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } } |