Язык программирования Rust
Скачать 7.02 Mb.
|
Отправка нескольких значений и ожидание получателем Код в листинге 16-8 компилируется и выполняется, но в нем не ясно показано то, что два отдельных потока общаются друг с другом через канал. В листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в листинге 16-8 работает одновременно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу на секунду между каждым сообщением. Файл: src/main.rs $ cargo run Compiling message-passing v0.1.0 (file:///projects/message-passing) error[E0382]: borrow of moved value: `val` --> src/main.rs:10:31 | 8 | let val = String::from("hi"); | --- move occurs because `val` has type `String`, which does not implement the `Copy` trait 9 | tx.send(val).unwrap(); | --- value moved here 10 | println!("val is {}", val); | ^^^ value borrowed here after move | = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info) For more information about this error, try `rustc --explain E0382`. error: could not compile `message-passing` due to previous error Листинг 16-10: Отправка нескольких сообщений и пауза между ними На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить основному потоку. Мы перебираем их, отправляя каждую строку по отдельности и делаем паузу между ними, вызывая функцию thread::sleep со значением Duration равным 1 секунде. В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы используем rx как итератор. Для каждого полученного значения мы печатаем его. Когда канал будет закрыт, итерация закончится. При выполнении кода в листинге 16-10 вы должны увидеть следующий вывод с паузой в 1 секунду между каждой строкой: Поскольку у нас нет кода, который приостанавливает или задерживает цикл for в основном потоке, мы можем сказать, что основной поток ожидает получения значений из порождённого потока. Создание нескольких отправителей путём клонирования передатчика use std::sync::mpsc; use std::thread; use std::time::Duration; fn main () { let (tx, rx) = mpsc::channel(); thread::spawn( move || { let vals = vec! [ String ::from( "hi" ), String ::from( "from" ), String ::from( "the" ), String ::from( "thread" ), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs( 1 )); } }); for received in rx { println! ( "Got: {}" , received); } } Got: hi Got: from Got: the Got: thread Ранее мы упоминали, что mpsc — это аббревиатура от множественного производителя, одного потребителя . Давайте используем mpsc в полной мере и расширим код в листинге 16.10, создав несколько потоков, которые отправляют значения одному и тому же получателю. Мы можем сделать это, клонировав передатчик, как показано в листинге 16.11: Файл: src/main.rs Листинг 16-11: Отправка нескольких сообщений от нескольких производителей На этот раз, прежде чем мы создадим первый порождённый поток, мы вызовем функцию clone на передатчике. В результате мы получим новый передатчик, который мы сможем передать первому порождённому потоку. Исходный передатчик мы передадим второму порождённому потоку. Это даст нам два потока, каждый из которых отправляет разные сообщения одному получателю. // --snip-- let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn( move || { let vals = vec! [ String ::from( "hi" ), String ::from( "from" ), String ::from( "the" ), String ::from( "thread" ), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs( 1 )); } }); thread::spawn( move || { let vals = vec! [ String ::from( "more" ), String ::from( "messages" ), String ::from( "for" ), String ::from( "you" ), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs( 1 )); } }); for received in rx { println! ( "Got: {}" , received); } // --snip-- Когда вы запустите код, вывод должен выглядеть примерно так: Вы можете увидеть значения в другом порядке, в зависимости от вашей системы. Именно такое поведение делает параллелизм как интересным, так и сложным. Если вы поэкспериментируете с thread::sleep , задавая различные значения аргумента в разных потоках, каждый запуск будет более недетерминированным и каждый раз будут выводиться разные данные. Теперь, когда мы посмотрели, как работают каналы, давайте рассмотрим другой метод многопоточности. Got: hi Got: more Got: from Got: messages Got: for Got: the Got: thread Got: you Многопоточное разделяемое состояние Передача сообщений — прекрасный способ обработки параллелизма, но не единственный. Другим методом может быть доступ нескольких потоков к одним и тем же общим данным. Рассмотрим ещё раз часть слогана из документации по языку Go: «Не стоит передавать информацию с помощью разделяемой памяти». Как бы выглядело общение, используя разделяемую память? Кроме того, почему энтузиасты передачи сообщений предостерегают от его использования? В каком-то смысле каналы в любом языке программирования похожи на единоличное владение, потому что после передачи значения по каналу вам больше не следует использовать отправленное значение. Многопоточная, совместно используемая память подобна множественному владению: несколько потоков могут одновременно обращаться к одной и той же области памяти. Как вы видели в главе 15, где умные указатели сделали возможным множественное владение, множественное владение может добавить сложность, потому что нужно управлять этими разными владельцами. Система типов Rust и правила владения очень помогают в их правильном управлении. Для примера давайте рассмотрим мьютексы, один из наиболее распространённых многопоточных примитивов для разделяемой памяти. Мьютексы предоставляют доступ к данным из одного потока (за раз) Mutex - это сокращение от взаимное исключение (mutual exclusion), так как мьютекс позволяет только одному потоку получать доступ к некоторым данным в любой момент времени. Для того, чтобы получить доступ к данным в мьютексе, поток должен сначала подать сигнал, что он хочет получить доступ запрашивая блокировку (lock) мьютекса. Блокировка - это структура данных, являющаяся частью мьютекса, которая отслеживает кто в настоящее время имеет эксклюзивный доступ к данным. Поэтому мьютекс описывается как объект защищающий данные, которые он хранит через систему блокировки. Мьютексы имеют репутацию трудных в использовании, потому что вы должны помнить два правила: Перед тем как попытаться получить доступ к данным необходимо получить блокировку. Когда вы закончили работу с данными, которые защищает мьютекс, вы должны разблокировать данные, чтобы другие потоки могли получить блокировку. Для понимания мьютекса, представьте пример из жизни как групповое обсуждение на конференции с одним микрофоном. Прежде чем участник дискуссии сможет говорить, он должен спросить или дать сигнал, что он хочет использовать микрофон. Когда он получает микрофон, то может говорить столько, сколько хочет, а затем передаёт микрофон следующему участнику, который попросит дать ему выступить. Если участник дискуссии забудет освободить микрофон, когда закончит с ним, то никто больше не сможет говорить. Если управление общим микрофоном идёт не правильно, то конференция не будет работать как было запланировано! Правильное управление мьютексами может быть невероятно сложным и именно поэтому многие люди с энтузиазмом относятся к каналам. Однако, благодаря системе типов и правилам владения в Rust, вы не можете использовать блокировку и разблокировку неправильным образом. Mutex API Давайте рассмотрим пример использования мьютекса в листинге 16-12 без использования нескольких потоков: Файл: src/main.rs Листинг 16-12: Изучение API Mutex для простоты в однопоточном контексте. Как и во многих типах, мы создаём Mutex используя ассоциированную функцию new Чтобы получить доступ к данным внутри мьютекса, мы используем метод lock для получения блокировки. Этот вызов блокирует текущий поток, поэтому он не может выполнять какую-либо другую работу, пока не наступит наша очередь получить блокировку. Вызов lock завершится неудачей, если запаникует другой поток, удерживающий блокировку. В этом случае никто никогда не сможет получить блокировку, поэтому мы решили вызвать unwrap и вызвать панику, если окажемся в такой ситуации. После того как мы получили блокировку, мы можем рассматривать возвращаемое значение, в данном случае с именем num , как изменяемую ссылку на данные внутри. Система типов гарантирует, что мы получим блокировку перед использованием значения из m . Переменная m имеет тип Mutex , а не просто i32 , поэтому мы должны вызвать функцию lock , чтобы иметь возможность использовать значение типа i32 . Мы не можем забыть об этом; без этого система типов не позволит нам получить доступ ко внутреннему i32 значению. use std::sync::Mutex; fn main () { let m = Mutex::new( 5 ); { let mut num = m.lock().unwrap(); *num = 6 ; } println! ( "m = {:?}" , m); } Как вы наверное подозреваете, Mutex является умным указателем. Точнее, вызов lock возвращает умный указатель, называемый MutexGuard , обёрнутый в LockResult , который мы обработали с помощью вызова unwrap . Умный указатель типа MutexGuard реализует типаж Deref для указания на внутренние данные; умный указатель также имеет реализацию типажа Drop , автоматически снимающего блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости. В результате у нас нет риска забыть снять блокировку и оставить мьютекс в заблокированном состоянии, препятствуя его использованию другими потоками (снятие блокировки происходит автоматически). После снятия блокировки можно напечатать значение мьютекса и увидеть, что мы смогли изменить внутреннее i32 на 6. Разделение Mutex Теперь давайте попробуем с помощью Mutex совместно использовать значение между несколькими потоками. Мы стартуем 10 потоков и каждый из них увеличивает значение счётчика на 1, поэтому счётчик изменяется от 0 до 10. Обратите внимание, что в следующих нескольких примерах будут ошибки компилятора и мы будем использовать эти ошибки, чтобы узнать больше об использовании типа Mutex и как Rust помогает нам правильно его использовать. Листинг 16-13 содержит наш начальный пример: Файл: src/main.rs Листинг 16-13. Десять потоков, увеличивающих счётчик, защищённый Mutex use std::sync::Mutex; use std::thread; fn main () { let counter = Mutex::new( 0 ); let mut handles = vec! []; for _ in 0 10 { let handle = thread::spawn( move || { let mut num = counter.lock().unwrap(); *num += 1 ; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println! ( "Result: {}" , *counter.lock().unwrap()); } Мы создаём переменную-счётчик counter для хранения i32 значения внутри Mutex , как мы это делали в листинге 16-12. Затем мы создаём 10 потоков, перебирая диапазон чисел. Мы используем thread::spawn и передаём всем этим потокам одинаковое замыкание, которое перемещает счётчик в поток, запрашивает блокировку на Mutex , вызывая метод lock , а затем добавляет 1 к значению в мьютексе. Когда поток завершит выполнение своего замыкания, num выйдет из области видимости и освободит блокировку, чтобы её мог получить другой поток. В основном потоке мы собираем все дескрипторы в переменную handles. Затем, как мы это делали в листинге 16-2, вызываем join для каждого дескриптора, чтобы убедиться в завершении всех потоков. В этот момент основной поток получит доступ к блокировке и тоже напечатает результат программы. Компилятор намекнул, что этот пример не компилируется. Давайте выясним почему! Сообщение об ошибке указывает, что значение counter было перемещёно в замыкание на предыдущей итерации цикла. Rust говорит нам, что мы не можем передать counter во владение нескольким потокам. Давайте исправим ошибку компилятора с помощью метода множественного владения, который мы обсуждали в главе 15. Множественное владение между множеством потоков В главе 15 мы давали значение нескольким владельцам, используя умный указатель Rc для создания значения подсчитанных ссылок. Давайте сделаем то же самое здесь и посмотрим, что произойдёт. Мы завернём Mutex в Rc в листинге 16-14 и клонируем Rc перед передачей владения в поток. Теперь, когда мы увидели ошибки, мы также вернёмся к использованию цикла for и сохраним ключевое слово move у замыкания. Файл: src/main.rs $ cargo run Compiling shared-state v0.1.0 (file:///projects/shared-state) error[E0382]: use of moved value: `counter` --> src/main.rs:9:36 | 5 | let counter = Mutex::new(0); | ------- move occurs because `counter` has type `Mutex 9 | let handle = thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 10 | let mut num = counter.lock().unwrap(); | ------- use occurs due to use in closure For more information about this error, try `rustc --explain E0382`. error: could not compile `shared-state` due to previous error Листинг 16-14: Попытка использования Rc , чтобы позволить нескольким потокам владеть Mutex Ещё раз, мы компилируем и получаем ... другие ошибки! Компилятор учит нас. use std::rc::Rc; use std::sync::Mutex; use std::thread; fn main () { let counter = Rc::new(Mutex::new( 0 )); let mut handles = vec! []; for _ in 0 10 { let counter = Rc::clone(&counter); let handle = thread::spawn( move || { let mut num = counter.lock().unwrap(); *num += 1 ; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println! ( "Result: {}" , *counter.lock().unwrap()); } $ cargo run Compiling shared-state v0.1.0 (file:///projects/shared-state) error[E0277]: `Rc --> src/main.rs:11:22 | 11 | let handle = thread::spawn(move || { | ______________________^^^^^^^^^^^^^_- | | | | | `Rc 12 | | let mut num = counter.lock().unwrap(); 13 | | 14 | | *num += 1; 15 | | }); | |_________- within this `[closure@src/main.rs:11:36: 15:10]` | = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]` note: required by a bound in `spawn` For more information about this error, try `rustc --explain E0277`. error: could not compile `shared-state` due to previous error Вау, сообщение об ошибке очень многословное! Вот некоторые важные части, на которых нужно сосредоточить внимание: первая встроенная ошибка говорит о том, что ``std::rc::Rc cannot be sent between threads safely . Причиной этого является следующая важная часть сообщения об ошибке. Сообщение об ошибке говорит, the trait bound Send is not satisfied . Мы поговорим про типаж Send в следующем разделе: это один из типажей гарантирующих что типы, используемые потоками, предназначены для использования в многопоточных ситуациях. К сожалению, Rc небезопасен для совместного использования между потоками. Когда Rc управляет счётчиком ссылок, он добавляется значение к счётчику для каждого вызова clone и вычитается значение из счётчика, когда каждое клонированное значение удаляется при выходе из области видимости. Но он не использует примитивы многопоточности, чтобы гарантировать, что изменения в подсчёте не могут быть прерваны другим потоком. Это может привести к неправильным подсчётам - незначительным ошибкам, которые в свою очередь, могут привести к утечкам памяти или удалению значения до того, как мы отработали с ним. Нам нужен тип точно такой же как Rc , но который позволяет изменять счётчик ссылок безопасно из разных потоков. Атомарный счётчик ссылок Arc К счастью, Arc является типом аналогичным типу Rc , который безопасен для использования в ситуациях многопоточности. Буква А означает атомарное, что означает тип ссылка подсчитываемая атомарно. Atomics - это дополнительный вид примитивов для многопоточности, который мы не будем здесь подробно описывать: дополнительную информацию смотрите в документации стандартной библиотеки для std::sync::atomic На данный момент вам просто нужно знать, что atomics работают как примитивные типы, но безопасны для совместного использования между потоками. Вы можете спросить, почему все примитивные типы не являются атомарными и почему стандартные типы библиотек не реализованы для использования вместе с типом Arc по умолчанию. Причина в том, что безопасность потоков сопровождается снижением производительности, которое вы хотите платить только тогда, когда вам это действительно нужно. Если вы просто выполняете операции со значениями в одном потоке, то ваш код может работать быстрее, если он не должен обеспечивать гарантии предоставляемые atomics. Давайте вернёмся к нашему примеру: типы Arc и Rc имеют одинаковый API, поэтому мы исправляем нашу программу, заменяя тип в строках use , вызове new и вызове clone . Код в листинге 16-15, наконец скомпилируется и запустится: Файл: src/main.rs |