Язык программирования Rust
Скачать 7.02 Mb.
|
Листинг 16-2. Сохранение значения JoinHandle потока thread::spawn , гарантирующее, что поток выполнит всю необходимую работу, перед тем, как завершится Вызов join у дескриптора блокирует текущий поток, пока поток, представленный дескриптором не завершится. Блокировка потока означает, что потоку запрещено выполнять работу или выходить из него. Поскольку мы поместили вызов join после цикла for основного потока, выполнение листинга 16-2 должно привести к выводу, подобному следующему: use std::thread; use std::time::Duration; fn main () { let handle = thread::spawn(|| { for i in 1 10 { println! ( "hi number {} from the spawned thread!" , i); thread::sleep(Duration::from_millis( 1 )); } }); for i in 1 5 { println! ( "hi number {} from the main thread!" , i); thread::sleep(Duration::from_millis( 1 )); } handle.join().unwrap(); } Два потока продолжают чередоваться, но основной поток находится в ожидании из-за вызова handle.join() и не завершается до тех пор, пока не завершится запущенный поток. Но давайте посмотрим, что произойдёт, если мы вместо этого переместим handle.join() перед циклом for в main , например так: Файл: src/main.rs Основной поток будет ждать завершения порождённого потока, а затем запустит свой цикл for , поэтому выходные данные больше не будут чередоваться, как показано ниже: hi number 1 from the main thread! hi number 2 from the main thread! hi number 1 from the spawned thread! hi number 3 from the main thread! hi number 2 from the spawned thread! hi number 4 from the main thread! hi number 3 from the spawned thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread! hi number 6 from the spawned thread! hi number 7 from the spawned thread! hi number 8 from the spawned thread! hi number 9 from the spawned thread! use std::thread; use std::time::Duration; fn main () { let handle = thread::spawn(|| { for i in 1 10 { println! ( "hi number {} from the spawned thread!" , i); thread::sleep(Duration::from_millis( 1 )); } }); handle.join().unwrap(); for i in 1 5 { println! ( "hi number {} from the main thread!" , i); thread::sleep(Duration::from_millis( 1 )); } } Небольшие детали, такие как место вызова join , могут повлиять на то, выполняются ли ваши потоки одновременно. Использование move-замыканий в потоках Мы будем часто использовать ключевое слово move с замыканиями, переданными в thread::spawn , потому что замыкание будет затем владеть значениями, взятыми из окружающего кода, а значит передаст владение этими значениями от одного потока к другому. В разделе «Захват окружения замыканиями» главы 13 мы обсуждали move (перемещение) в контексте замыканий. Теперь мы больше сосредоточимся на взаимодействии между move и thread::spawn Обратите внимание, что в листинге 16-1 замыкание, которое мы передаём в thread::spawn не принимает аргументов: мы не используем никаких данных из основного потока в коде порождённого потока. Чтобы использовать данные из основного потока в порождённом потоке, замыкание порождённого потока должно захватывать значения, которые ему необходимы. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в порождённом потоке. Тем не менее, это не будет работать, как вы увидите через мгновение. Файл: src/main.rs Листинг 16-3: Попытка использовать вектор, созданный основным потоком, в другом потоке hi number 1 from the spawned thread! hi number 2 from the spawned thread! hi number 3 from the spawned thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread! hi number 6 from the spawned thread! hi number 7 from the spawned thread! hi number 8 from the spawned thread! hi number 9 from the spawned thread! hi number 1 from the main thread! hi number 2 from the main thread! hi number 3 from the main thread! hi number 4 from the main thread! use std::thread; fn main () { let v = vec! [ 1 , 2 , 3 ]; let handle = thread::spawn(|| { println! ( "Here's a vector: {:?}" , v); }); handle.join().unwrap(); } Замыкание использует переменную v , поэтому оно захватит v и сделает его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны иметь доступ к v внутри этого нового потока. Но при компиляции этого примера, мы получаем следующую ошибку: Rust выводит как захватить v и так как в println! нужна только ссылка на v , то замыкание пытается заимствовать v . Однако есть проблема: Rust не может определить, как долго будет работать порождённый поток, поэтому он не знает, будет ли всегда действительной ссылка на v В листинге 16-4 приведён сценарий, который с большей вероятностью будет иметь ссылку на v , что будет недопустимо: Файл: src/main.rs $ cargo run Compiling threads v0.1.0 (file:///projects/threads) error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function --> src/main.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `v` 7 | println!("Here's a vector: {:?}", v); | - `v` is borrowed here | note: function requires argument type to outlive `'static` --> src/main.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | println!("Here's a vector: {:?}", v); 8 | | }); | |______^ help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `threads` due to previous error Листинг 16-4. Поток с замыканием, который пытается захватить ссылку на v из основного потока, удаляющего v Если бы Rust позволил нам запустить этот код, есть вероятность, что порождённый поток был бы немедленно переведён в фоновый режим, не выполнив ничего. Порождённый поток имеет ссылку на v , но основной поток немедленно удаляет v , используя функцию drop , которую мы обсуждали в главе 15. Затем, когда порождённый поток начинает выполняться, v уже не существует, поэтому ссылка на него также будет недействительной. О, нет! Чтобы исправить ошибку компилятора в листинге 16-3, мы можем использовать совет из сообщения об ошибке: Добавляя ключевое слово move перед замыканием, мы заставляем замыкание забирать используемые значения во владение, вместо того, чтобы позволить Rust вывести необходимость заимствования значения. Модификация Листинга 16-3, показанная в Листинге 16-5, будет скомпилирована и запущена так, как мы ожидаем: Файл: src/main.rs use std::thread; fn main () { let v = vec! [ 1 , 2 , 3 ]; let handle = thread::spawn(|| { println! ( "Here's a vector: {:?}" , v); }); drop (v); // oh no! handle.join().unwrap(); } help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ use std::thread; fn main () { let v = vec! [ 1 , 2 , 3 ]; let handle = thread::spawn( move || { println! ( "Here's a vector: {:?}" , v); }); handle.join().unwrap(); } Листинг 16-5. Использование ключевого слова move , чтобы замыкание стало владельцем используемых им значений. У нас может возникнуть соблазн попробовать то же самое, чтобы исправить код в листинге 16.4, где основной поток вызывал drop с помощью замыкания move . Однако это исправление не сработает, потому что то, что пытается сделать листинг 16.4, запрещено по другой причине. Если мы добавим move к замыканию, мы переместим v в окружение замыкания и больше не сможем вызывать для него drop в основном потоке. Вместо этого мы получим эту ошибку компилятора: Правила владения Rust снова нас спасли! Мы получили ошибку кода из листинга 16-3, потому что Rust был консервативен и заимствовал v только для потока, что означало, что основной поток теоретически может сделать недействительной ссылку на порождённый поток. Сообщив Rust о передаче владения v в порождаемый поток, мы гарантируем Rust, что основной поток больше не будет использовать v . Если мы изменим Листинг 16-4 таким же образом, то мы нарушаем правила владения при попытке использовать v в главном потоке. Ключевое слово move отменяет основное консервативное поведение Rust по заимствованию, что не позволяет нам нарушать правила владения. Имея базовое понимание потоков и API потоков, давайте посмотрим, что мы можем делать с помощью потоков. $ cargo run Compiling threads v0.1.0 (file:///projects/threads) error[E0382]: use of moved value: `v` --> src/main.rs:10:10 | 4 | let v = vec![1, 2, 3]; | - move occurs because `v` has type `Vec 5 | 6 | let handle = thread::spawn(move || { | ------- value moved into closure here 7 | println!("Here's a vector: {:?}", v); | - variable moved due to use in closure 10 | drop(v); // oh no! | ^ value used here after move For more information about this error, try `rustc --explain E0382`. error: could not compile `threads` due to previous error Передача данных с помощью сообщений между потоками Всё большую популярность для обеспечения безопасной многопоточности набирает способ, называемый передача сообщений. В этом случае потоки или акторы взаимодействуют друг с другом путём отправки сообщений с данными. Идея этого подхода выражена в слогане из документации языка Go таким образом: «Не стоит передавать информацию с помощью разделяемой памяти; лучше делитесь памятью, передавая информацию». Для обеспечения отправки многопоточных сообщений в стандартной библиотеке языка Rust реализованы каналы. Канал в программировании - это общепринятый механизм, с помощью которого данные из одного потока отправляются другому потоку. Вы можете представить канал в программировании как направленное движение воды, например как ручей или реку. Если вы поместите какую-нибудь вещь на воду, например резиновую уточку, она будет плыть вниз по течению до тех пор, пока это течение не кончится. Канал состоит из двух половин: передатчика и приёмника. Передатчик — это место вверх по течению, где вы опускаете резиновых уточек в реку, а приёмник — это место, где резиновые уточки оказываются в конце пути. Одна часть вашего кода вызывает методы передатчика с данными, которые вы хотите отправить, а другая часть проверяет принимающую сторону на наличие поступающих сообщений. Канал считается закрытым , если либо передающая, либо принимающая его половина уничтожена. Давайте создадим программу, в которой один поток будет генерировать значения и отправлять их в канал, а другой поток будет получать значения и распечатывать их. Мы будем отправлять между потоками простые значения, используя канал, чтобы проиллюстрировать эту функцию. После того, как вы ознакомитесь с этим методом, вы сможете использовать каналы с любыми потоками, которым необходимо взаимодействовать друг с другом. Это может быть например система чата или система, в которой несколько вычислительных потоков выполняют свою часть расчёта, а затем отправляют эту часть в отдельный поток, который уже агрегирует полученные результаты. Сначала в листинге 16-6 мы создадим канал, но не будем ничего с ним делать. Обратите внимание, что этот код ещё не компилируется, потому что Rust не может сказать, какой тип значений мы хотим отправить через канал. Файл: src/main.rs use std::sync::mpsc; fn main () { let (tx, rx) = mpsc::channel(); } Листинг 16-6: Создание канала и присваивание двух значений переменным tx и rx Мы создаём новый канал, используя функцию mpsc::channel ; mpsc означает несколько производителей, один потребитель (multiple producer, single consumer). Коротко, способ которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения. Представьте, что несколько ручьёв втекают в одну большую реку: всё, что плывёт вниз по любому из ручьёв, в конце концов окажется в одной реке. Сейчас мы пока начнём с одного производителя, а когда пример заработает, добавим ещё несколько. Функция mpsc::channel возвращает кортеж, первый элемент которого является отправляющей стороной (передатчиком), а вторым элементом является принимающая сторона (получатель). Аббревиатуры tx и rx традиционно используются во многих полях для передатчика и приёмника соответственно, поэтому мы называем соответствующие переменные именно так. Мы используем оператор let с шаблоном, который деструктурирует кортежи; мы обсудим использование шаблонов в операторах let и деструктуризацию в главе 18. А пока знайте, что описанное использование оператора let является удобным способом извлечения частей кортежа, возвращаемых mpsc::channel Давайте переместим передающую часть в порождённый поток так, чтобы он отправлял одну строку и чтобы таким образом, порождённый поток связывался с основным потоком, как показано в листинге 16-7. Это похоже на то, как если бы вы поместили резиновую утку в реку вверх по течению или отправили сообщение чата из одного потока в другой. Файл: src/main.rs Листинг 16-7: Перемещение tx в созданный поток и отправка сообщения «привет» Опять же, мы используем thread::spawn для создания нового потока, а затем используем move для перемещения tx в замыкание, чтобы порождённый поток владел tx . Порождённый поток должен владеть передатчиком, чтобы иметь возможность отправлять сообщения через канал. Передатчик имеет метод send , который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result , use std::sync::mpsc; use std::thread; fn main () { let (tx, rx) = mpsc::channel(); thread::spawn( move || { let val = String ::from( "hi" ); tx.send(val).unwrap(); }); } поэтому, если получатель уже удалён и отправить значение некуда, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap для паники в случае ошибки. В реальном приложении мы обработали бы эту ситуацию более корректно: вернитесь к главе 9, если хотите ещё раз разобрать стратегии правильной обработки ошибок. В листинге 16-8 мы получим значение от приёмника в основном потоке. Это похоже на извлечение резиновой уточки из воды в конце реки или получение сообщения в чате. Файл: src/main.rs Листинг 16-8. Получение значения «привет» в основном потоке и его печать Метод try_recv неблокирующий, отличается тем, что немедленно вернёт тип Result : где значение Ok содержащее сообщение, если оно доступно и значение Err , если в этот раз нет сообщений. Использование try_recv полезно, если текущий поток выполняет другую работу во время ожидания сообщений: мы могли бы написать цикл, довольно часто вызывающий метод try_recv , так чтобы обрабатывать сообщение, если оно доступно и в противном случае выполнять другую работу некоторое время до следующей проверки. Метод try_recv не блокирует выполнение, вместо этого он немедленно возвращает Result : значение Ok , содержащее сообщение, если оно доступно, и значение Err , если на этот раз сообщений нет. Использование try_recv полезно, если у потока есть другая работа во время ожидания сообщений: мы могли бы написать цикл, который время от времени вызывает try_recv , обрабатывает сообщение, если оно доступно, а в противном случае некоторое время выполняет другую работу, пока не проверит снова. Мы использовали recv в этом примере для простоты; у нас нет никакой другой работы для основного потока, кроме как ждать сообщений, поэтому блокировка основного потока уместна. При запуске кода листинга 16-8, мы увидим значение, напечатанное из основного потока: use std::sync::mpsc; use std::thread; fn main () { let (tx, rx) = mpsc::channel(); thread::spawn( move || { let val = String ::from( "hi" ); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println! ( "Got: {}" , received); } Отлично! Каналы и передача владения Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный многопоточный код. Предотвращение ошибок в многопоточном программировании является преимуществом для размышлений о владении во всех ваших Rust программах. Давайте проведём эксперимент, чтобы показать как каналы и владение действуют совместно для предотвращения проблем: мы попытаемся использовать значение val в порождённом потоке после того как отправим его в канал. Попробуйте скомпилировать код в листинге 16-9, чтобы понять, почему этот код не разрешён: Файл: src/main.rs Листинг 16-9: Попытка использовать val после того, как мы отправили его по каналу Здесь мы пытаемся напечатать значение val после того, как отправили его в канал вызвав tx.send . Разрешить это было бы плохой идеей: после того, как значение было отправлено в другой поток, текущий поток мог бы изменить или удалить значение, прежде чем мы попытались бы использовать значение снова. Потенциально изменения в другом потоке могут привести к ошибкам или не ожидаемым результатам из-за противоречивых или несуществующих данных. Однако Rust выдаёт нам ошибку, если мы пытаемся скомпилировать код в листинге 16-9: Got: hi use std::sync::mpsc; use std::thread; fn main () { let (tx, rx) = mpsc::channel(); thread::spawn( move || { let val = String ::from( "hi" ); tx.send(val).unwrap(); println! ( "val is {}" , val); }); let received = rx.recv().unwrap(); println! ( "Got: {}" , received); } Наша ошибка для многопоточности привела к ошибке компиляции. Функция send вступает во владение своим параметром и когда значение перемещается, получатель становится владельцем этого параметра. Это останавливает нас от случайного использования значения снова после его отправки; анализатор заимствования проверяет, что все в порядке. |