Программирование на Python 3. Руководство издательство СимволПлюс
Скачать 3.74 Mb.
|
474 Глава 9. Процессы и потоки В результате оба потока оказываются заблокированными, и програм ма оказывается в состоянии клинча, как показано на рис. 9.2. Изобразить данный конкретный случай взаимоблокировки оказалось достаточно просто, но выявить ситуации взаимоблокировки на прак тике бывает намного сложнее изза того что они не всегда очевидны. Некоторые библиотеки организации реализации механизмов многопо точной обработки данных способны распознавать потенциальные си туации взаимоблокировки и предупреждать о них, но, чтобы избежать их, все равно требуется участие человека. Существует один простой, но действенный прием, позволяющий избе жать взаимоблокировок, который заключается в строгом соблюдении порядка получения блокировок. Например, если правило гласит, что перед получением блокировки B всегда должна запрашиваться блоки ровка A и в потоке выполнения необходимо получить блокировку B, то, согласно правилу, поток сначала должен получить блокировку A. Тем самым гарантируется, что состояние взаимоблокировки, описан ное выше, никогда не возникнет, так как оба потока будут сначала пы таться получить блокировку A, и первый, кому это удастся, сможет за тем запросить блокировку B – при условии, что все потоки выполне ния следуют установленному правилу. Еще одна проблема, связанная с блокировкой, состоит в том, что, ко гда несколько потоков одновременно ожидают освобождения блоки ровки, они остаются заблокированными и не выполняют никакой по лезной работы. Эту ситуацию до некоторой степени можно смягчить, изменив стиль программирования так, чтобы минимизировать объе мы работ, выполняемых в контексте блокировки. В любой программе на языке Python имеется по крайней мере один по ток выполнения – главный поток. Чтобы создать дополнительные по токи, необходимо импортировать модуль threading и с его помощью создать необходимое число потоков. Создать потоки можно двумя спо собами: вызвать функцию threading.Thread() и передать ей вызывае мый объект или создать свой подкласс класса threading.Thread. В этой главе будут продемонстрированы оба способа. Прием, основанный на создании подкласса, обладает большей гибкостью и реализуется доста поток №1 поток №2 A B Рис. 9.2. Взаимоблокировка: два или более потоков пытаются перекрестно получить блокировки Делегирование работы потокам выполнения 475 точно просто. Подклассы могут переопределить метод __init__() (при этом они обязаны вызывать реализацию базового класса) и обязаны переопределить метод run() – в этом методе выполняется вся работа потока. Метод run() никогда не должен вызываться нашим программ ным кодом – поток запускается вызовом метода start(), а этот метод сам вызовет метод run(), когда он будет готов к этому. Ни один из дру гих методов класса threading.Thread не должен переопределяться, хотя добавление новых методов не возбраняется. Пример: многопоточная программа поиска слова В этом подразделе мы рассмотрим программный код программы grep wordt.py . Эта программа выполняет ту же самую работу, что и про грамма grepwordp.py, но в отличие от нее распределяет работу не меж ду несколькими процессами, а между несколькими потоками выполне ния. Схематическое изображение программы приводится на рис. 9.3. Одна из интересных особенностей программы состоит в том, что она вообще не имеет никаких блокировок. Такое стало возможным благо даря тому, что единственным совместно используемым элементом дан ных является список имен файлов, который реализован в виде объекта класса queue.Queue. Особенность класса queue.Queue заключается в том, что он сам выполняет все необходимые блокировки, поэтому всякий раз, когда мы выполняем добавление или удаление элементов, мы мо жем опираться на внутреннюю реализацию очереди, которая сама упо рядочивает доступ. В контексте потоков такое упорядочение означа ет, что в каждый конкретный момент времени доступом к данным об ладает только один поток выполнения. Еще одно преимущество ис пользования класса queue.Queue состоит в том, что нам не требуется распределять работу между потоками – мы просто добавляем в очередь элементы, которые требуется обработать, а рабочие потоки сами при нимаются за работу, когда они будут готовы к этому. Класс queue.Queue работает по принципу «первым пришел, первым ушел». Кроме того, в модуле queue имеется реализация списков queue.Li foQueue , которая работает по принципу «последним пришел, первым ушел», а также реализация очереди queue.PriorityQueue, которая состоит из двухэлементных кортежей (приоритет, элемент данных), где эле grepwordt.py главный поток поток №1 поток №2 поток №3 … Рис. 9.3. Многопоточная программа 476 Глава 9. Процессы и потоки менты с наименьшим значением приоритета обрабатываются первы ми. Для всех очередей можно определить максимальный размер – ес ли будет достигнут максимальный размер, очередь блокирует все по следующие попытки добавить элементы, пока из нее не будут удалены существующие элементы. Мы рассмотрим программу grepwordt.py, разделив ее на три части, и начнем с полного определения функции main(): def main(): opts, word, args = parse_options() filelist = get_files(args, opts.recurse) work_queue = queue.Queue() for i in range(opts.count): number = "{0}: ".format(i + 1) if opts.debug else "" worker = Worker(work_queue, word, number) worker.daemon = True worker.start() for filename in filelist: work_queue.put(filename) work_queue.join() Получение параметров командной строки и списка имен файлов вы полняется точно так же, как и раньше. После сбора всей необходимой информации создается очередь типа queue.Queue и затем выполняются циклы по числу создаваемых потоков (по умолчанию 7). Для каждого потока подготавливается строка, содержащая порядковый номер по тока при работе в отладочном режиме (пустая строка – в противном случае), и затем создается экземпляр класса Worker (подкласс класса threading.Thread ) – описание его свойства daemon приводится чуть ниже. После этого поток запускается на выполнение. В этот момент времени для него еще нет работы, поскольку очередь заданий еще пуста, поэто му поток тут же окажется заблокированным на попытке получить за дание из очереди. После того как будут созданы и запущены все потоки, функция выпол няет итерации по всем именам файлов, добавляя каждый из них в оче редь заданий. Как только в очередь будет добавлен первый файл, один из потоков сможет получить его и начать поиск слова, и это будет про исходить, пока все потоки не получат по файлу для выполнения своей работы. Как только поток завершит обработку файла, он тут же полу чит из очереди следующий файл, и так до тех пор, пока не будут обра ботаны все файлы. Обратите внимание, что такая организация распределения работы от личается от принятой в программе grepwordp.py, где каждому дочер нему процессу выделялся фрагмент списка, после чего дочерние про цессы последовательно обрабатывают файлы в своих списках. В случа ях, подобных этому, применение потоков выполнения дает более высо кую производительность. Например, если первые пять файлов в списке имеют очень большой размер, а остальные файлы очень маленькие, то Делегирование работы потокам выполнения 477 каждый большой файл будет обрабатываться отдельным потоком, так как каждый поток извлекает из очереди только одно задание за раз, что приведет к более равномерному распределению нагрузки. В про грамме gerpwordp.py, напротив, все большие файлы достанутся перво му дочернему процессу, а остальным процессам – маленькие файлы, поэтому на долю первого процесса может выпасть значительный объ ем работы, в то время как остальные процессы могут быстро завер шиться, сделав, по сути, не так много. Программа будет продолжать работу, пока имеется хотя бы один запу щенный поток выполнения. Это порождает определенную проблему, так как с технической точки зрения даже после выполнения всех зада ний потоки все равно считаются выполняющимися. Решение этой про блемы состоит в том, чтобы превратить потоки в демонов. Это позволит программе завершиться, как только в ней не останется ни одного рабо тающего потока, не являющегося демоном. Главный поток не является демоном, поэтому, как только главный поток завершится, программа завершит работу каждого потокадемона и завершится сама. Конечно, теперь может возникнуть противоположная проблема – после созда ния и запуска всех потоков нам необходимо гарантировать, что глав ный поток программы не завершится, пока не будет выполнена вся ра бота. Добиться этого можно вызовом метода queue.Queue. join() – этот метод блокирует вызывающий его поток, пока очередь не опустеет. Ниже приводится начало определения класса Worker: class Worker(threading.Thread): def __init__(self, work_queue, word, number): super().__init__() self.work_queue = work_queue self.word = word self.number = number def run(self): while True: try: filename = self.work_queue.get() self.process(filename) finally: self.work_queue.task_done() В методе __init__() в обязательном порядке должен вызываться метод __init__() базового класса. Аргумент work_queue – это та самая очередь заданий queue.Queue, которая совместно используется всеми потоками. Метод run() выполняет бесконечный цикл. Это типичная организация работы потокадемона, и в этом определенно есть смысл, так как зара нее неизвестно, сколько файлов предстоит обработать потоку. В каж дой итерации вызывается метод queue.Queue.get(), чтобы получить имя следующего файла для обработки. Этот вызов заблокирует выполне ние потока, если очередь окажется пуста, и нам не требуется преду 478 Глава 9. Процессы и потоки сматривать свои блокировки, так как все необходимые действия вы полняются классом queue.Queue автоматически. Получив файл, поток обрабатывает его, после чего необходимо сообщить очереди, что дан ное задание было выполнено, вызовом метода queue.Queue.task_done(), что совершенно необходимо для обеспечения корректной работы мето да queue.Queue.join(). Мы не показали реализацию функции process(), потому что, кроме строки с инструкцией def, ее программный код, начиная со строки pre vious = "" и до конца, остался тем же, что и в программе grepwordp child.py (на стр. 471). Последнее, что хотелось бы отметить: в состав примеров к книге вклю чена программа grepwordm.py, практически идентичная программе grepwordt.py , рассматривавшейся здесь, но в ней вместо модуля threa ding используется модуль multiprocessing. Ее программный код имеет ровно три отличия: первое – она импортирует модуль multiprocessing вместо модулей queue и threading; второе – класс Worker наследует класс multiprocessing.Process , а не threading.Thread и третье – очередь зада ний в ней имеет тип multiprocessing.JoinableQueue, а не queue.Queue. Модуль multiprocessing реализует функциональность, напоминающую потоки, используя механизм ветвления в системах, поддерживающих его (UNIX), и порождает дочерние процессы в системах, которые не поддерживают ветвление (Windows), благодаря чему механизм блоки ровок оказывается необходимым не всегда, а сами процессы будут вы полняться на всех ядрах процессора, доступных операционной систе ме. Пакет предоставляет несколько способов передачи данных между процессами, включая применение очереди, которая может использо ваться для распределения работы между процессами точно так же, как очередь queue.Queue использовалась для распределения заданий между потоками. Главное преимущество версии программы, в которой для обработки запускаются отдельные процессы, состоит в том, что на многоядерных процессорах она потенциально способна обеспечить более высокую производительность, поскольку процессы могут выполняться на всех доступных ядрах. Сравните это со стандартным интерпретатором Py thon (написанным на языке C и иногда называемым CPython), в кото ром имеется глобальная блокировка интерпретатора (Global Interpre ter Lock, GIL), что означает, что в каждый конкретный момент време ни может выполняться только один поток программного кода Python. Это ограничение является особенностью конкретной реализации и не обязательно применяется в других реализациях интерпретатора, та ких как Jython. 1 1 Краткое объяснение, для чего применяется глобальная блокировка интер претатора, приводится по адресу: www.python.org/doc/faq/library/#cant wegetridoftheglobalinterpreterlock и docs.python.org/api/threads.html. Делегирование работы потокам выполнения 479 Пример: многопоточная программа поиска дубликатов файлов Пример второй многопоточной программы по своей структуре напоми нает первую, но она более сложная. В ней используются две очереди – одна для заданий и вторая для результатов, а кроме того, имеется от дельный поток обработки результатов, который выводит их по мере поступления. Кроме того, демонстрируется прием создания подкласса класса threading.Thread и создание потока вызовом функции thread ing.Thread() , а также используется блокировка для упорядочения дос тупа к совместно используемым данным (тип dict). Программа findduplicatest.py является расширенной версией про граммы finddup.py, которая приводилась в главе 5. Она выполняет итерации по всем файлам в текущем каталоге (или в каталоге по ука занному пути) и рекурсивно выполняет обход подкаталогов. Она срав нивает размеры всех файлов с одинаковыми именами (точно так же, как программа finddup.py); для файлов с одинаковыми именами и оди наковыми размерами она вычисляет контрольную сумму по алгорит му MD5 (Message Digest), чтобы убедиться, что файлы действительно являются идентичными, и сообщает о таких файлах. Сначала мы рассмотрим функцию main(), разделив ее на четыре части: def main(): opts, path = parse_options() data = collections.defaultdict(list) for root, dirs, files in os.walk(path): for filename in files: fullname = os.path.join(root, filename) try: key = (os.path.getsize(fullname), filename) except EnvironmentError: continue if key[0] == 0: continue data[key].append(fullname) Каждый ключ словаря со значениями по умолчанию data представляет собой кортеж из двух элементов (размер и имя файла), где имя файла не включает путь к нему, а значение представляет собой список имен файлов (с путями к ним). Любой элемент, у которого в списке имеется более одного имени файла, потенциально может содержать дублика ты. Словарь заполняется в процессе итераций по всем файлам в задан ном пути, при этом пропускаются все файлы, для которых невозмож но получить размер (возможно, изза отсутствия необходимых прав доступа или потому что они не являются обычными файлами), а также файлы, имеющие нулевой размер (поскольку все файлы нулевого раз мера можно считать идентичными). 480 Глава 9. Процессы и потоки work_queue = queue.PriorityQueue() results_queue = queue.Queue() md5_from_filename = {} for i in range(opts.count): number = "{0}: ".format(i + 1) if opts.debug else "" worker = Worker(work_queue, md5_from_filename, results_queue, number) worker.daemon = True worker.start() Собрав исходные данные, можно приступать к созданию рабочих пото ков. Но сначала создается очередь заданий и очередь результатов. Оче редь заданий представляет собой очередь с приоритетами, поэтому она всегда возвращает элемент с наименьшим значением приоритета (или, в нашем случае, файлы с наименьшими размерами). Затем создается словарь, в котором каждый ключ – имя файла (включая путь к нему), а значение – контрольная сумма MD5. Назначение словаря состоит в том, чтобы избежать повторного вычисления контрольной суммы для одного и того же файла (поскольку эта процедура является доста точно дорогостоящей). Создав коллекцию для хранения совместно используемых данных, функция выполняет количество циклов, соответствующее количеству создаваемых потоков (по умолчанию семь циклов). Класс Worker напо минает одноименный класс, созданный в предыдущем примере, толь ко на этот раз его экземплярам передаются две очереди и словарь кон трольных сумм MD5. Как и прежде, каждый рабочий поток немедлен но запускается на выполнение и каждый из них тут же блокируется, пока в очереди не появятся свободные задания. results_thread = threading.Thread( target=lambda: print_results(results_queue)) results_thread.daemon = True results_thread.start() Вместо создания отдельного подкласса threading.Thread для обработки результатов мы создали функцию, которая передается функции thre ading.Thread() . Эта функция возвращает отдельный поток, который вызовет указанную функцию сразу же после запуска. Мы передаем функции очередь с результатами (которая, конечно же, пока пустая), поэтому поток тут же оказывается заблокированным. К этому моменту мы создали все рабочие потоки и поток обработки ре зультатов, причем все они оказались заблокированы в ожидании появ ления заданий. for size, filename in sorted(data): names = data[size, filename] if len(names) > 1: work_queue.put((size, names)) Делегирование работы потокам выполнения 481 work_queue.join() results_queue.join() Теперь выполняются итерации по элементам словаря data, и для всех кортежей, состоящих из двух элементов (размер, имя), для которых имеется список из двух или более потенциальных дубликатов файлов, размер и имена файлов с путями добавляются в виде элементов в оче редь заданий. Поскольку очередь является экземпляром класса из мо дуля queue, нам не нужно беспокоиться о блокировках. В заключение выполняется присоединение к очереди заданий и к оче реди с результатами, чтобы заблокировать главный поток программы до того момента, пока обе очереди не опустеют. Тем самым гарантиру ется, что программа будет продолжать работать, пока не будут выпол нены все задания и не будут выведены все результаты, после чего про грамма завершает свою работу. def print_results(results_queue): while True: try: results = results_queue.get() if results: print(results) finally: results_queue.task_done() Эта функция передается в виде аргумента функции threading.Thread() и вызывается, когда запускается данный поток. Функция выполняет бесконечный цикл, потому что она используется в потоке, который ра ботает в режиме демона. Эта функция просто извлекает результаты (многострочный текст) и выводит непустые строки, пока очередь с ре зультатами не опустеет. Начало определения класса Worker похоже на определение одноимен ного класса, который приводился выше: class Worker(threading.Thread): Md5_lock = threading.Lock() def __init__(self, work_queue, md5_from_filename, results_queue, number): super().__init__() self.work_queue = work_queue self.md5_from_filename = md5_from_filename self.results_queue = results_queue self.number = number def run(self): while True: try: size, names = self.work_queue.get() self.process(size, names) |