Главная страница
Навигация по странице:

  • Листинг 20-16: Модификация ThreadPool для хранения отправляющей части канала, который отправляет экземпляры

  • Листинг 20-17: Передача принимающей части канала "работнику"

  • Листинг 20-18: Совместное использование принимающей стороны канала среди "работников" используя Arc и

  • Листинг 20-19: Создание псевдонима типа Job для Box, содержащего каждое замыкание и затем отправляющее задание (job) в канал

  • Листинг 20-20: Получение и выполнение заданий в потоке "работника"

  • Листинг 20-22: Альтернативная реализация Worker::new с использованием

  • Изящное завершение и освобождение ресурсов

  • Реализация типажа

  • Листинг 20-22: Присоединение (Joining) каждого потока, когда пул потоков выходит из области видимости

  • Язык программирования Rust


    Скачать 7.02 Mb.
    НазваниеЯзык программирования Rust
    Дата12.04.2023
    Размер7.02 Mb.
    Формат файлаpdf
    Имя файлаThe Rust Programming Language_ru.pdf
    ТипУчебник
    #1056301
    страница59 из 62
    1   ...   54   55   56   57   58   59   60   61   62
    Отправка запросов в потоки через каналы
    Теперь мы рассмотрим проблему, заключающуюся в том, что замыкания переданные в thread::spawn абсолютно ничего не делают. Вот мы получаем замыкание, которое хотим выполнить в методе execute
    . Но для запуска нам необходимо передать замыкание в метод thread::spawn
    , где на каждый
    ThreadPool создаётся один
    Worker
    Мы хотим, чтобы только что созданные структуры
    Worker извлекали код для запуска из очереди хранящейся в
    ThreadPool и отправляли этот код в свой поток для выполнения.
    В главе 16 вы узнали о каналах (channels) - простом способе связи между двумя потоками,
    который идеально подойдёт для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute отправит задание из
    ThreadPool экземплярам
    Worker
    , который отправит задание в свой поток. Вот план:
    1.
    ThreadPool создаст канал и будет удерживать его передающую сторону.
    2. Каждый
    Worker будет удерживать принимающую сторону канала.
    3. Мы создадим новую структуру
    Job которая будет содержать замыкания, которые мы хотим отправить в канал.
    4. Метод execute отправит задание, которое он хочет выполнить, в отправляющую сторону канала.
    5. В своём потоке
    Worker будет выполнять цикл с принимающей стороной канала и выполнит замыкание любого получаемого задания.
    Давайте начнём с создания канала в
    ThreadPool::new и удержания отправляющей стороны в экземпляре
    ThreadPool
    , как показано в листинге 20-16. В структуре
    Job сейчас ничего не содержится, но это будет тип элемента который мы отправляем в канал.
    Файл: src/lib.rs

    Листинг 20-16: Модификация
    ThreadPool
    для хранения отправляющей части канала, который отправляет
    экземпляры
    Job
    В
    ThreadPool::new мы создаём наш новый канал и пул содержащий отправляющую сторону. Код успешно скомпилируется все ещё с предупреждениями.
    Давайте попробуем передавать принимающую сторону канала каждому "работнику"
    (структуре woker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке порождаемым "работником", поэтому мы будем ссылаться на параметр receiver в замыкании. Код 20-17 пока не компилируется.
    Файл: src/lib.rs use std::{sync::mpsc, thread}; pub struct
    ThreadPool
    { workers:
    Vec
    , sender: mpsc::Sender,
    } struct
    Job
    ; impl
    ThreadPool {
    // --snip-- pub fn new
    (size: usize
    ) -> ThreadPool { assert!
    (size >
    0
    ); let
    (sender, receiver) = mpsc::channel(); let mut workers =
    Vec
    ::with_capacity(size); for id in
    0
    ..size { workers.push(Worker::new(id));
    }
    ThreadPool { workers, sender }
    }
    // --snip--
    }

    Листинг 20-17: Передача принимающей части канала "работнику"
    Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в
    Worker::new
    , а затем используем его внутри замыкания.
    При попытке проверить код, мы получаем ошибку:
    Код пытается передать receiver в несколько экземпляров
    Worker
    . Это не будет работать, как вы помните из главы 16: реализация канала предоставляемая Rust,
    impl
    ThreadPool {
    // --snip-- pub fn new
    (size: usize
    ) -> ThreadPool { assert!
    (size >
    0
    ); let
    (sender, receiver) = mpsc::channel(); let mut workers =
    Vec
    ::with_capacity(size); for id in
    0
    ..size { workers.push(Worker::new(id, receiver));
    }
    ThreadPool { workers, sender }
    }
    // --snip--
    }
    // --snip-- impl
    Worker { fn new
    (id: usize
    , receiver: mpsc::Receiver) -> Worker { let thread = thread::spawn(|| { receiver;
    });
    Worker { id, thread }
    }
    }
    $
    cargo check
    Checking hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver`
    -->
    src/lib.rs:26:42
    |
    21 | let (sender, receiver) = mpsc::channel();
    | -------- move occurs because `receiver` has type
    `std::sync::mpsc::Receiver`, which does not implement the `Copy` trait
    26 | workers.push(Worker::new(id, receiver));
    | ^^^^^^^^ value moved here, in previous iteration of loop
    For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` due to previous error
    является моделью несколько производителей (multiple producer), один потребитель
    (single consumer). Это означает, что мы не можем просто клонировать принимающую часть канала для исправления этого кода. Даже если бы мы это могли, это не техника которую мы хотели бы использовать; вместо этого мы хотим распределить задачи среди потоков, разделяя один receiver среди всех "работников".
    Кроме того, удаление задачи из очереди канала включает изменение receiver
    , поэтому потокам необходим безопасный способ делиться и изменять receiver
    , в противном случае мы можем получить условия гонки (как описано в главе 16).
    Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и позволить потокам изменять значение, нам нужно использовать тип
    Arc>
    . Тип
    Arc позволит нескольким "работникам" владеть получателем (receiver), а
    Mutex гарантирует что только один "работник" получит задание
    (job) от получателя в один момент времени. Листинг 20-18 показывает изменения,
    которые мы должны сделать.
    Файл: src/lib.rs use std::{ sync::{mpsc, Arc, Mutex}, thread,
    };
    // --snip-- impl
    ThreadPool {
    // --snip-- pub fn new
    (size: usize
    ) -> ThreadPool { assert!
    (size >
    0
    ); let
    (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers =
    Vec
    ::with_capacity(size); for id in
    0
    ..size { workers.push(Worker::new(id, Arc::clone(&receiver)));
    }
    ThreadPool { workers, sender }
    }
    // --snip--
    }
    // --snip-- impl
    Worker { fn new
    (id: usize
    , receiver: Arc>>) -> Worker {
    // --snip--
    }
    }

    Листинг 20-18: Совместное использование принимающей стороны канала среди "работников" используя
    Arc
    и
    Mutex
    В
    ThreadPool::new мы помещаем принимающую сторону канала внутрь
    Arc и
    Mutex
    Для каждого нового "работника" мы клонируем
    Arc
    , чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороны канала.
    С этими изменениями код компилируется! Мы подбираемся к цели!
    Реализация метода execute
    Давайте реализуем метод execute у структуры
    ThreadPool
    . Мы также изменим тип
    Job со структуры на псевдоним типа для типаж-объекта, который содержит тип замыкания принимаемый методом execute
    . Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа"
    главы 19, псевдонимы типов позволяют делать длинные типы короче. Посмотрите в листинг 20-19.
    Файл: src/lib.rs
    Листинг 20-19: Создание псевдонима типа
    Job
    для
    Box
    , содержащего каждое замыкание и затем
    отправляющее задание (job) в канал
    После создания нового экземпляра
    Job с помощью замыкания, получаемого в метод execute
    , мы отправляем это задание в отправляющую часть канала. Мы вызываем unwrap для send в случае неудачной отправки. Это может произойти, если например,
    мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получение новых сообщений. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap
    , заключается в том, что мы знаем, что сбоя не произойдёт, но компилятор этого не знает.
    // --snip-- type
    Job
    =
    Box
    <
    dyn
    FnOnce
    () +
    Send
    +
    'static
    >; impl
    ThreadPool {
    // --snip-- pub fn execute
    (&
    self
    , f: F) where
    F:
    FnOnce
    () +
    Send
    +
    'static
    ,
    { let job =
    Box
    ::new(f); self
    .sender.send(job).unwrap();
    }
    }
    // --snip--

    Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри
    Worker::new
    Файл: src/lib.rs
    Листинг 20-20: Получение и выполнение заданий в потоке "работника"
    Здесь мы сначала вызываем lock у receiver
    , чтобы получить мьютекс, а затем вызываем unwrap для паники при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном state (poisoned state), что может произойти если какой-то другой поток запаниковал, удерживая блокировку,
    вместо снятия блокировки. В этой ситуации правильное действие - вызвать unwrap для паники потока. Не стесняйтесь заменить unwrap на expect с сообщением об ошибке,
    которое имеет для вас значение.
    Если мы получим блокировку мьютекса, мы вызываем recv для получения
    Job из канала. Окончательный вызов unwrap проходит мимо любых ошибок, которые могут произойти, если поток удерживающий отправляющую сторону канала, завершил работу подобно тому, как метод send возвращает
    Err
    , если принимающая сторона закрывается.
    Вызов recv блокирующий, поэтому если ещё нет задач (job), то текущий поток будет ждать, пока задача не станет доступной.
    Mutex
    гарантирует, что только один поток
    Worker пытается запросить задачу за раз.
    Наш пул потоков теперь находится в рабочем состоянии! Выполните cargo run и
    сделайте несколько запросов:
    // --snip-- impl
    Worker { fn new
    (id: usize
    , receiver: Arc>>) -> Worker { let thread = thread::spawn(
    move
    || loop
    { let job = receiver.lock().unwrap().recv().unwrap(); println!
    (
    "Worker {id} got a job; executing."
    ); job();
    });
    Worker { id, thread }
    }
    }

    Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно.
    Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получает много запросов. Если мы отправим запрос ресурса
    /sleep, сервер сможет обслуживать другие запросы, запустив их в другом потоке.
    Примечание: если вы запрашиваете /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному с интервалами в 5 секунд.
    Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Данное ограничение не вызвано нашим веб-сервером.
    После изучения цикла while let в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22.
    $
    cargo run
    Compiling hello v0.1.0 (file:///projects/hello) warning: field is never read: `workers`
    -->
    src/lib.rs:7:5
    |
    7 | workers: Vec,
    | ^^^^^^^^^^^^^^^^^^^^
    |
    = note: `#[warn(dead_code)]` on by default warning: field is never read: `id`
    -->
    src/lib.rs:48:5
    |
    48 | id: usize,
    | ^^^^^^^^^ warning: field is never read: `thread`
    -->
    src/lib.rs:49:5
    |
    49 | thread: thread::JoinHandle<()>,
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ warning: 3 warnings emitted
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
    Running `target/debug/main`
    Worker 0 got a job; executing.
    Worker 2 got a job; executing.
    Worker 1 got a job; executing.
    Worker 3 got a job; executing.
    Worker 0 got a job; executing.
    Worker 2 got a job; executing.
    Worker 1 got a job; executing.
    Worker 3 got a job; executing.
    Worker 0 got a job; executing.
    Worker 2 got a job; executing.

    Файл: src/lib.rs
    Листинг 20-22: Альтернативная реализация
    Worker::new
    с использованием
    while let
    Этот код компилируется и запускается, но не приводит к желаемому поведению :
    медленный запрос всё равно приведёт к тому, что другие запросы будут ждать обработки. Причина здесь несколько тоньше:
    Mutex не имеет общедоступного unlock потому что право собственности на блокировку зависит от времени жизни
    MutexGuard
    в
    LockResult>
    которое возвращает метод lock
    . Во время компиляции анализатор заимствований может затем применить правило, согласно которому к ресурсу, охраняемому
    Mutex нельзя получить доступ, если мы не удерживаем блокировку. Но эта реализация также может привести к тому, что блокировка будет удерживаться дольше, чем предполагалось, если мы не будем тщательно продумывать время жизни
    MutexGuard
    Код в листинге 20-20, который использует let job = receiver.lock().unwrap().recv().unwrap();
    работает, потому что с let любые временные значения, используемые в выражении справа от знака равенства,
    немедленно удаляются после завершения оператора let
    . Однако while let
    (и if let and match
    ) не удаляет временные значения до конца связанного блока. В листинге 20-21
    блокировка сохраняется на время вызова job()
    , что означает, что другие исполнители не могут получать задания.
    // --snip-- impl
    Worker { fn new
    (id: usize
    , receiver: Arc>>) -> Worker { let thread = thread::spawn(
    move
    || { while let
    Ok
    (job) = receiver.lock().unwrap().recv() { println!
    (
    "Worker {id} got a job; executing."
    ); job();
    }
    });
    Worker { id, thread }
    }
    }

    Изящное завершение и освобождение ресурсов
    Листинг 20-20 асинхронно отвечает на запросы с помощью использования пула потоков,
    как мы и хотели. Мы получаем некоторые предупреждения про workers
    , id и поля thread
    , которые мы не используем напрямую, что напоминает нам о том, что мы не освобождаем все ресурсы. Когда мы используем менее элегантный метод остановки основного потока клавишной комбинацией ctrl-c, все остальные потоки также немедленно останавливаются, даже если они находятся в середине обработки запроса.
    Теперь мы будем реализовывать типаж
    Drop для вызова join у каждого потока в пуле,
    чтобы они могли завершить запросы над которыми они работают до закрытия. Затем мы реализуем способ сообщить потокам, что они должны перестать принимать новые запросы и завершить работу. Чтобы увидеть этот код в действии, мы изменим наш сервер так, чтобы он принимал только два запроса, прежде чем корректно завершить работу его пула потоков.
    Реализация типажа Drop для ThreadPool
    Давайте начнём с реализации
    Drop у нашего пула потоков. Когда пул удаляется, все наши потоки должны объединиться (join), чтобы убедиться, что они завершают свою работу. В листинге 20-22 показана первая попытка реализации
    Drop
    , код пока не будет работать.
    Файл: src/lib.rs
    Листинг 20-22: Присоединение (Joining) каждого потока, когда пул потоков выходит из области видимости
    Во-первых, мы проходим циклом по каждому workers из пула потоков. Для этого мы используем
    &mut
    , потому что self является изменяемой ссылкой и нам также нужно иметь возможность изменять экземпляр worker
    . Для каждого "работника" мы печатаем сообщение о том, что этот конкретный "работник" завершается, затем вызываем join у
    потока этого "работника". Если вызов join происходит с ошибкой, мы используем unwrap
    , чтобы вызвать панику в Rust и завершить не совсем красиво.
    Ошибка получаемая при компиляции этого кода:
    impl
    Drop for
    ThreadPool { fn drop
    (&
    mut self
    ) { for worker in
    &
    mut self
    .workers { println!
    (
    "Shutting down worker {}"
    , worker.id); worker.thread.join().unwrap();
    }
    }
    }

    Ошибка говорит, что мы не можем вызвать join
    , потому что у нас есть только изменяемое заимствование каждого worker и что join забирает во владение его аргумент. Чтобы решить эту проблему, нужно переместить поток из экземпляра
    Worker
    ,
    который владеет thread
    , чтобы join мог использовать внутренний поток. Мы сделали это в коде 17-15: если вместо этого
    Worker содержит тип
    Option>
    , мы можем вызвать метод take у
    Option
    , чтобы переместить значение из варианта
    Some и оставить вариант
    None на его месте. Другими словами, работающий
    Worker будет содержать вариант
    Some внутри thread
    , и когда мы хотим очистить
    Worker
    , мы заменяем значение варианта
    Some на вариант
    None
    , чтобы у
    Worker не было потока для запуска.
    Итак, мы хотим обновить объявление
    Worker следующим образом:
    Файл: src/lib.rs
    Теперь давайте опираться на компилятор, чтобы найти другие места, которые нужно изменить. Проверяя код, мы получаем две ошибки:
    $
    cargo check
    Checking hello v0.1.0 (file:///projects/hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
    --> src/lib.rs:52:13
    |
    52 | worker.thread.join().unwrap();
    | ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
    | |
    | move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
    | note: this function takes ownership of the receiver `self`, which moves
    `worker.thread`
    For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` due to previous error struct
    Worker
    { id: usize
    , thread:
    Option
    >,
    }

    Давайте обратимся ко второй ошибке, которая указывает на код в конце
    Worker::new
    ;
    нам нужно обернуть значение thread в вариант
    Some при создании нового
    Worker
    Внесите следующие изменения, чтобы исправить эту ошибку:
    Файл: src/lib.rs
    Первая ошибка находится в нашей реализации
    Drop
    . Ранее мы упоминали, что намеревались вызвать take для параметра
    Option
    , чтобы забрать thread из процесса worker
    . Следующие изменения делают это:
    Файл: src/lib.rs
    $
    cargo check
    Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `join` found for enum `Option` in the current scope
    -->
    src/lib.rs:52:27
    |
    52 | worker.thread.join().unwrap();
    | ^^^^ method not found in `Option>` error[E0308]: mismatched types
    -->
    src/lib.rs:72:22
    |
    72 | Worker { id, thread }
    | ^^^^^^ expected enum `Option`, found struct `JoinHandle`
    |
    = note: expected enum `Option>` found struct `JoinHandle<_>` help: try wrapping the expression in `Some`
    |
    72 | Worker { id, thread: Some(thread) }
    | +++++++++++++ +
    Some errors have detailed explanations: E0308, E0599.
    For more information about an error, try `rustc --explain E0308`. error: could not compile `hello` due to 2 previous errors impl
    Worker { fn new
    (id: usize
    , receiver: Arc>>) -> Worker {
    // --snip--
    Worker { id, thread:
    Some
    (thread),
    }
    }
    }

    Как уже говорилось в главе 17, метод take у типа
    Option забирает значение из варианта
    Some и оставляет вариант
    None в этом месте. Мы используем if let
    , чтобы деструктурировать
    Some и получить поток; затем вызываем join у потока. Если поток "работника" уже
    None
    , мы знаем, что этот "работник" уже очистил свой поток, поэтому в этом случае ничего не происходит.
    1   ...   54   55   56   57   58   59   60   61   62


    написать администратору сайта