справочник по Python. мм isbn 9785932861578 9 785932 861578
Скачать 4.21 Mb.
|
545 вать четко оформленную процедуру их завершения, а не полагаться на механизм сборки мусора или принудительное завершение дочерних процессов, с использованием операции terminate(). • Использование менеджеров и прокси-объектов тесно связано с различ- ными понятиями распределенных вычислений (такими как распреде- ленные объекты). Хорошая книга о распределенных вычислениях мо- жет оказаться очень полезным справочником. • Модуль multiprocessing возник из сторонней библиотеки, известной как pyprocessing . Поиск советов по использованию этой библиотеки и ее опи- сание может принести неплохие результаты. • Несмотря на то что этот модуль можно использовать в операционной сис теме Windows, тем не менее вам следует тщательно проработать офи- Windows, тем не менее вам следует тщательно проработать офи- , тем не менее вам следует тщательно проработать офи- циальную документацию, чтобы понять самые тонкие особенности. На- пример, модуль multiprocessing предусматривает реализацию собствен- ного клона функции fork() из UNIX для запуска новых процессов в Win- UNIX для запуска новых процессов в Win- для запуска новых процессов в Win- Win- dows. Эта функция создает копию окружения родительского процесса и отправляет ее дочернему процессу с помощью канала. Вообще говоря, этот модуль больше подходит для использования в UNIX. • И самое главное – старайтесь сделать реализацию максимально про- стой. Модуль threading Модуль threading содержит определение класса Thread и реализацию раз- личных механизмов синхронизации, используемых в многопоточных про- граммах. Объекты класса Thread Класс Thread используется для представления отдельного потока управле- ния. Новый поток можно создать вызовом конструктора: Thread(group=None, target=None, name=None, args=(), kwargs={}) Создает новый экземпляр класса Thread. Аргумент group всегда получает значение None и зарезервирован для использования в будущем. В аргументе target передается вызываемый объект, который вызывается методом run() при запуске потока. По умолчанию получает значение None, которое означа- ет, что ничего вызываться не будет. Аргумент name определяет имя потока. По умолчанию генерируется уникальное имя вида “Thread-N”. В аргументе args передается кортеж позиционных аргументов для функции target, а в аргументе kwargs – словарь именованных аргументов для target. Экземпляр t класса Thread поддерживает следующие методы и атрибуты: t.start() Запускает поток вызовом метода run() в отдельном потоке управления. Этот метод может вызываться только один раз. 546 Глава 20. Потоки и многозадачность t.run() Э тот метод вызывается при запуске потока. По умолчанию вызывает функ- цию target, которая была передана конструктору. При желании можно создать свой класс, производный от класса Thread, и определить в нем соб- ственную реализацию метода run(). t.join([timeout]) Ожидает завершения потока или истечения указанного интервала време- ни. Аргумент timeout определяет максимальный период ожидания в секун- дах, в виде числа с плавающей точкой. Поток не может присоединяться к самому себе и будет ошибкой пытаться присоединиться к потоку до того, как он будет запущен. t.is_alive() Возвращает True, если поток t продолжает работу, и False – в противном случае. Поток считается действующим от момента вызова метода start() до момента, когда завершится метод run(). В старых программах можно встре- тить вызов метода с именем t.isAlive(), которое является синонимом этого метода. t.name Имя потока. Этот атрибут используется только для идентификации и мо- потока. Этот атрибут используется только для идентификации и мо- потока. Этот атрибут используется только для идентификации и мо- . Этот атрибут используется только для идентификации и мо- Этот атрибут используется только для идентификации и мо- жет принимать любые значения (желательно осмысленные, что может упростить отладку). В старых программах можно встретить вызовы мето- дов с именами t.getName() и t.setName(name), которые используются для ма- нипулирования именем потока. t.ident Целочисленный идентификатор потока. Если поток еще не был запущен, этот атрибут содержит значение None. t.daemon Логический флаг, указывающий, будет ли поток демоническим. Значе- ние этого атрибута должно устанавливаться до вызова метода start(). По умолчанию он получает значение, унаследованное от потока, создавшего его. Программа на языке Python завершается, когда не осталось ни одного активного, не демонического потока управления. Любая программа имеет главный поток, представляющий первоначальный поток управления, ко- торый не является демоническим. В старых программах можно встретить вызовы методов с именами t.setDaemon(flag) и t.isDaemon(), которые исполь- зуются для манипулирования этим значением. Ниже приводится пример, демонстрирующий, как создавать и запускать функции (или другие вызываемые объекты) в отдельных потоках управ- ления: import threading import time ёё def clock(interval): while True: Модуль threading 547 print(“Текущее время: %s” % time.ctime()) time.sleep(interval) ёё t = threading.Thread(target=clock, args=(15,)) t.daemon = True t.start() ёё Следующий пример демонстрирует, как определить тот же поток в виде класса: import threading import time ёё class ClockThread(threading.Thread): def __init__(self,interval): threading.Thread.__init__(self) self.daemon = True self.interval = interval def run(self): while True: print(“Текущее время: %s” % time.ctime()) time.sleep(self.interval) ёё t = ClockProcess(15) t.start() Когда объявляется собственный класс потока, в котором переопределяется метод __init__(), чрезвычайно важно не забыть вызвать конструктор базо- вого класса Thread.__init__(), как показано в примере. Если этого не сде- этого не сде- этого не сде- не сде- не сде- сде- сде- лать, вы столкнетесь с неприятной ошибкой. Кроме того, ошибкой будет пытаться переопределить какие-либо другие методы класса Thread, кроме методов run() и __init__(). Настройка атрибута daemon в этих примерах является характерной операци- ей при работе с потоками, которые выполняют бесконечный цикл. Обычно интерпретатор Python ожидает завершения всех потоков, прежде чем за- Python ожидает завершения всех потоков, прежде чем за- ожидает завершения всех потоков, прежде чем за- вершиться самому. Однако такое поведение часто бывает нежелательным, когда имеются никогда не завершающиеся фоновые потоки. Значение True в атрибуте daemon позволяет интерпретатору завершиться сразу же после выхода из главной программы. В этом случае демонические потоки просто уничтожаются. Объекты класса Timer Объекты класса Timer используются для вызова функций через определен- ное время. Timer(interval, func [, args [, kwargs]]) Создает объект таймера, который вызывает функцию func через interval се- се- кунд. В аргументах args и kwargs передаются позиционные и именованные аргументы для функции func. Таймер не запускается, пока не будет вызван метод start(). 548 Глава 20. Потоки и многозадачность Э кземпляр t класса Timer обладает следующими методами: t.start() Запускает таймер. Функция func, переданная конструктору Timer(), будет вызвана спустя указанное количество секунд, после вызова этого метода. t.cancel() Останавливает таймер, если функция еще не была вызвана. Объекты класса Lock Простейшая блокировка (или взаимоисключающая блокировка) – это ме- ханизм синхронизации, имеющий два состояния – «закрыто» и «откры- то». Для изменения состояния блокировки используются методы acquire() и release(). Если блокировка находится в состоянии «закрыто», любая по- пытка приобрести ее будет заблокирована до момента, пока она не будет освобождена. Если сразу несколько потоков управления пытаются приоб- рести блокировку, только один из них сможет продолжить работу, когда блокировка будет освобождена. Порядок, в каком потоки смогут продол- жить работу, заранее не определен. Новый экземпляр класса Lock создается с помощью конструктора: Lock() Создает новый экземпляр блокировки, которая изначально находится в со- стоянии «открыто». Экземпляр lock класса Lock поддерживает следующие методы: lock.acquire([blocking ]) Приобретает блокировку. Если блокировка находится в состоянии «закры- то», этот метод приостанавливает работу потока, пока блокировка не бу- дет освобождена. Если в аргументе blocking передать значение False, метод тут же вернет значение False, если блокировка не может быть приобретена, и True – если блокировку удалось приобрести. lock.release() Освобождает блокировку. Будет ошибкой пытаться вызвать этот метод, когда блокировка находится в состоянии «открыто» или из другого потока, не из того, где вызывался метод acquire(). Объекты класса RLock Реентерабельная блокировка – это механизм синхронизации, напоминаю- щий обычную блокировку, но которая в одном и том же потоке может быть приобретена множество раз. Эта особенность позволяет потоку, владею- щему блокировкой, выполнять вложенные операции acquire() и release(). В подобных ситуациях только самый внешний вызов метода release() дей- ствительно переведет блокировку в состояние «открыто». Новый экземпляр класса RLock создается с помощью конструктора: Модуль threading 549 RLock() Создает новый экземпляр реентерабельной блокировки. Экземпляр rlock класса RLock поддерживает следующие методы: rlock.acquire([blocking ]) Приобретает блокировку. В случае необходимости дальнейшая работа по- тока приостанавливается, пока блокировка не будет освобождена. Если перед вызовом метода блокировкой не владел ни один поток, она запирает- ся, а уровень рекурсии блокировки устанавливается равным 1. Если вызы- вающий поток уже владеет блокировкой, уровень рекурсии увеличивается на единицу и метод тут же возвращает управление. rlock.release() Уменьшает уровень рекурсии. Если значение уровня рекурсии достигло нуля, блокировка переводится в состояние «открыто». В противном случае блокировка остается в состоянии «закрыто». Эта функция должна вызы- ваться только из потока, который владеет блокировкой. Семафоры и ограниченные семафоры Семафор – это механизм синхронизации, основанный на счетчике, кото- рый уменьшается при каждом вызове метода acquire() и увеличивается при каждом вызове метода release(). Если счетчик семафора достигает нуля, метод acquire() приостанавливает работу потока, пока какой-либо другой поток не вызовет метод release(). Semaphore([value]) Создает новый семафор. Аргумент value определяет начальное значение счетчика. При вызове без аргументов счетчик получает значение 1. Экземпляр s класса Semaphore поддерживает следующие методы: s.acquire([blocking]) Приобретает семафор. Если внутренний счетчик имеет значение больше нуля, этот метод уменьшает его на 1 и тут же возвращает управление. Если значение счетчика равно нулю, этот метод приостанавливает работу пото- ка, пока другой поток не вызовет метод release(). Аргумент blocking имеет тот же смысл, что и в методе acquire() экземпляров классов Lock и RLock. s.release() Увеличивает внутренний счетчик семафора на 1. Если перед вызовом ме- тода счетчик был равен нулю и имеется другой поток, ожидающий осво- бождения семафора, этот поток возобновляет работу. Если сразу несколько потоков управления пытаются приобрести семафор, только в одном из них метод acquire() вернет управление. Порядок, в каком потоки смогут про- должить работу, заранее не определен. BoundedSemaphore([value]) Создает новый семафор. Аргумент value определяет начальное значение счетчика. При вызове без аргументов счетчик получает значение 1. Огра- 550 Глава 20. Потоки и многозадачность ниченный семафор BoundedSemaphore действует точно так же, как и обыч- ный семафор Semaphore, за исключением того, что количество вызовов мето- да release() не может превышать количество вызовов метода acquire(). Тонкое отличие семафоров от взаимоисключающих блокировок состоит в том, что семафоры могут использоваться в качестве сигналов. Например, методы acquire() и release() могут вызываться из разных потоков управ- ления и обеспечивать взаимодействие между потоками поставщика и по- требителя. produced = threading.Semaphore(0) consumed = threading.Semaphore(1) ёё def producer(): while True: consumed.acquire() produce_item() produced.release() ёё def consumer(): while True: produced.acquire() item = get_item() consumed.release() Такой способ обмена сигналами, как показан в этом примере, часто реали- зуется с помощью переменных состояния, о которых рассказывается ниже. События События используются для организации взаимодействия потоков. Один поток «посылает» событие, а один или более других потоков ожидают его. Экземпляр класса Event обладает внутренним флагом, который можно установить методом set() и сбросить методом clear(). Метод wait() приоста- навливает работу потока, пока флаг не будет установлен. Event() Создает новый экземпляр класса Event со сброшенным внутренним флагом. Экземпляр e класса Event поддерживает следующие методы: e.is_set() Возвращает True, если внутренний флаг установлен. В старых программах этот метод вызывается под именем isSet(). e.set() Устанавливает внутренний флаг. После этого все потоки, ожидавшие, пока флаг будет установлен, продолжат свою работу. e.clear() Сбрасывает внутренний флаг. e.wait([timeout]) Приостанавливает работу потока, пока не будет установлен внутренний флаг. Если флаг уже был установлен, возвращает управление немедленно. Модуль threading 551 В противном случае работа потока приостанавливается, пока другой поток не установит флаг вызовом метод set() или пока не истечет интервал вре- мени timeout. В аргументе timeout передается число с плавающей точкой, определяющее предельное время ожидания в секундах. Экземпляры класса Event могут использоваться как средство извещения других потоков, но они не должны использоваться для реализации обме- на извещениями, который обычен в схемах взаимодействий поставщик/ потребитель. Например, избегайте такого способа использования событий: evt = Event() ёё def producer(): while True: # создать элемент ... evt.signal() ёё def consumer(): while True: # Дождаться появления элемента evt.wait() # Обработать элемент ... # Сбросить событие и ждать появления следующего элемента evt.clear() Такая реализация работает неустойчиво, потому что поставщик может воспроизвести новый элемент в промежутке между вызовами методов evt.wait() и evt.clear(). В этой ситуации, сбросив событие, потребитель не сможет обнаружить новый элемент, пока поставщик не создаст еще один. В лучшем случае программа будет испытывать небольшие отклонения, вы- ражающиеся в непредсказуемых задержках обработки новых элементов, а в худшем она может зависнуть, потеряв событие. Для решения подобных проблем лучше использовать переменные состояния. Переменные состояния Переменная состояния (condition variable) – это механизм синхрониза- ции, надстроенный на уже имеющейся блокировке, который используется потоками, когда требуется дождаться наступления определенного состоя- ния или появления события. Переменные состояния обычно используются в схемах поставщик-потребитель, когда один поток производит данные, а другой обрабатывает их. Новый экземпляр класса Condition создается с помощью конструктора: Condition([lock]) Создает новую переменную состояния. В необязательном аргументе lock передается экземпляр класса Lock или RLock. При вызове без аргумента для использования совместно с переменной состояния создается новый экзем- пляр класса RLock. Экземпляр cv класса Condition поддерживает следующие методы: 552 Глава 20. Потоки и многозадачность cv.acquire(*args) Приобретает блокировку, связанную с переменной состояния. Этот метод вызывает метод acquire(*args) блокировки и возвращает результат. cv.release() Освобождает блокировку, связанную с переменной состояния. Этот метод вызывает метод release() блокировки. cv.wait([timeout]) Ожидает, пока не будет получено извещение или пока не истечет время ожидания. Этот метод должен вызываться только после того, как вызы- вающий поток приобретет блокировку. При вызове метода блокировка освобождается, а поток приостанавливается, пока другим потоком не бу- дет вызван метод notify() или notifyAll() переменной состояния. После воз- обновления метод тут же повторно приобретает блокировку и возвращает управление вызывающему потоку. В аргументе timeout передается число с плавающей точкой, определяющее предельное время ожидания в секун- дах. По истечении указанного интервала времени блокировка снова приоб- ретается, и поток возобновляет работу. cv.notify([n]) Возобновляет работу одного или более потоков, ожидающих изменения данной переменной состояния. Этот метод должен вызываться только по- сле того, как поток приобретет блокировку, и ничего не делает, если от- сутствуют потоки, ожидающие изменения этой переменной состояния. Аргумент n определяет количество потоков, которые смогут возобновить работу, и по умолчанию получает значение 1. Метод wait() не возвращает управление после возобновления потока, пока не сможет повторно приоб- рести блокировку. cv.notify_all() Возобновляет работу всех потоков, ожидающих изменения переменной со- стояния. В старых программах этот метод вызывается под именем notify- All() Ниже приводится пример использования переменной состояния, который можно использовать как заготовку: cv = threading.Condition() def producer(): while True: cv.acquire() produce_item() cv.notify() cv.release() ёё def consumer(): while True: cv.acquire() while not item_is_available(): cv.wait() # Ожидать появления нового элемента Модуль threading 553 cv.release() consume_item() Т онкость в использовании переменных состояния заключается в том, что при наличии нескольких потоков, ожидающих изменения одной и той же переменной состояния, метод notify() может возобновить работу одного или более из них (конкретное поведение часто зависит от операционной си- стемы). Вследствие этого существует вероятность, что после возобновления работы поток обнаружит, что интересующее его состояние уже отсутствует. Это объясняет, например, почему в функции consumer() используется цикл while . Если поток возобновил работу, но элемент уже был обработан другим потоком, он просто опять переходит к ожиданию следующего извещения. Работа с блокировками При работе с любыми механизмами синхронизации, такими как Lock, RLock или Semaphore, следует быть очень внимательными. Ошибки в управлении блокировками часто приводят к взаимоблокировкам потоков и к состоя- нию гонки за ресурсами. Программный код, использующий блокировки, должен гарантировать их освобождение даже в случае появления исключе- ний. Ниже приводится типичный пример такого программного кода: try: lock.acquire() # критический раздел инструкции ... finally: lock.release() Кроме того, все блокировки поддерживают протокол менеджера контекста, что позволяет писать более ясный код: with lock: # критический раздел инструкции ... В этом примере блокировка автоматически приобретается инструкцией with и освобождается, когда поток управления выходит за пределы кон- текста. Также следует избегать писать программный код, который в любой момент времени обладал бы более чем одной блокировкой одновременно. Напри- мер: with lock_A: # критический раздел A инструкции ... with lock_B: # критический раздел B инструкции ... 554 Глава 20. Потоки и многозадачность Обычно такой код является отличным источником непонятных взаимобло- кировок в программе. Несмотря на существование стратегий, позволяю- щих избегать взаимоблокировок (например, иерархические блокировки), лучше все-таки полностью отказаться от манеры писать такой код. Приостановка и завершение потока Потоки не имеют методов для принудительного их завершения или при- остановки. Отсутствие этих методов не является недоработкой, а обуслов- лено сложностями, свойственными процессу разработки многопоточных программ. Например, если поток владеет блокировкой, то принудительное его завершение или приостановка может вызвать зависание всего прило- жения. Кроме того, обычно нет никакой возможности просто взять и «осво- бодить все блокировки» по завершении, потому что при сложной процеду- ре синхронизации потоков часто бывает необходимо точно соблюсти после- довательность операций приобретения и освобождения блокировок. Если в программе потребуется возможность завершения или приостановки потока, ее придется реализовать самостоятельно. Обычно для этого поток проверяет в цикле свое состояние и определяет момент, когда он должен завершиться. Например: class StoppableThread(threading.Thread): def __init__(self): threading.Thread.__init__() self._terminate = False self._suspend_lock = threading.Lock() def terminate(self): self._terminate = True def suspend(self): self._suspend_lock.acquire() def resume(self): self._suspend_lock.release() def run(self): while True: if self._terminate: break self._suspend_lock.acquire() self._suspend_lock.release() инструкции ... Имейте в виду, что чтобы обеспечить надежную работу при таком подходе, поток не должен выполнять какие-либо операции ввода-вывода, которые могут быть заблокированы. Например, если поток приостанавливается в ожидании поступления новых данных, он не может быть завершен, пока не завершит эту операцию. По этой причине желательно ограничивать ожидание в операциях ввода-вывода некоторым интервалом времени, ис- пользовать неблокирующие версии операций ввода-вывода и использовать другие дополнительные возможности, чтобы гарантировать, что проверка необходимости завершения будет выполняться достаточно часто. Модуль threading 555 Вспомогательные функции Для работы с потоками управления имеются следующие вспомогательные функции: active_count() Возвращает текущее количество активных объектов класса Thread. current_thread() Возвращает объект класса Thread, соответствующий вызывающему потоку управления. enumerate() Возвращает список всех активных объектов класса Thread. local() Возвращает объект local, который служит хранилищем локальных для по- тока данных. Для каждого потока этот объект гарантированно будет уни- кальным. setprofile(func) Устанавливает функцию, которая будет использоваться для профилирова- ния всех создаваемых потоков. Функция func будет передаваться функции sys.setprofile() перед запуском каждого потока. settrace(func) Устанавливает функцию, которая будет использоваться для трассировки всех создаваемых потоков. Функция func будет передаваться функции sys. settrace() перед запуском каждого потока. stack_size([size]) Возвращает размер стека, который будет использоваться при создании новых потоков. Если в необязательном аргументе size функции переда- ется целое число, оно будет определять размер стека для вновь создавае- мых потоков. Для обеспечения переносимости программы в аргументе size следует передавать значения от 32 768 (32 Кбайта) и выше, кратные 4096 (4 Кбайта). Если эта операция не поддерживается системой, возбуждается исключение ThreadError. Глобальная блокировка интерпретатора Интерпретатор Python выполняется под защитой блокировки, которая по- Python выполняется под защитой блокировки, которая по- выполняется под защитой блокировки, которая по- зволяет выполняться только одному потоку управления в каждый конкрет- ный момент времени, даже если в системе имеется несколько процессоров. Это обстоятельство существенно ограничивает выгоды, которые могло бы принести использование потоков в программах, выполняющих массивные вычисления. Фактически использование потоков в подобных програм- мах часто приводит к существенному ухудшению производительности по сравнению с однопоточными программами, выполняющими ту же рабо- ту. По этой причине потоки управления должны использоваться только в программах, основной задачей которых является выполнение операций 556 Глава 20. Потоки и многозадачность ввода-вывода, таких как сетевые серверы. Для решения задач, связанных с массивными вычислениями, лучше использовать модули расширений на языке C или задействовать модуль multiprocessing. Расширения на языке C имеют возможность освободить блокировку интерпретатора и выполнять- ся параллельно, при условии, что они никак не будут взаимодействовать с интерпретатором после освобождения блокировки. Модуль multiprocess- ing позволяет переложить работу на независимые дочерние процессы, ко- торые не ограничиваются этой блокировкой. Разработка многопоточных программ Несмотря на имеющуюся возможность писать на языке Python тради- ционные многопоточные программы, используя различные комбинации блокировок и других механизмов синхронизации, существует еще один стиль программирования, который обладает преимуществами перед всеми остальными, – попытаться организовать многопоточную программу как коллекцию независимых задач, взаимодействующих между собой с помо- щью очередей сообщений. Об этом рассказывается в следующем разделе (модуль queue). Модуль queue (Queue) Модуль queue (в Python 2 он называется Queue) реализует различные типы очередей, поддерживающие возможность доступа из множества потоков и обеспечивающие сохранность информации при обмене данными между несколькими потоками управления. Модуль queue определяет три различных класса очередей: Queue([maxsize]) Создает очередь типа FIFO (first-in first-out – первым пришел, первым вы- FIFO (first-in first-out – первым пришел, первым вы- (first-in first-out – первым пришел, первым вы- first-in first-out – первым пришел, первым вы- -in first-out – первым пришел, первым вы- in first-out – первым пришел, первым вы- first-out – первым пришел, первым вы- first-out – первым пришел, первым вы- -out – первым пришел, первым вы- out – первым пришел, первым вы- – первым пришел, первым вы- шел). Аргумент maxsize определяет максимальное количество элементов, которое может поместиться в очередь. При вызове без аргумента или когда значение maxsize равно 0, размер очереди не ограничивается. LifoQueue([maxsize]) Создает очередь типа LIFO (last-in, first-out – последним пришел, первым вышел), которая также известна, как стек. PriorityQueue([maxsize]) Создает очередь с поддержкой приоритетов, в которой все элементы упо- рядочиваются по приоритетам, от низшего к высшему. Элементами очере- ди этого типа могут быть только кортежи вида (priority, data), где поле priority является числом. Экземпляр q любого из классов очередей обладает следующими методами: q.qsize() Возвращает примерный размер очереди. Так как другие потоки могут до- бавлять и извлекать элементы, результат вызова этой функции не может считаться надежным. Модуль queue (Queue) 557 q.empty() Возвращает True, если в момент вызова очередь была пустой, и False – в про- в про- про- про- тивном случае. q.full() Возвращает True, если в момент вызова очередь была полной, и False – в противном случае. q.put(item [, block [, timeout]]) Добавляет элемент item в очередь. Если необязательный аргумент block имеет значение True (по умолчанию), в случае отсутствия свободного про- странства в очереди вызывающий поток будет приостановлен. Иначе (когда в аргументе block передается значение False) в случае отсутствия свобод- ного пространства в очереди будет возбуждено исключение Full. Аргумент timeout определяет предельное время ожидания в секундах. По истечении времени ожидания будет возбуждено исключение Full. q.put_nowait(item) Соответствует вызову q.put(item, False). q.get([block [, timeout]]) Удаляет и возвращает элемент из очереди. Если необязательный аргумент block имеет значение True (по умолчанию), в случае отсутствия элементов в очереди вызывающий поток будет приостановлен. Иначе (когда в аргу- менте block передается значение False) в случае отсутствия элементов будет возбуждено исключение Empty. Аргумент timeout определяет предельное время ожидания в секундах. По истечении времени ожидания будет воз- буждено исключение Empty. q.get_nowait() Соответствует вызову q.get(0). q.task_done() Используется потребителем, чтобы сообщить, что элемент очереди был об- работан. Если используется, этот метод должен вызываться один раз для каждого элемента, удаленного из очереди. q.join() Приостанавливает поток, пока не будут удалены и обработаны все элемен- ты очереди. Возвращает управление только после того, как для каждого элемента очереди будет вызван метод q.task_done(). Пример использования очереди в потоках Разработку многопоточных программ часто можно упростить за счет ис- пользования очередей. Например, вместо того, чтобы использовать разде- ляемые данные, доступ к которым необходимо осуществлять под защитой блокировки, потоки могут обмениваться информацией с помощью оче- редей. В этой модели поток, занимающийся обработкой данных, обычно играет роль потребителя. Ниже приводится пример, иллюстрирующий эту концепцию: 558 Глава 20. Потоки и многозадачность import threading from queue import Queue # Use from Queue on Python 2 ёё class WorkerThread(threading.Thread): def __init__(self,*args,**kwargs): threading.Thread.__init__(self,*args,**kwargs) self.input_queue = Queue() def send(self,item): self.input_queue.put(item) def close(self): self.input_queue.put(None) self.input_queue.join() def run(self): while True: item = self.input_queue.get() if item is None: break # Обработать элемент # (замените инструкцию print какими-нибудь полезными операциями) print(item) self.input_queue.task_done() # Конец. Сообщить, что сигнальная метка была принята, и выйти self.input_queue.task_done() return ёё # Пример использования w = WorkerThread() w.start() w.send(“hello”) # Отправить элемент на обработку (с помощью очереди) w.send(“world”) w.close() Э тот класс проектировался очень тщательно. Во-первых, можно заметить, что программный интерфейс этого класса представляет собой подмноже- ство методов объектов класса Connection, которые создаются каналами в модуле multiprocessing. Это обеспечивает возможность дальнейшего рас- ширения. Например, позднее обрабатывающий поток может быть вынесен в отдельный процесс без переделки программного кода, который посылает данные этому потоку. Во-вторых, программный интерфейс предусматривает возможность завер- шения потока. Метод close() помещает в очередь сигнальную метку, кото- рая в свою очередь вызывает завершение потока. Наконец, программный интерфейс в значительной степени напоминает ин- терфейс сопрограмм. Если для выполнения операций не требуется прибе- гать к блокировкам, метод run() можно будет реализовать как сопрограмму и вообще обойтись без потоков. Этот последний способ может обеспечить более высокую производительность, потому что в нем отсутствует необхо- димость переключения контекста потоков. Сопрограммы и микропотоки 559 Сопрограммы и микропотоки В некоторых типах приложений вполне возможно реализовать коопера- тивную многозадачность в пространстве пользователя, основанную на использовании диспетчера задач и набора генераторов или сопрограмм. Иногда их называют микропотоками, однако могут использоваться и дру- гие термины – иногда их называют тасклетами (tasklets), зелеными по- токами 1 (green threads), гринлетами (greenlets) и так далее. Обычно этот прием используется в программах, где необходимо управлять большими коллекциями открытых файлов или сокетов. В качестве примера можно привести сетевой сервер, который одновременно должен управлять тыся- чами соединений с клиентами. Вместо того чтобы создавать тысячи пото- ков управления, можно использовать асинхронные операции ввода-вывода или операции опроса (модуль select) в комбинации с диспетчером задач, который обрабатывает события ввода-вывода. В основе этой методики лежит тот факт, что инструкция yield, используе- мая в функциях-генераторах и сопрограммах, приостанавливает работу функции, пока не будет вызван метод next() или send(). Это обеспечивает возможность реализации кооперативной многозадачности между множе- ством функций-генераторов на основе использования цикла обработки со- бытий. Эту идею иллюстрирует следующий пример: def foo(): for n in xrange(5): print(“Я – foo %d” % n) yield ёё def bar(): for n in xrange(10): print(“Я – bar %d” % n) yield ёё def spam(): for n in xrange(7): print(“Я – spam %d” % n) yield ёё # Создать и заполнить очередь задач from collections import deque taskqueue = deque() taskqueue.append(foo()) # Добавить несколько задач (генераторов) taskqueue.append(bar()) taskqueue.append(spam()) ёё # Запустить все задачи while taskqueue: # Перейти к следующей задаче task = taskqueue.pop() try: 1 Зеленые потоки и гринлеты иногда называют «легковесными потоками». – Прим. перев. 560 Глава 20. Потоки и многозадачность # Выполнить до следующей инструкции yield и опять поставить в очередь next(task) taskqueue.appendleft(task) except StopIteration: # Задача завершилась pass Э тот прием едва ли можно рекомендовать для использования в програм- мах, выполняющих массивные вычисления. Он в большей степени под- ходит для распределения работы между задачами, выполняющими опера- ции ввода-вывода, опрос или обработку событий. Более сложный пример использования этого приема можно найти в разделе с описанием модуля select , в главе 21 «Работа с сетью и сокеты». |