Главная страница
Навигация по странице:

  • .imap_unordered( func , iterable [, chunksize ]])

  • Value( typecode , arg1 , ... argN , lock )

  • RawValue( typecode , arg1 , ..., argN ) То же, что и Value(), за исключением того, что не использует блокировку.Array( typecode

  • Примитив Описание

  • справочник по Python. мм isbn 9785932861578 9 785932 861578


    Скачать 4.21 Mb.
    Названиемм isbn 9785932861578 9 785932 861578
    Анкорсправочник по Python
    Дата08.05.2022
    Размер4.21 Mb.
    Формат файлаpdf
    Имя файлаBizli_Python-Podrobnyy-spravochnik.440222.pdf
    ТипСправочник
    #518195
    страница46 из 82
    1   ...   42   43   44   45   46   47   48   49   ...   82
    Pool([numprocess [,initializer [, initargs]]])
    Создает пул процессов. В аргументе numprocess передается число процессов, которые требуется создать. Если этот аргумент опустить, будет использо- ваться значение, возвращаемое функцией cpu_count(). В аргументе initial-
    izer
    передается вызываемый объект, который будет выполняться при запу- ске каждого процесса из пула. Аргумент initargs – это кортеж параметров, передаваемых объекту initializer. По умолчанию аргумент initializer при- при- нимает значение None.
    Экземпляр p класса Pool поддерживает следующие операции:
    p.apply(func [, args [, kwargs]])
    Выполняет функцию func(*args, **kwargs) в одном из процессов пула и воз- вращает результат. Важно отметить, что функция func вызывается не во всех, а только в одном процессе из пула. Если потребуется вызвать функ- цию func параллельно, с другим набором аргументов, метод p.apply() сле- дует вызвать из другого потока управления или воспользоваться методом
    p.apply_async()
    p.apply_async(func [, args [, kwargs [, callback]]])
    Выполняет функцию func(*args, **kwargs) в одном из процессов пула и воз- вращает результат асинхронно. Результатом вызова этого метода является экземпляр класса AsyncResult, который позднее можно использовать для получения фактического результата. В аргументе callback передается вы- зываемый объект, принимающий единственный аргумент. Как только ре- только ре- только ре- ре- ре- зультат вызова функции func будет доступен, он немедленно будет передан объекту callback. Объект callback не должен выполнять какие-либо опера- ции блокировки, в противном случае он заблокирует прием результатов в других асинхронных операциях.
    p.close()
    Закрывает пул процессов и предотвращает возможность выполнения после- дующих операций. Если к этому моменту остались какие-либо незавершен- ные операции, процессы из пула завершатся только после их выполнения.

    Модуль multiprocessing
    531
    p.join()
    Ожидает, пока все процессы из пула не завершат работу. Может вызывать- ся только после вызова метода close() или terminate().
    p.imap(func, iterable [, chunksize])
    Версия функции map(), которая вместо списка с результатами возвращает итератор.
    p.imap_unordered(func, iterable [, chunksize]])
    То же, что и imap(), за исключением того, что результаты возвращаются в произвольном порядке, в зависимости от очередности получения их от процессов из пула.
    p.map(func, iterable [, chunksize])
    Применяет вызываемый объект func ко всем элементам в объекте iterable и возвращает список с результатами. Операция выполняется несколькими процессами параллельно, для чего объект iterable разбивается на фрагмен- ты, которые затем передаются процессам из пула. Аргумент chunksize опре- опре- деляет количество элементов в каждом фрагменте. При обработке больших объемов данных увеличение значения chunksize может привести к улучше- нию общей производительности.
    p.map_async(func, iterable [, chunksize [, callback]])
    То же, что и map(), за исключением того, что результат возвращается асин- хронно. Возвращает экземпляр класса AsyncResult, который позднее может использоваться для получения фактического результата. В аргументе call-
    back
    передается вызываемый объект, принимающий единственный аргу- мент. Как только результат будет доступен, он немедленно будет передан объекту callback.
    p.terminate()
    Немедленно завершает все процессы из пула, не позволяя им выполнить какие-либо заключительные действия или незавершенные операции. Этот метод вызывается автоматически, когда объект p утилизируется сборщи- ком мусора.
    Методы apply_async() и map_async() возвращают экземпляр a класса AsyncRe- sult
    , который обладает следующими методами:
    a.get([timeout])
    Возвращает результат, ожидая, пока он поступит, если это необходимо.
    В необязательном аргументе timeout передается предельное время ожида- передается предельное время ожида- ния. Если результат не поступит в течение указанного промежутка вре- мени, возбуждается исключение multiprocessing.TimeoutError. Если при вы- полнении удаленной операции возникло исключение, оно повторно будет возбуждено при вызове этого метода.
    a.ready()
    Возвращает True, если вызов завершился полностью.

    532
    Глава 20. Потоки и многозадачность
    a.sucessful()
    Возвращает True, если в процессе вызова не возникло исключений. Если этот метод будет вызван до того, как результаты станут доступны, возбуж- дается исключение AssertionError.
    a.wait([timeout])
    Ожидает, пока результаты станут доступны. В необязательном аргументе
    timeout
    передается предельное время ожидания.
    Следующий пример демонстрирует использование пула процессов для по- строения словаря, отображающего имена файлов в значения контрольных сумм SHA512 для всех файлов в каталоге:
    import os import multiprocessing import hashlib
    ёё
    # Вы можете изменить значения некоторых параметров
    BUFSIZE = 8192 # Размер буфера чтения
    POOLSIZE = 2 # Количество процессов
    ёё
    def compute_digest(filename):
    try:
    f = open(filename,”rb”)
    except IOError:
    return None digest = hashlib.sha512()
    while True:
    chunk = f.read(BUFSIZE)
    if not chunk: break digest.update(chunk)
    f.close()
    return filename, digest.digest()
    ёё
    def build_digest_map(topdir):
    digest_pool = multiprocessing.Pool(POOLSIZE)
    allfiles = (os.path.join(path,name)
    for path, dirs, files in os.walk(topdir)
    for name in files)
    digest_map = dict(digest_pool.imap_unordered(compute_digest, allfiles,20))
    digest_pool.close()
    return digest_map
    ёё
    # Проверка. Измените имя каталога на желаемое.
    if __name__ == ‘__main__’:
    digest_map = build_digest_map(“/Users/beazley/Software/Python-3.0”)
    print(len(digest_map))
    Сначала в этом примере с помощью выражения-генератора определяется последовательность всех путей к файлам в указанном дереве каталогов. За- тем эта последовательность делится на фрагменты и частями передается процессам из пула с помощью метода imap_unordered(). Каждый процесс из пула вычисляет контрольные суммы SHA512 для своих файлов с помощью функции compute_digest() и возвращает результаты главному процессу,

    Модуль multiprocessing
    533
    который собирает их в словаре. Несмотря на то что этот пример не содер- жит особых программистских ухищрений, тем не менее на макбуке автора с двухядерным процессором он дает 75-процентный прирост скорости по сравнению с решением, основанным на использовании единственного про- цесса.
    Имейте в виду, что использовать пул процессов имеет смысл только в том случае, если объем вычислений достаточно велик, чтобы компенсировать дополнительные накладные расходы на взаимодействия между процесса- ми. Вообще говоря, нет смысла использовать пул процессов для простых вычислений, таких как сложение пары чисел.
    Совместно используемые данные и синхронизация
    Обычно процессы полностью изолированы друг от друга и могут обмени- ваться между собой только с помощью очередей или каналов. Однако суще- ствует два объекта, которые могут использоваться для представления раз- деляемых данных. Внутри эти объекты используют области разделяемой памяти (с помощью модуля mmap), обеспечивая доступ к ним из нескольких процессов.
    Value(typecode, arg1, ... argN, lock)
    Создает объект типа ctypes в разделяемой памяти. В аргументе typecode передается либо строка, содержащая код типа из модуля array (например,
    ‘i’
    , ‘d’ и другие), либо объект типа из модуля ctypes (например, ctypes.c_int, ctypes.c_double и другие). Все остальные позиционные аргументы arg1, arg2,
    , argN передаются конструктору указанного типа. Аргумент lock может передаваться только как именованный аргумент. Если он имеет значение
    True
    (по умолчанию), создается новая блокировка для защиты доступа к зна- чению. Если передать в этом аргументе существующую блокировку, напри- мер экземпляр класса Lock или RLock, то для синхронизации доступа будет использоваться эта блокировка. Если допустить, что v – это экземпляр раз- деляемого значения, созданного функцией Value(), то само значение будет доступно как v.value. Например, обращение к атрибуту v.value вернет значе- ние, а операция присваивания атрибуту v.value изменит значение.
    RawValue(typecode, arg1, ..., argN)
    То же, что и Value(), за исключением того, что не использует блокировку.
    Array(typecode, initializer, lock)
    Создает в разделяемой памяти массив объектов ctypes. Аргумент typecode определяет тип элементов массива и имеет тот же смысл, что и в функции
    Value()
    . В аргументе initializer можно передать либо целое число, опреде- ляющее начальный размер массива, либо последовательность элементов, элементы которой будет использоваться для инициализации массива. Ар- гумент lock может передаваться только как именованный аргумент и име- ет тот же смысл, что и в функции Value(). Если допустить, что a – это экзем- пляр разделяемого массива, созданного функцией Array(), то к элементам массива можно будет обращаться с использованием стандартных операций индексирования, извлечения среза и итераций, каждая из которых будет синхронизироваться блокировкой. В случае строк байтов объект a будет

    534
    Глава 20. Потоки и многозадачность также обладать атрибутом a.value, позволяющим обращаться к массиву как к единой строке.
    RawArray(typecode, initializer)
    То же, что и Array(), за исключением того, что не использует блокировку.
    Если потребуется написать программу, которая оперирует одновременно большим количеством элементов массива, для повышения производитель- ности лучше будет использовать этот тип данных и предусмотреть отдель- ную блокировку для синхронизации (если это необходимо).
    В дополнение к разделяемым значениям, которые создаются функциями
    Value()
    и Array(), модуль multiprocessing предоставляет разделяемые версии следующих примитивов синхронизации:
    Примитив
    Описание
    Lock
    Взаимоисключающая блокировка
    Rlock
    Реентерабельная взаимоисключающая блокировка
    (может приобретаться одним и тем же процессом множество раз, не блокируя его)
    Semaphore
    Семафор
    BoundedSemaphore
    Ограниченный семафор
    Event
    Событие
    Condition
    Переменная состояния
    Своим поведением эти объекты имитируют примитивы синхронизации идентичными именами, объявленные в модуле threading. За дополнитель- ной информацией обращайтесь к документации модуля threading.
    Следует отметить, что при работе с модулем multiprocessing обычно не при- ходится использовать блокировки, семафоры и другие подобные механиз- мы синхронизации, настолько же низкоуровневые, какие используются при работе с потоками. Операции send() и recv() каналов и операции put() и get() очередей уже обеспечивают синхронизацию. Однако в некоторых особых случаях вполне могут найти применение разделяемые значения и блокировки. Ниже приводится пример, в котором для передачи списка чисел с плавающей точкой от одного процесса другому вместо канала ис- пользуется разделяемый массив:
    import multiprocessing
    ёё
    class FloatChannel(object):
    def __init__(self, maxsize):
    self.buffer = multiprocessing.RawArray(‘d’,maxsize)
    self.buffer_len = multiprocessing.Value(‘i’)
    self.empty = multiprocessing.Semaphore(1)
    self.full = multiprocessing.Semaphore(0)
    def send(self,values):
    self.empty.acquire() # Продолжить, только если буфер пуст nitems = len(values)

    Модуль multiprocessing
    535
    self.buffer_len = nitems # Установить размер буфера self.buffer[:nitems] = values # Скопировать значения в буфер self.full.release() # Сообщить, что буфер заполнен def recv(self):
    self.full.acquire() # Продолжить, только если буфер заполнен values = self.buffer[:self.buffer_len.value] # Скопировать значение self.empty.release() # Сообщить, что буфер пуст return values
    ёё
    # Проверка производительности. Прием пакета сообщений def consume_test(count, ch):
    for i in xrange(count):
    values = ch.recv()
    ёё
    # Проверка производительности. Передача пакета сообщений def produce_test(count, values, ch):
    for i in xrange(count):
    ch.send(values)
    ёё
    if __name__ == ‘__main__’:
    ch = FloatChannel(100000)
    p = multiprocessing.Process(target=consume_test,
    args=(1000,ch))
    p.start()
    values = [float(x) for x in xrange(100000)]
    produce_test(1000, values, ch)
    print(“Конец”)
    p.join()
    Подробное изучение этого примера я оставлю вам, в качестве самостоя- тельного упражнения. Однако замечу, что при проверке производительно- сти на компьютере автора отправка огромного списка чисел с плавающей точкой с помощью объекта FloatChannel выполняется примерно на 80 про- центов быстрее, чем отправка того же списка с помощью объекта Pipe (ко- торый вынужден преобразовывать все значения в последовательную форму и обратно с помощью модуля pickle).
    Управляемые объекты
    В отличие от потоков управления, процессы не поддерживают разделяе- мые объекты. Несмотря на имеющуюся возможность создавать разделяе- мые значения и массивы, как было показано в предыдущем разделе, этот прием не может использоваться для передачи более сложных объектов языка Python, таких как словари, списки или экземпляры пользователь-
    Python, таких как словари, списки или экземпляры пользователь-
    , таких как словари, списки или экземпляры пользователь- ских классов. Однако модуль multiprocessing предоставляет способ совмест- ного использования объектов, при условии, что они будут действовать под управлением так называемого менеджера. Менеджер – это отдельный до- черний процесс, в котором существует действующий объект и который играет роль сервера. Другие процессы обращаются к разделяемым объек- там с помощью специальных промежуточных прокси-объектов, которые действуют как клиенты сервера менеджера.
    Самый простой способ создать разделяемый объект заключается в том, чтобы вызвать функцию Manager().

    536
    Глава 20. Потоки и многозадачность
    Manager()
    Создает и запускает отдельный процесс сервера менеджера. Возвращает экземпляр класса SyncManager, который определен в подмодуле multiprocess- ing.managers
    Экземпляр m класса SyncManager, возвращаемый функцией Manager(), обла- дает группой методов, создающих разделяемые объекты и возвращающих прокси-объекты, которые могут использоваться для доступа к разделяе- мым объектам. Обычно создание менеджера и создание разделяемых объ- ектов с помощью его методов производится перед запуском любых новых процессов. Ниже перечислены доступные методы:
    m.Array(typecode, sequence)
    Создает на стороне сервера разделяемый объект типа Array и возвраща- ет прокси-объект для доступа к нему. Описание аргументов можно найти в разделе «Совместно используемые данные и синхронизация».
    m.BoundedSemaphore([value])
    Создает на стороне сервера разделяемый экземпляр threading.BoundedSema-
    .BoundedSema-
    BoundedSema- phore и возвращает прокси-объект для доступа к нему.
    m.Condition([lock])
    Создает на стороне сервера разделяемый экземпляр threading.Condition и возвращает прокси-объект для доступа к нему. В аргументе lock переда- ется прокси-объект, созданный вызовом метода m.Lock() или m.Rlock().
    m.dict([args])
    Создает на стороне сервера разделяемый объект типа dict и возвращает прокси-объект для доступа к нему. Этот метод принимает те же аргументы, что и встроенная функция dict().
    m.Event()
    Создает на стороне сервера разделяемый экземпляр threading.Event и воз- вращает прокси-объект для доступа к нему.
    m.list([sequence])
    Создает на стороне сервера разделяемый объект типа list и возвращает прокси-объект для доступа к нему. Этот метод принимает те же аргументы, что и встроенная функция list().
    m.Lock()
    Создает на стороне сервера разделяемый экземпляр threading.Lock и возвра- щает прокси-объект для доступа к нему.
    m.Namespace()
    Создает на стороне сервера разделяемый объект пространства имен и воз- вращает прокси-объект для доступа к нему. Под пространством имен в данном случае подразумевается объект, напоминающий модуль на языке
    Python. Например, если допустить, что n – это прокси-объект для доступа к объекту пространства имен, то к его атрибутам можно обращаться с по- мощью оператора точки (.), например: n.name = value или value = n.name.

    Модуль multiprocessing
    537
    Следует заметить, что имя атрибута name в подобных операциях имеет боль- шое значение. Если имя атрибута name начинается с алфавитного символа, то его значением является часть разделяемого объекта, находящегося под управлением менеджера, и он будет доступен всем остальным процессам.
    Если имя атрибута name начинается с символа подчеркивания, этот атрибут является частью прокси-объекта и не будет доступен другим процессам.
    1   ...   42   43   44   45   46   47   48   49   ...   82


    написать администратору сайта