справочник по Python. мм isbn 9785932861578 9 785932 861578
Скачать 4.21 Mb.
|
Pool([numprocess [,initializer [, initargs]]]) Создает пул процессов. В аргументе numprocess передается число процессов, которые требуется создать. Если этот аргумент опустить, будет использо- ваться значение, возвращаемое функцией cpu_count(). В аргументе initial- izer передается вызываемый объект, который будет выполняться при запу- ске каждого процесса из пула. Аргумент initargs – это кортеж параметров, передаваемых объекту initializer. По умолчанию аргумент initializer при- при- нимает значение None. Экземпляр p класса Pool поддерживает следующие операции: p.apply(func [, args [, kwargs]]) Выполняет функцию func(*args, **kwargs) в одном из процессов пула и воз- вращает результат. Важно отметить, что функция func вызывается не во всех, а только в одном процессе из пула. Если потребуется вызвать функ- цию func параллельно, с другим набором аргументов, метод p.apply() сле- дует вызвать из другого потока управления или воспользоваться методом p.apply_async() p.apply_async(func [, args [, kwargs [, callback]]]) Выполняет функцию func(*args, **kwargs) в одном из процессов пула и воз- вращает результат асинхронно. Результатом вызова этого метода является экземпляр класса AsyncResult, который позднее можно использовать для получения фактического результата. В аргументе callback передается вы- зываемый объект, принимающий единственный аргумент. Как только ре- только ре- только ре- ре- ре- зультат вызова функции func будет доступен, он немедленно будет передан объекту callback. Объект callback не должен выполнять какие-либо опера- ции блокировки, в противном случае он заблокирует прием результатов в других асинхронных операциях. p.close() Закрывает пул процессов и предотвращает возможность выполнения после- дующих операций. Если к этому моменту остались какие-либо незавершен- ные операции, процессы из пула завершатся только после их выполнения. Модуль multiprocessing 531 p.join() Ожидает, пока все процессы из пула не завершат работу. Может вызывать- ся только после вызова метода close() или terminate(). p.imap(func, iterable [, chunksize]) Версия функции map(), которая вместо списка с результатами возвращает итератор. p.imap_unordered(func, iterable [, chunksize]]) То же, что и imap(), за исключением того, что результаты возвращаются в произвольном порядке, в зависимости от очередности получения их от процессов из пула. p.map(func, iterable [, chunksize]) Применяет вызываемый объект func ко всем элементам в объекте iterable и возвращает список с результатами. Операция выполняется несколькими процессами параллельно, для чего объект iterable разбивается на фрагмен- ты, которые затем передаются процессам из пула. Аргумент chunksize опре- опре- деляет количество элементов в каждом фрагменте. При обработке больших объемов данных увеличение значения chunksize может привести к улучше- нию общей производительности. p.map_async(func, iterable [, chunksize [, callback]]) То же, что и map(), за исключением того, что результат возвращается асин- хронно. Возвращает экземпляр класса AsyncResult, который позднее может использоваться для получения фактического результата. В аргументе call- back передается вызываемый объект, принимающий единственный аргу- мент. Как только результат будет доступен, он немедленно будет передан объекту callback. p.terminate() Немедленно завершает все процессы из пула, не позволяя им выполнить какие-либо заключительные действия или незавершенные операции. Этот метод вызывается автоматически, когда объект p утилизируется сборщи- ком мусора. Методы apply_async() и map_async() возвращают экземпляр a класса AsyncRe- sult , который обладает следующими методами: a.get([timeout]) Возвращает результат, ожидая, пока он поступит, если это необходимо. В необязательном аргументе timeout передается предельное время ожида- передается предельное время ожида- ния. Если результат не поступит в течение указанного промежутка вре- мени, возбуждается исключение multiprocessing.TimeoutError. Если при вы- полнении удаленной операции возникло исключение, оно повторно будет возбуждено при вызове этого метода. a.ready() Возвращает True, если вызов завершился полностью. 532 Глава 20. Потоки и многозадачность a.sucessful() Возвращает True, если в процессе вызова не возникло исключений. Если этот метод будет вызван до того, как результаты станут доступны, возбуж- дается исключение AssertionError. a.wait([timeout]) Ожидает, пока результаты станут доступны. В необязательном аргументе timeout передается предельное время ожидания. Следующий пример демонстрирует использование пула процессов для по- строения словаря, отображающего имена файлов в значения контрольных сумм SHA512 для всех файлов в каталоге: import os import multiprocessing import hashlib ёё # Вы можете изменить значения некоторых параметров BUFSIZE = 8192 # Размер буфера чтения POOLSIZE = 2 # Количество процессов ёё def compute_digest(filename): try: f = open(filename,”rb”) except IOError: return None digest = hashlib.sha512() while True: chunk = f.read(BUFSIZE) if not chunk: break digest.update(chunk) f.close() return filename, digest.digest() ёё def build_digest_map(topdir): digest_pool = multiprocessing.Pool(POOLSIZE) allfiles = (os.path.join(path,name) for path, dirs, files in os.walk(topdir) for name in files) digest_map = dict(digest_pool.imap_unordered(compute_digest, allfiles,20)) digest_pool.close() return digest_map ёё # Проверка. Измените имя каталога на желаемое. if __name__ == ‘__main__’: digest_map = build_digest_map(“/Users/beazley/Software/Python-3.0”) print(len(digest_map)) Сначала в этом примере с помощью выражения-генератора определяется последовательность всех путей к файлам в указанном дереве каталогов. За- тем эта последовательность делится на фрагменты и частями передается процессам из пула с помощью метода imap_unordered(). Каждый процесс из пула вычисляет контрольные суммы SHA512 для своих файлов с помощью функции compute_digest() и возвращает результаты главному процессу, Модуль multiprocessing 533 который собирает их в словаре. Несмотря на то что этот пример не содер- жит особых программистских ухищрений, тем не менее на макбуке автора с двухядерным процессором он дает 75-процентный прирост скорости по сравнению с решением, основанным на использовании единственного про- цесса. Имейте в виду, что использовать пул процессов имеет смысл только в том случае, если объем вычислений достаточно велик, чтобы компенсировать дополнительные накладные расходы на взаимодействия между процесса- ми. Вообще говоря, нет смысла использовать пул процессов для простых вычислений, таких как сложение пары чисел. Совместно используемые данные и синхронизация Обычно процессы полностью изолированы друг от друга и могут обмени- ваться между собой только с помощью очередей или каналов. Однако суще- ствует два объекта, которые могут использоваться для представления раз- деляемых данных. Внутри эти объекты используют области разделяемой памяти (с помощью модуля mmap), обеспечивая доступ к ним из нескольких процессов. Value(typecode, arg1, ... argN, lock) Создает объект типа ctypes в разделяемой памяти. В аргументе typecode передается либо строка, содержащая код типа из модуля array (например, ‘i’ , ‘d’ и другие), либо объект типа из модуля ctypes (например, ctypes.c_int, ctypes.c_double и другие). Все остальные позиционные аргументы arg1, arg2, , argN передаются конструктору указанного типа. Аргумент lock может передаваться только как именованный аргумент. Если он имеет значение True (по умолчанию), создается новая блокировка для защиты доступа к зна- чению. Если передать в этом аргументе существующую блокировку, напри- мер экземпляр класса Lock или RLock, то для синхронизации доступа будет использоваться эта блокировка. Если допустить, что v – это экземпляр раз- деляемого значения, созданного функцией Value(), то само значение будет доступно как v.value. Например, обращение к атрибуту v.value вернет значе- ние, а операция присваивания атрибуту v.value изменит значение. RawValue(typecode, arg1, ..., argN) То же, что и Value(), за исключением того, что не использует блокировку. Array(typecode, initializer, lock) Создает в разделяемой памяти массив объектов ctypes. Аргумент typecode определяет тип элементов массива и имеет тот же смысл, что и в функции Value() . В аргументе initializer можно передать либо целое число, опреде- ляющее начальный размер массива, либо последовательность элементов, элементы которой будет использоваться для инициализации массива. Ар- гумент lock может передаваться только как именованный аргумент и име- ет тот же смысл, что и в функции Value(). Если допустить, что a – это экзем- пляр разделяемого массива, созданного функцией Array(), то к элементам массива можно будет обращаться с использованием стандартных операций индексирования, извлечения среза и итераций, каждая из которых будет синхронизироваться блокировкой. В случае строк байтов объект a будет 534 Глава 20. Потоки и многозадачность также обладать атрибутом a.value, позволяющим обращаться к массиву как к единой строке. RawArray(typecode, initializer) То же, что и Array(), за исключением того, что не использует блокировку. Если потребуется написать программу, которая оперирует одновременно большим количеством элементов массива, для повышения производитель- ности лучше будет использовать этот тип данных и предусмотреть отдель- ную блокировку для синхронизации (если это необходимо). В дополнение к разделяемым значениям, которые создаются функциями Value() и Array(), модуль multiprocessing предоставляет разделяемые версии следующих примитивов синхронизации: Примитив Описание Lock Взаимоисключающая блокировка Rlock Реентерабельная взаимоисключающая блокировка (может приобретаться одним и тем же процессом множество раз, не блокируя его) Semaphore Семафор BoundedSemaphore Ограниченный семафор Event Событие Condition Переменная состояния Своим поведением эти объекты имитируют примитивы синхронизации идентичными именами, объявленные в модуле threading. За дополнитель- ной информацией обращайтесь к документации модуля threading. Следует отметить, что при работе с модулем multiprocessing обычно не при- ходится использовать блокировки, семафоры и другие подобные механиз- мы синхронизации, настолько же низкоуровневые, какие используются при работе с потоками. Операции send() и recv() каналов и операции put() и get() очередей уже обеспечивают синхронизацию. Однако в некоторых особых случаях вполне могут найти применение разделяемые значения и блокировки. Ниже приводится пример, в котором для передачи списка чисел с плавающей точкой от одного процесса другому вместо канала ис- пользуется разделяемый массив: import multiprocessing ёё class FloatChannel(object): def __init__(self, maxsize): self.buffer = multiprocessing.RawArray(‘d’,maxsize) self.buffer_len = multiprocessing.Value(‘i’) self.empty = multiprocessing.Semaphore(1) self.full = multiprocessing.Semaphore(0) def send(self,values): self.empty.acquire() # Продолжить, только если буфер пуст nitems = len(values) Модуль multiprocessing 535 self.buffer_len = nitems # Установить размер буфера self.buffer[:nitems] = values # Скопировать значения в буфер self.full.release() # Сообщить, что буфер заполнен def recv(self): self.full.acquire() # Продолжить, только если буфер заполнен values = self.buffer[:self.buffer_len.value] # Скопировать значение self.empty.release() # Сообщить, что буфер пуст return values ёё # Проверка производительности. Прием пакета сообщений def consume_test(count, ch): for i in xrange(count): values = ch.recv() ёё # Проверка производительности. Передача пакета сообщений def produce_test(count, values, ch): for i in xrange(count): ch.send(values) ёё if __name__ == ‘__main__’: ch = FloatChannel(100000) p = multiprocessing.Process(target=consume_test, args=(1000,ch)) p.start() values = [float(x) for x in xrange(100000)] produce_test(1000, values, ch) print(“Конец”) p.join() Подробное изучение этого примера я оставлю вам, в качестве самостоя- тельного упражнения. Однако замечу, что при проверке производительно- сти на компьютере автора отправка огромного списка чисел с плавающей точкой с помощью объекта FloatChannel выполняется примерно на 80 про- центов быстрее, чем отправка того же списка с помощью объекта Pipe (ко- торый вынужден преобразовывать все значения в последовательную форму и обратно с помощью модуля pickle). Управляемые объекты В отличие от потоков управления, процессы не поддерживают разделяе- мые объекты. Несмотря на имеющуюся возможность создавать разделяе- мые значения и массивы, как было показано в предыдущем разделе, этот прием не может использоваться для передачи более сложных объектов языка Python, таких как словари, списки или экземпляры пользователь- Python, таких как словари, списки или экземпляры пользователь- , таких как словари, списки или экземпляры пользователь- ских классов. Однако модуль multiprocessing предоставляет способ совмест- ного использования объектов, при условии, что они будут действовать под управлением так называемого менеджера. Менеджер – это отдельный до- черний процесс, в котором существует действующий объект и который играет роль сервера. Другие процессы обращаются к разделяемым объек- там с помощью специальных промежуточных прокси-объектов, которые действуют как клиенты сервера менеджера. Самый простой способ создать разделяемый объект заключается в том, чтобы вызвать функцию Manager(). 536 Глава 20. Потоки и многозадачность Manager() Создает и запускает отдельный процесс сервера менеджера. Возвращает экземпляр класса SyncManager, который определен в подмодуле multiprocess- ing.managers Экземпляр m класса SyncManager, возвращаемый функцией Manager(), обла- дает группой методов, создающих разделяемые объекты и возвращающих прокси-объекты, которые могут использоваться для доступа к разделяе- мым объектам. Обычно создание менеджера и создание разделяемых объ- ектов с помощью его методов производится перед запуском любых новых процессов. Ниже перечислены доступные методы: m.Array(typecode, sequence) Создает на стороне сервера разделяемый объект типа Array и возвраща- ет прокси-объект для доступа к нему. Описание аргументов можно найти в разделе «Совместно используемые данные и синхронизация». m.BoundedSemaphore([value]) Создает на стороне сервера разделяемый экземпляр threading.BoundedSema- .BoundedSema- BoundedSema- phore и возвращает прокси-объект для доступа к нему. m.Condition([lock]) Создает на стороне сервера разделяемый экземпляр threading.Condition и возвращает прокси-объект для доступа к нему. В аргументе lock переда- ется прокси-объект, созданный вызовом метода m.Lock() или m.Rlock(). m.dict([args]) Создает на стороне сервера разделяемый объект типа dict и возвращает прокси-объект для доступа к нему. Этот метод принимает те же аргументы, что и встроенная функция dict(). m.Event() Создает на стороне сервера разделяемый экземпляр threading.Event и воз- вращает прокси-объект для доступа к нему. m.list([sequence]) Создает на стороне сервера разделяемый объект типа list и возвращает прокси-объект для доступа к нему. Этот метод принимает те же аргументы, что и встроенная функция list(). m.Lock() Создает на стороне сервера разделяемый экземпляр threading.Lock и возвра- щает прокси-объект для доступа к нему. m.Namespace() Создает на стороне сервера разделяемый объект пространства имен и воз- вращает прокси-объект для доступа к нему. Под пространством имен в данном случае подразумевается объект, напоминающий модуль на языке Python. Например, если допустить, что n – это прокси-объект для доступа к объекту пространства имен, то к его атрибутам можно обращаться с по- мощью оператора точки (.), например: n.name = value или value = n.name. Модуль multiprocessing 537 Следует заметить, что имя атрибута name в подобных операциях имеет боль- шое значение. Если имя атрибута name начинается с алфавитного символа, то его значением является часть разделяемого объекта, находящегося под управлением менеджера, и он будет доступен всем остальным процессам. Если имя атрибута name начинается с символа подчеркивания, этот атрибут является частью прокси-объекта и не будет доступен другим процессам. |