Главная страница
Навигация по странице:

  • JoinableQueue([ maxsize ])

  • .recv() Извлекает из канала объект, отправленный методом c .send(). Если с другой стороны соединение было закрыто и в канале нет данных, возбуждает ис- ключение EOFError. c

  • .recv_bytes([ maxlength ])

  • справочник по Python. мм isbn 9785932861578 9 785932 861578


    Скачать 4.21 Mb.
    Названиемм isbn 9785932861578 9 785932 861578
    Анкорсправочник по Python
    Дата08.05.2022
    Размер4.21 Mb.
    Формат файлаpdf
    Имя файлаBizli_Python-Podrobnyy-spravochnik.440222.pdf
    ТипСправочник
    #518195
    страница45 из 82
    1   ...   41   42   43   44   45   46   47   48   ...   82
    Queue([maxsize])
    Создает очередь для организации обмена сообщениями между процессами.
    Аргумент maxsize определяет максимальное количество элементов, кото- рые можно поместить в очередь. При вызове без аргумента размер очереди не ограничивается. Внутренняя реализация очередей основана на исполь- зовании каналов и блокировок. Кроме того, для передачи данных из очере- ди в канал запускается вспомогательный поток управления.
    Экземпляр q класса Queue обладает следующими методами:
    q.cancel_join_thread()
    Предотвращает автоматическое присоединение к фоновому потоку при за- вершении процесса. Благодаря этому исключается возможность блокиро- вания процесса в вызове метода join_thread().
    q.close()
    Закрывает очередь, запрещая добавление новых элементов. После вызова этого метода фоновый вспомогательный поток продолжит запись данных

    Модуль multiprocessing
    523
    из очереди в канал и завершится, как только очередь будет исчерпана.
    Этот метод вызывается автоматически, когда экземпляр q утилизируется сборщиком мусора. Операция закрытия очереди не генерирует какой-либо признак окончания передачи данных и не возбуждает исключение на при- нимающей стороне. Например, если принимающий процесс (потребитель) находится в ожидании в методе get(), закрытие очереди на стороне пере- дающего процесса (поставщика) не повлечет выход из метода get()с призна- ком ошибки на стороне потребителя.
    q.empty()
    Возвращает True, если в момент вызова очередь q была пустой. Имейте в виду, что если имеются другие процессы и потоки, добавляющие новые элементы в очередь, результат вызова этой функции не может считаться надежным (между моментом получения результата и моментом его провер- ки в очередь могут быть добавлены новые элементы).
    q.full()
    Возвращает True, если в момент вызова очередь q была полной. Результат этой функции также нельзя считать надежным в многопоточных приложе- ниях (смотрите описание q.empty()).
    q.get([block [, timeout]])
    Возвращает элемент из очереди q. Если очередь q пуста, процесс приоста- навливается до появления элемента в очереди. Аргумент block управляет режимом блокировки и по умолчанию имеет значение True. Если в этом ар- гументе передать False, при попытке получить элемент из пустой очереди метод будет возбуждать исключение Queue.Empty (объявляется в библиотеч- ном модуле Queue). Аргумент timeout является необязательным и использу- является необязательным и использу- необязательным и использу- необязательным и использу- и использу- и использу- использу- использу- ется, когда блокировка разрешена. Если в течение указанного интервала времени в очереди не появится сообщений, будет возбуждено исключение
    Queue.Empty
    q.get_nowait()
    То же, что и q.get(False).
    q.join_thread()
    Выполняет присоединение к фоновому потоку очереди. Может использо- ваться, чтобы дождаться момента, когда очередь будет исчерпана после вы- зова метода q.close(). По умолчанию этот метод вызывается всеми процес- сами, которые не являются создателями очереди q. Такое поведение можно изменить, вызвав метод q.cancel_join_thread().
    q.put(item [, block [, timeout]])
    Добавляет элемент item в очередь. Если очередь заполнена до предела, про- цесс приостанавливается до появления места в очереди. Аргумент block управляет режимом блокировки и по умолчанию имеет значение True. Если в этом аргументе передать False, при попытке добавить элемент в заполнен- ную очередь метод будет возбуждать исключение Queue.Full (объявляется в библиотечном модуле Queue). Аргумент timeout определяет предельное время ожидания появления свободного места в очереди, когда блокировка

    524
    Глава 20. Потоки и многозадачность разрешена. По истечении времени ожидания будет возбуждено исключе- ние Queue.Full.
    q.put_nowait(item)
    То же, что и q.put(item, False).
    q.qsize()
    Возвращает примерное количество элементов, находящихся в очереди. Ре- зультат этой функции также нельзя считать надежным, потому что между моментом получения результата и моментом его проверки могут быть до- бавлены новые элементы или извлечены существующие. В некоторых си- стемах этот метод может возбуждать исключение NotImplementedError.
    JoinableQueue([maxsize])
    Создает необособленный процесс очереди, доступной для совместного ис- пользования. Очереди этого типа очень похожи на очереди типа Queue, за исключением того, что очередь позволяет потребителю известить постав- щика, что элементы были благополучно обработаны. Процедура передачи извещений реализована на основе разделяемых семафоров и переменных состояния.
    Экземпляр q класса JoinableQueue обладает теми же методами, что и экзем- пляр класса Queue, и сверх того имеет следующие дополнительные методы:
    q.task_done()
    Используется потребителем, чтобы сообщить, что элемент очереди, полу- ченный методом q.get(), был обработан. Возбуждает исключение ValueError, если количество вызовов этого метода превышает количество элементов, извлеченных из очереди.
    q.join()
    Используется поставщиком, чтобы дождаться момента, когда будут обра- ботаны все элементы очереди. Этот метод приостанавливает процесс, пока для каждого элемента в очереди не будет вызван метод q.task_done().
    Следующий пример демонстрирует, как реализовать процесс, который в бесконечном цикле получает элементы из очереди и обрабатывает их. По- ставщик добавляет элементы в очередь и ожидает, пока они не будут об- работаны.
    import multiprocessing
    ёё
    def consumer(input_q):
    while True:
    item = input_q.get()
    # Обработать элемент print(item) # Заместите эту инструкцию фактической обработкой
    # Сообщить о завершении обработки input_q.task_done()
    ёё
    def producer(sequence, output_q):
    for item in sequence:
    # Добавить элемент в очередь output_q.put(item)
    ёё

    Модуль multiprocessing
    525
    # Настройка if __name__ == ‘__main__’:
    q = multiprocessing.JoinableQueue()
    # Запустить процесс-потребитель cons_p = multiprocessing.Process(target=consumer,args=(q,))
    cons_p.daemon=True cons_p.start()
    ёё
    # Воспроизвести элементы.
    # Переменная sequence представляет последовательность элементов, которые
    # будут передаваться потребителю. На практике вместо переменной можно
    # использовать генератор или воспроизводить элементы каким-либо другим
    # способом.
    sequence = [1,2,3,4]
    producer(sequence, q)
    ёё
    # Дождаться, пока все элементы не будут обработаны q.join()
    В этом примере процесс-потребитель запускается как демонический про- цесс, потому что он выполняется в бесконечном цикле, а нам необходимо, чтобы он завершался вместе с главной программой (если этого не сделать, программа зависнет). Чтобы иметь возможность в процессе-поставщике определить момент, когда все элементы будут успешно обработаны, ис- пользуется очередь типа JoinableQueue. Это гарантирует метод join(); если забыть вызвать этот метод, процесс-потребитель будет завершен еще до того, как успеет обработать все элементы в очереди.
    Добавлять и извлекать элементы очереди могут сразу несколько процес- сов. Например, если данные должны получать сразу несколько процессов- потребителей, это можно было бы реализовать, как показано ниже:
    if __name__ == ‘__main__’:
    q = multiprocessing.JoinableQueue()
    # Запустить несколько процессов-потребителей cons_p1 = multiprocessing.Process(target=consumer,args=(q,))
    cons_p1.daemon=True cons_p1.start()
    cons_p2 = multiprocessing.Process(target=consumer,args=(q,))
    cons_p2.daemon=True cons_p2.start()
    ёё
    # Воспроизвести элементы.
    # Переменная sequence представляет последовательность элементов, которые
    # будут передаваться потребителю. На практике вместо переменной можно
    # использовать генератор или воспроизводить элементы каким-либо другим
    # способом.
    sequence = [1,2,3,4]
    producer(sequence, q)
    ёё
    # Дождаться, пока все элементы не будут обработаны q.join()

    526
    Глава 20. Потоки и многозадачность
    При разработке подобного программного кода не забывайте, что каждый элемент, помещаемый в очередь, преобразуется в последовательность бай- тов и отправляется процессу через канал или сетевое соединение. Как пра- вило, с точки зрения производительности лучше послать небольшое коли- чество крупных объектов, чем много маленьких.
    В некоторых ситуациях бывает желательно, чтобы поставщик извещал потребителей, что элементов больше не будет и потребители должны за- вершить работу. Для этих целей можно использовать сигнальную метку – специальное значение, которое сигнализирует об окончании работы. Ниже приводится пример, иллюстрирующий использование значения None в ка- честве сигнальной метки:
    import multiprocessing
    ёё
    def consumer(input_q):
    while True:
    item = input_q.get()
    if item is None:
    break
    # Обработать элемент print(item) # Заместите эту инструкцию фактической обработкой
    # Завершение print(“Потребитель завершил работу”)
    ёё
    def producer(sequence, output_q):
    for item in sequence:
    # Добавить элемент в очередь output_q.put(item)
    ёё
    if __name__ == ‘__main__’:
    q = multiprocessing.Queue()
    # Запустить процесс-потребитель cons_p = multiprocessing.Process(target=consumer,args=(q,))
    cons_p.start()
    ёё
    # Воспроизвести элементы. sequence = [1,2,3,4]
    producer(sequence, q)
    ёё
    # Сообщить о завершении, поместив в очередь сигнальную метку q.put(None)
    ёё
    # Дождаться, пока завершится процесс-потребитель cons_p.join()
    При использовании сигнальных меток, как это показано в примере, следу- ет помнить, что необходимо добавить в очередь по одной сигнальной метке для каждого потребителя. Например, если было запущено три процесса- потребителя, извлекающих элементы из очереди, процесс-поставщик дол- жен добавить в очередь три сигнальные метки, чтобы завершить работу всех потребителей.
    Для обмена сообщениями между процессами вместо очередей можно ис- пользовать каналы.

    Модуль multiprocessing
    527
    Pipe([duplex])
    Создает канал между процессами и возвращает кортеж (conn1, conn2), где поля conn1 и conn2 являются объектами класса Connection, представляющи- ми концы канала. По умолчанию создается двунаправленный канал. Если в аргументе duplex передать значение False, то объект conn1 можно будет ис- пользовать только для чтения, а объект conn2 – только для записи. Функ- ция Pipe() должна вызываться до создания и запуска объектов класса Pro- cess
    , которые будут пользоваться каналом.
    Экземпляр c класса Connection, возвращаемый функцией Pipe(), обладает следующими методами и атрибутами:
    c.close()
    Закрывает соединение. Вызывается автоматически, когда объект c утили- зируется сборщиком мусора.
    c.fileno()
    Возвращает целочисленный дескриптор файла, идентифицирующий со- единение.
    c.poll([timeout])
    Возвращает True при наличии данных в канале. Аргумент timeout опреде- ляет предельное время ожидания. При вызове без аргумента метод немед- ленно возвращает результат. Если в аргументе timeout передать значение
    None
    , метод будет ожидать появления данных неопределенно долго.
    c.recv()
    Извлекает из канала объект, отправленный методом c.send(). Если с другой стороны соединение было закрыто и в канале нет данных, возбуждает ис- ключение EOFError.
    c.recv_bytes([maxlength])
    Принимает строку байтов сообщения, отправленную методом c.send_bytes().
    Аргумент maxlength определяет максимальное количество байтов, которые требуется принять. Если входящее сообщение длиннее заданного значения, возбуждается исключение IOError, после чего последующие операции чте- ния из канала становятся невозможными. Если с другой стороны соедине- ние было закрыто и в канале нет данных, возбуждает исключение EOFError.
    c.recv_bytes_into(buffer [, offset])
    Принимает строку байтов сообщения и сохраняет ее в объекте buffer, ко- торый должен поддерживать интерфейс буферов, доступных для записи
    (такой как объект типа bytearray или похожий). Аргумент offset определя- ет смещение в байтах от начала буфера, куда будет записано сообщение.
    Возвращает количество прочитанных байтов. Если длина сообщения пре- количество прочитанных байтов. Если длина сообщения пре- количество прочитанных байтов. Если длина сообщения пре- прочитанных байтов. Если длина сообщения пре- прочитанных байтов. Если длина сообщения пре- байтов. Если длина сообщения пре- байтов. Если длина сообщения пре-
    . Если длина сообщения пре-
    Если длина сообщения пре- высит объем доступного пространства в буфере, будет возбуждено исклю- чение BufferTooShort.
    c.send(obj)
    Отправляет объект через соединение. Аргумент obj может быть любым объектом, совместимым с модулем pickle.

    528
    Глава 20. Потоки и многозадачность
    c.send_bytes(buffer [, offset [, size]])
    Отправляет строку байтов из буфера через соединение. Аргумент buffer может быть любым объектом, поддерживающим интерфейс буферов. Ар- гумент offset определяет смещение в байтах от начала буфера, а аргумент
    size
    – количество байтов, которые требуется отправить. Данные отправля- ются в виде одного сообщения и могут быть приняты одним вызовом мето- да c.recv_bytes().
    Работа с каналами мало чем отличается от работы с очередями. Ниже при- водится пример, демонстрирующий решение предыдущей задачи передачи данных между поставщиком и потребителем на основе каналов:
    import multiprocessing
    ёё
    # Получает элементы из канала.
    def consumer(pipe):
    output_p, input_p = pipe input_p.close() # Закрыть конец канала, доступный для записи while True:
    try:
    item = output_p.recv()
    except EOFError:
    break
    # Обработать элемент print(item) # Заместите эту инструкцию фактической обработкой
    # Завершение print(“Потребитель завершил работу”)
    ёё
    # Создает элементы и помещает их в канал. Переменная sequence представляет
    # итерируемый объект с элементами, которые требуется обработать.
    def producer(sequence, input_p):
    for item in sequence:
    # Послать элемент в канал input_p.send(item)
    ёё
    if __name__ == ‘__main__’:
    (output_p, input_p) = multiprocessing.Pipe()
    # Запустить процесс-потребитель cons_p = multiprocessing.Process(target=consumer,args=((output_p,
    input_p),))
    cons_p.start()
    ёё
    # Закрыть в поставщике конец канала, доступный для чтения output_p.close()
    ёё
    # Отправить элементы sequence = [1,2,3,4]
    producer(sequence, input_p)
    ёё
    # Сообщить об окончании, закрыв конец канала, доступный для записи input_p.close()
    ёё
    # Дождаться, пока завершится процесс-потребитель cons_p.join()

    Модуль multiprocessing
    529
    Особое внимание должно уделяться корректному управлению концами ка- нала. Если один из концов канала не используется поставщиком или потре- бителем, его следует закрыть. Именно этим объясняется, почему в примере выше процесс-поставщик закрывает конец канала, доступный для чтения, а процесс-потребитель – конец канала, доступный для записи. Если забыть выполнить одну из этих операций, программа может зависнуть при вызове метода recv() в процессе-потребителе. Операционная система ведет подсчет ссылок на каналы, поэтому, чтобы возбудить исключение EOFError, канал должен быть закрыт с обоих концов. То есть, если закрыть канал только со стороны поставщика, это не окажет никакого эффекта, пока потребитель не закроет свой конец того же канала.
    Каналы могут использоваться как средство обмена сообщениями в двух направлениях. То есть они позволяют писать программы, реализующие модель обмена запрос/ответ, которая обычно применяется во взаимодей- ствиях типа клиент/сервер или в вызовах удаленных процедур. Ниже при- водится пример реализации такого типа взаимодействий:
    import multiprocessing
    # Серверный процесс def adder(pipe):
    server_p, client_p = pipe client_p.close()
    while True:
    try:
    x,y = server_p.recv()
    except EOFError:
    break result = x + y server_p.send(result)
    # Завершение print(“Сервер завершил работу”)
    ёё
    if __name__ == ‘__main__’:
    (server_p, client_p) = multiprocessing.Pipe()
    # Запустить серверный процесс adder_p = multiprocessing.Process(target=adder,args=((server_p, client_p),))
    adder_p.start()
    ёё
    # Закрыть серверный канал в клиенте server_p.close()
    ёё
    # Послать серверу несколько запросов client_p.send((3,4))
    print(client_p.recv())
    client_p.send((‘Hello’,’World’))
    print(client_p.recv())
    ёё
    # Конец. Закрыть канал client_p.close()
    ёё
    # Дождаться, пока завершится серверный процесс adder_p.join()

    530
    Глава 20. Потоки и многозадачность
    В этом примере функция adder() запускается как серверный процесс, ожи- дающий поступления сообщений на своем конце канала. Получив сообще- ние, сервер обрабатывает его и отправляет результаты обратно в канал. Не забывайте, что методы send() и recv() используют модуль pickle для сериа- лизации и десериализации объектов. В примере сервер получает кортеж
    (
    x, y)
    и возвращает результат операции x + y. В более сложных приложе- ниях могут использоваться вызовы удаленных процедур, для чего может потребоваться создать пул процессов, о чем рассказывается ниже.
    Пулы процессов
    Следующий класс позволяет создать пул процессов, которые могут выпол- нять различные виды обработки данных. По своей функциональности пул напоминает генератор списков и операции функционального программи- рования, такие как отображение и снижение размерности.
    1   ...   41   42   43   44   45   46   47   48   ...   82


    написать администратору сайта