Главная страница
Навигация по странице:

  • Многопоточный веб-краулинг

  • Состояния гонки и очереди

  • Многопроцессный веб-краулинг

  • Пример многопроцессного веб-краулинга

  • Обмен данными между процессами

  • Многопроцессный веб-краулинг: еще один подход

  • Современный_скрапинг_веб_сайтов_с_помощью_Python_2021_Райан_М. Руководство по решению его наиболее распространенных задач


    Скачать 3.96 Mb.
    НазваниеРуководство по решению его наиболее распространенных задач
    Дата01.03.2023
    Размер3.96 Mb.
    Формат файлаpdf
    Имя файлаСовременный_скрапинг_веб_сайтов_с_помощью_Python_2021_Райан_М.pdf
    ТипРуководство
    #961920
    страница3 из 6
    1   2   3   4   5   6
    Глава 16. Параллельный веб-краулинг
    Веб-краулинг выполняется быстро. По крайней мере это, как правило, гораздо быстрее, чем если нанять дюжину стажеров,
    которые станут вручную копировать данные из Интернета!
    Конечно, развитие технологии и гедонистическое нетерпение в какой-то момент приведут к тому, что вам заявят, будто даже это «недостаточно быстро». Обычно в подобные моменты люди начинают задумываться о распределенных вычислениях.
    В отличие от большинства других технологий веб-краулинг часто попросту невозможно улучшить, «бросая больше циклов на амбразуру». Быстрое выполнение одного процесса еще не означает, что с двумя процессами задача будет решена в два раза быстрее. А выполнение трех процессов может привести к блокировке вас на удаленном сервере, который вы уже заклевали своими запросами!
    Однако есть случаи, когда параллельный веб-краулинг или запуск параллельных потоков/процессов могут быть полезны:
    • сбор данных не из одного, а из нескольких источников (с нескольких удаленных серверов);
    • выполнение длинных или сложных операций с собранными данными (например, анализ изображений или OCR),
    которые можно совершать параллельно с извлечением данных;
    • сбор данных из крупного веб-сервиса, где вы платите за каждый запрос или где создание нескольких подключений к сервису не выходит за рамки пользовательского соглашения.

    Процессы или потоки
    Python поддерживает как многопроцессность, так и многопоточность. И та и другая обработка в итоге имеют одну и ту же цель: одновременное решение двух задач программирования вместо выполнения программы более традиционным линейным способом.
    В информатике каждый процесс, работающий в операционной системе, может иметь несколько потоков. У него есть своя выделенная память — таким образом, несколько потоков могут обращаться к одной и той же памяти, а несколько процессов — не могут и должны передавать информацию явно.
    Часто считается, что использовать многопоточное программирование и выполнять задачи в отдельных потоках с общей памятью проще, чем применять многопроцессное программирование. Но за это удобство приходится платить.
    Глобальная блокировка интерпретатора Python (Global
    Interpreter
    Lock,
    GIL) предотвращает одновременное выполнение разными потоками одной и той же строки кода.
    GIL гарантирует отсутствие повреждений общей памяти,
    совместно используемой всеми процессами (например, не случится так, что в одни и те же байты памяти наполовину запишется одно значение, а наполовину — другое). Такая блокировка позволяет писать многопоточные программы,
    всегда точно зная, что именно вы получите в одной и той же строке, но может создавать и узкие места.
    Многопоточный веб-краулинг
    В Python 3.x используется модуль the_thread; модуль thread устарел.

    В следующем примере показано использование нескольких потоков для выполнения задачи:
    import _thread import time def print_time(threadName, delay, iterations):
    start = int(time.time())
    for i in range(0,iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time())
    - start)
    print ("{} {}".format(seconds_elapsed,
    threadName))
    try:
    _thread.start_new_thread(print_time,
    ('Fizz', 3, 33))
    _thread.start_new_thread(print_time,
    ('Buzz', 5, 20))
    _thread.start_new_thread(print_time,
    ('Counter', 1, 100))
    except:
    print ('Error: unable to start thread')
    while 1:
    pass
    Это пример классического программного теста FizzBuzz
    (http://wiki.c2.com/?Fizz BuzzTest) с несколько измененным выводом результатов:
    1 Counter

    2 Counter
    3 Fizz
    3 Counter
    4 Counter
    5 Buzz
    5 Counter
    6 Fizz
    6 Counter
    Скрипт запускает три потока, один из которых каждые три секунды выводит слово Fizz, другой каждые пять секунд —
    Buzz, а третий — каждую секунду слово Counter.
    После запуска потоков основной поток выполнения запускает цикл while1, благодаря которому программа (и ее дочерние потоки) продолжает работать до тех пор, пока пользователь не нажмет Ctrl+C, чтобы остановить выполнение программы.
    Вместо того чтобы выводить слова Fizz и Buzz, можно выполнять в потоках какую-либо полезную задачу, например,
    собрать данные с сайта:
    from urllib.request import urlopen from bs4 import BeautifulSoup import re import random import _thread import time def get_links(thread_name, bs):
    print('Getting links in
    {}'.format(thread_name))
    return bs.find('div',
    {'id':'bodyContent'}).find_all('a',
    href=re.compile('^(/wiki/)((?!:).)*$'))
    # Определяем функцию для потока def scrape_article(thread_name, path):
    html
    =
    urlopen('http://en.wikipedia.org{}'.format(path
    ))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find('h1').get_text()
    print('Scraping
    {} in thread
    {}'.format(title, thread_name))
    links = get_links(thread_name, bs)
    if len(links) > 0:
    newArticle = links[random.randint(0,
    len(links)-1)].attrs['href']
    print(newArticle)
    scrape_article(thread_name, newArticle)
    # Создаем следующие два потока:
    try:
    _thread.start_new_thread(scrape_article,
    ('Thread 1', '/wiki/Kevin_Bacon',))
    _thread.start_new_thread(scrape_article,
    ('Thread 2', '/wiki/Monty_Python',))
    except:
    print ('Error: unable to start threads')
    while 1:
    pass

    Обратите внимание, что мы включили в код следующую строку:
    time.sleep(5)
    Поскольку мы сканируем «Википедию» почти вдвое быстрее, чем если бы поток был один, эта строка предотвращает чрезмерную нагрузку, которую скрипт мог бы создать на серверы «Википедии». На практике при работе с сервером, где количество запросов не является проблемой,
    данную строку следует удалить.
    А если немного переписать код, чтобы он отслеживал статьи, которые потоки уже встречали, с целью исключить повторное посещение статей? Для этого можно использовать список в многопоточной среде — точно так же, как и в однопоточной:
    visited = []
    def get_links(thread_name, bs):
    print('Getting links in
    {}'.format(thread_name))
    links
    = bs.find('div',
    {'id':'bodyContent'}).find_all('a',
    href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link not in visited]
    def scrape_article(thread_name, path):
    visited.append(path)
    Обратите внимание: первым действием, которое выполняет scrape_article
    , является добавление пути в список посещенных путей. Это уменьшает, но не исключает полностью
    вероятность того, что веб-скрапер обработает данную страницу дважды.
    Если вам особенно не повезет, то оба потока наткнутся на один и тот же путь в одно и то же время. Оба увидят, что этого пути нет в списке посещенных, и, соответственно,
    одновременно добавят его в список и проведут веб-скрапинг.
    Однако на практике такое вряд ли случится вследствие скорости выполнения и количества страниц, содержащихся в
    «Википедии».
    Это пример состояния гонки. Такие состояния вызывают сложности при отладке даже у опытных программистов, так что важно определить, насколько ваш код способен создавать подобные ситуации, оценить их вероятность и предвидеть серьезность возможных последствий.
    В случае конкретно этого состояния гонки, когда веб- скрапер дважды обрабатывает одну и ту же страницу,
    возможно, и не стоит переписывать код.
    Состояния гонки и очереди
    Мы могли бы наладить коммуникацию между потоками с помощью списков, однако те не предназначены специально для обмена данными между потоками и их неправильное использование вполне может привести к тому, что программа будет выполняться медленно или даже возникнут ошибки вследствие состояния гонки.
    Списки отлично подходят для добавления или чтения элементов. Но с удалением элементов в произвольных точках,
    особенно в начале списка, дела обстоят далеко не так хорошо.
    Используя строку, подобную этой:
    myList.pop(0)
    мы фактически требуем, чтобы Python переписал весь список, а это замедлило бы выполнение программы.
    Еще более опасно то, что при использовании списка легко сделать в строке случайную запись, которая не является поточно-ориентированной. Например, такая запись:
    myList[len(myList)-1]
    в многопоточной среде может в действительности получить не последний элемент списка или даже выдать исключение,
    если непосредственно после вычисления значения len(myList)-1
    другая операция изменит список.
    Вы можете возразить, что было бы более в духе Python записать предыдущее выражение как myList[-1]. Ну да,
    конечно же, никто из нас в минуту слабости никогда не писал код не в стиле Python (особенно Java-разработчики не любят признаваться, что было время, когда они писали нечто вроде myList[myList.length-1]
    )! Но даже если ваш код безупречен, посмотрите на другие варианты потоконебезопасных строк, где используются списки:
    my_list[i] = my_list[i] + 1
    my_list.append(my_list[-1])
    Обе эти записи могут привести к состоянию гонки, которое будет иметь неожиданные последствия. Так что откажемся от списков и станем передавать сообщения в потоки другими способами!
    # Считываем входящее сообщение из глобального списка my_message = global_message
    # Записываем сообщение обратно
    global_message = 'I've retrieved the message'
    # делаем что-то с my_message
    Это выглядит отлично, пока мы не обнаружим, что могли случайно стереть сообщение, поступившее из другого потока в момент между первой и второй строками, записав вместо него текст I’ve retrieved the message. И теперь нам придется для каждого потока построить сложную последовательность объектов личных сообщений с какой-то логикой с целью выяснять, кто что получает... или же использовать модуль
    Queue
    , созданный как раз для этого.
    Очереди — это объекты, похожие на списки, которые работают либо по принципу «первым пришел — первым вышел» (First In First Out, FIFO), либо по принципу «последним пришел — первым вышел» (Last In First Out, LIFO). Очередь получает сообщения из любого потока с помощью функции queue.put('Mymessage')
    и может передать сообщение любому потоку, вызывающему функцию queue.get().
    Очереди предназначены не для хранения статических данных, а для их передачи потокобезопасным способом. После извлечения из очереди данные должны существовать только в том потоке, который их извлек. Поэтому очереди обычно используются для делегирования задач или отправки временных уведомлений.
    Это может быть полезно при веб-краулинге. Например, мы хотим сохранить данные, собранные веб-скрапером, в базе, и чтобы при этом каждый поток сохранял свои данные быстро.
    Одно общее соединение для всех потоков может вызвать проблемы (одно соединение неспособно обрабатывать запросы параллельно), однако нет смысла присваивать каждому потоку веб-скрапера отдельное соединение с базой данных. По мере увеличения размера веб-скрапера (возможно, вы в итоге будете
    собирать данные с сотен разных сайтов в сотне потоков) это может привести к большому количеству неиспользуемых соединений с базой данных, которые лишь время от времени выполняют запись после загрузки страницы.
    Вместо этого можно создать меньшее количество потоков базы данных, у каждого из которых будет собственное соединение; каждый из них будет время от времени извлекать элементы из очереди и сохранять их в базе. Так мы получим гораздо более управляемый набор соединений с базой данных.
    from urllib.request import urlopen from bs4 import BeautifulSoup import re import random import _thread from queue import Queue import time import pymysql def storage(queue):
    conn = pymysql.connect(host='127.0.0.1',
    unix_socket='/tmp/mysql.sock',
    user='root', passwd='', db='mysql',
    charset='utf8')
    cur = conn.cursor()
    cur.execute('USE wiki_threads')
    while 1:
    if not queue.empty():
    article = queue.get()
    cur.execute('SELECT * FROM pages
    WHERE path = %s',
    (article["path"]))
    if cur.rowcount == 0:
    print("Storing article
    {}".format(article["title"]))
    cur.execute('INSERT INTO pages
    (title, path) VALUES (%s, %s)', \
    (article["title"],
    article["path"]))
    conn.commit()
    else:
    print("Article already exists:
    {}".format(article['title']))
    visited = []
    def getLinks(thread_name, bs):
    print('Getting links in
    {}'.format(thread_name))
    links
    = bs.find('div',
    {'id':'bodyContent'}).find_all('a',
    href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link not in visited]
    def scrape_article(thread_name, path, queue):
    visited.append(path)
    html
    =
    urlopen('http://en.wikipedia.org{}'.format(path
    ))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find('h1').get_text()
    print('Added {} for storage in thread
    {}'.format(title, thread_name))
    queue.put({"title":title, "path":path})
    links = getLinks(thread_name, bs)
    if len(links) > 0:
    newArticle = links[random.randint(0,
    len(links)-1)].attrs['href']
    scrape_article(thread_name, newArticle,
    queue)
    queue = Queue()
    try:
    _thread.start_new_thread(scrape_article,
    ('Thread 1',
    '/wiki/Kevin_Bacon', queue,))
    _thread.start_new_thread(scrape_article,
    ('Thread 2',
    '/wiki/Monty_Python', queue,))
    _thread.start_new_thread(storage, (queue,))
    except:
    print ('Error: unable to start threads')
    while 1:
    pass
    В этом скрипте создаются три потока: два из них предназначены для веб-скрапинга страниц «Википедии»,
    выбираемых случайным образом, а третий — для сохранения собранных данных в базе данных MySQL. Подробнее о MySQL и хранении данных см. в главе 6.
    Модуль threading

    Python-модуль _thread — довольно низкоуровневый,
    позволяющий управлять всеми нюансами потоков, однако высокоуровневых функций, которые могли бы облегчить жизнь разработчику, у этого модуля немного. Модуль threading является высокоуровневым интерфейсом, позволящим аккуратно использовать потоки, одновременно реализуя все функции лежащего в его основе модуля _thread.
    Например, мы можем использовать статические функции,
    такие как enumerate, чтобы получить список всех активных потоков, инициализированных через модуль threading, и для этого не придется отслеживать потоки самостоятельно.
    Аналогичным образом, функция activeCount возвращает общее количество потоков. Многие функции из модуля
    _thread получили в threading более удобные или запоминающиеся имена, например currentThread вместо get_ident
    , которая позволяет узнать имя текущего потока.
    Вот простой пример использования модуля threading:
    import threading import time def print_time(threadName, delay, iterations):
    start = int(time.time())
    for i in range(0,iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time())
    - start)
    print ('{} {}'.format(seconds_elapsed,
    threadName))
    threading.Thread(target=print_time, args=
    ('Fizz', 3, 33)).start()
    threading.Thread(target=print_time, args=
    ('Buzz', 5, 20)).start()
    threading.Thread(target=print_time, args=
    ('Counter', 1, 100)).start()
    Этот код выводит те же результаты алгоритма FizzBuzz, что и предыдущий простой пример с _thread.
    Одна из приятных особенностей модуля threading —
    простота создания локальных данных потоков, недоступных для других потоков. Это может быть удобным, если у вас есть несколько потоков, каждый из которых выполняет веб- скрапинг своего сайта и ведет собственный локальный список посещенных страниц.
    Эти локальные данные можно создавать в любой точке внутри функции потока, вызвав threading.local():
    import threading def crawler(url):
    data = threading.local()
    data.visited = []
    # Сканируем сайт threading.Thread(target=crawler, args=
    ('http://brookings.edu')).start()
    Это решает проблему состояния гонки, которое могло бы возникнуть между общими объектами потоков. Если объект не должен быть общедоступным, то его и не следует таким делать;
    подобный объект следует хранить в локальной памяти потока.
    Для безопасного общего доступа нескольких потоков к объекту можно использовать тот же модуль Queue, описанный в предыдущем подразделе.

    Модуль threading играет роль своеобразной «няни» для потока, и такие его обязанности можно легко настроить.
    Функция isAlive по умолчанию проверяет, активен ли поток.
    Поток будет активным до тех пор, пока не завершит веб- краулинг (или не случится сбой).
    Веб-краулеры часто рассчитаны на очень длительное время работы. Метод isAlive позволяет гарантировать, что в случае сбоя потока веб-краулер перезапустится:
    threading.Thread(target=crawler)
    t.start()
    while True:
    time.sleep(1)
    if not t.isAlive():
    t = threading.Thread(target=crawler)
    t.start()
    Чтобы добавить другие методы мониторинга, нужно расширить объект threading.Thread:
    import threading import time class Crawler(threading.Thread):
    def __init__(self):
    threading.Thread.__init__(self)
    self.done = False def isDone(self):
    return self.done
    def run(self):
    time.sleep(5)
    self.done = True raise Exception('Something bad happened!')
    t = Crawler()
    t.start()
    while True:
    time.sleep(1)
    if t.isDone():
    print('Done')
    break if not t.isAlive():
    t = Crawler()
    t.start()
    Этот новый класс Crawler содержит метод isDone,
    который годится для проверки того, завершил ли веб-краулер работу. Это может быть полезно, если есть какие-то дополнительные методы журналирования, которые необходимо завершить перед закрытием потока, при этом основная часть работы по веб-краулингу уже выполнена. Как правило, isDone можно заменить каким-либо статусом или показателем прогресса — например, показывающим текущую страницу или сколько страниц зарегистрировано.
    Любые исключения, вызванные Crawler.run, приведут к перезапуску класса, пока isDone не станет равным True, после чего программа завершит работу.

    Создавая классы веб-краулеров как расширения threading.Thread
    , можно сделать их более надежными и гибкими, а также одновременно контролировать все свойства нескольких веб-краулеров.
    Многопроцессный веб-краулинг
    Python-модуль Processing создает новые объекты процессов,
    которые можно запускать и присоединять из главного процесса. В следующем коде для демонстрации используется пример FizzBuzz из раздела, посвященного потокам и процессам.
    from multiprocessing import Process import time def print_time(threadName, delay, iterations):
    start = int(time.time())
    for i in range(0,iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time())
    - start)
    print (threadName if threadName else seconds_elapsed)
    processes = []
    processes.append(Process(target=print_time,
    args=('Counter', 1, 100)))
    processes.append(Process(target=print_time,
    args=('Fizz', 3, 33)))
    processes.append(Process(target=print_time,
    args=('Buzz', 5, 20)))
    for p in processes:
    p.start()
    for p in processes:
    p.join()
    Помните, что операционная система рассматривает каждый процесс как отдельную независимую программу. Если вы посмотрите на свои процессы через монитор активности или диспетчер задач ОС, то увидите картину, похожую на ту, что показана на рис. 16.1.
    Рис. 16.1. Пять процессов Python, выполняемых во время работы FizzBuzz
    Четвертый процесс с PID 76154 — это действующий экземпляр Jupyter Notebook, который должен присутствовать,
    если вы работаете из редактора iPython. Пятый процесс, 83560,
    является основным потоком выполнения, запускающимся при первом запуске программы. PID присваиваются операционной системой последовательно. Если у вас нет другой программы,
    которая одновременно работает с FizzBuzz и быстро выделяет
    PID, то вы должны увидеть еще три последовательных PID — в данном случае это 83561, 83562 и 83563.
    Эти PID также можно получить из кода с помощью модуля os
    :
    import os

    # Выводит дочерний PID
    os.getpid()
    # Выводит родительский PID
    os.getppid()
    Каждый процесс программы при выполнении os.getpid()
    должен выводить свой, отдельный PID, а в момент выполнения os.getppid()
    — один и тот же родительский PID.
    Есть пара строк кода, которые в данной конкретной программе, строго говоря, не нужны. Если не добавить заключительный оператор join:
    for p in processes:
    p.join()
    то родительский процесс все равно завершится и его дочерние процессы автоматически подойдут к концу. Однако данное объединение необходимо, если после завершения этих дочерних процессов вы захотите выполнить еще какой-либо код.
    Например:
    for p in processes:
    p.start()
    print('Program complete')
    Если убрать оператор join, то результат будет следующим:
    Program complete
    1 2
    Если включить оператор join, то программа сначала дождется завершения всех процессов и только потом
    продолжит выполняться:
    for p in processes:
    p.start()
    for p in processes:
    p.join()
    print('Program complete')
    Fizz
    99
    Buzz
    100
    Program complete
    Желая преждевременно остановить выполнение программы, можно, конечно, нажать Ctrl+C, чтобы завершить родительский процесс. При его завершении также подойдут к концу и все дочерние процессы, которые были им порождены,
    поэтому можно спокойно использовать Ctrl+C, не беспокоясь о том, что некоторые процессы случайно могут остаться работать в фоновом режиме.
    Пример многопроцессного веб-краулинга
    Пример многопоточного веб-краулинга «Википедии» можно изменить, чтобы использовать отдельные не потоки, а процессы:
    from urllib.request import urlopen from bs4 import BeautifulSoup import re
    import random from multiprocessing import Process import os import time visited = []
    def get_links(bs):
    print('Getting links in
    {}'.format(os.getpid()))
    links
    = bs.find('div',
    {'id':'bodyContent'}).find_all('a',
    href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link not in visited]
    def scrape_article(path):
    visited.append(path)
    html
    =
    urlopen('http://en.wikipedia.org{}'.format(path
    ))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find('h1').get_text()
    print('Scraping
    {} in process
    {}'.format(title, os.getpid()))
    links = get_links(bs)
    if len(links) > 0:
    newArticle = links[random.randint(0,
    len(links)-1)].attrs['href']
    print(newArticle)
    scrape_article(newArticle)
    processes = []
    processes.append(Process(target=scrape_article,
    args=('/wiki/Kevin_Bacon',)))
    processes.append(Process(target=scrape_article,
    args=('/wiki/Monty_Python',)))
    for p in processes:
    p.start()
    Мы снова искусственно замедляем процесс веб-скрапера,
    добавляя строку time.sleep(5), чтобы его можно было использовать в качестве примера без чрезмерной нагрузки на серверы «Википедии».
    Здесь мы заменяем определенную пользователем переменную thread_name, передаваемую в качестве аргумента, на результат вызова функции os.getpid(),
    который не нужно передавать как аргумент и к которому можно получить доступ в любой момент.
    В результате получим примерно такой результат:
    Scraping Kevin Bacon in process 84275
    Getting links in 84275
    /wiki/Philadelphia
    Scraping Monty Python in process 84276
    Getting links in 84276
    /wiki/BBC
    Scraping BBC in process 84276
    Getting links in 84276
    /wiki/Television_Centre,_Newcastle_upon_Tyne
    Scraping Philadelphia in process 84275

    Теоретически веб-краулинг в отдельных процессах должен выполняться немного быстрее, чем в отдельных потоках, по двум основным причинам.
    • Процессы не подлежат блокировке GIL. Они могут выполнять одни и те же строки кода и изменять один и тот же объект
    (на самом деле это разные экземпляры одного и того же объекта) одновременно.
    • Процессы могут выполняться на нескольких ядрах процессора, что позволит обеспечить выигрыш в скорости,
    если каждый из процессов или потоков интенсивно использует процессор.
    Однако здесь, наряду с преимуществами, есть существенный недостаток. В рассмотренной программе все найденные URL хранятся в глобальном списке visited. Когда мы использовали несколько потоков, этот список был общим для всех потоков; при этом один поток, за редким исключением состояния гонки, не мог посетить страницу, уже посещенную другим потоком. Однако теперь каждый процесс получает собственную, независимую версию списка посещенных страниц и вполне может посещать страницы, уже просмотренные другими процессами.
    Обмен данными между процессами
    Каждый процесс работает в собственной, независимой памяти,
    что может вызвать проблемы, если мы хотим, чтобы процессы использовали общую информацию.
    Изменив предыдущий пример так, чтобы выводилась текущая версия списка посещенных страниц, мы можем увидеть, как работает этот принцип:
    def scrape_article(path):
    visited.append(path)
    print("Process {} list is now:
    {}".format(os.getpid(), visited))
    В результате получим следующее:
    Process
    84552 list is now:
    ['/wiki/Kevin_Bacon']
    Process
    84553 list is now:
    ['/wiki/Monty_Python']
    Scraping Kevin Bacon in process 84552
    Getting links in 84552
    /wiki/Desert_Storm
    Process
    84552 list is now:
    ['/wiki/Kevin_Bacon', '/wiki/Desert_Storm']
    Scraping Monty Python in process 84553
    Getting links in 84553
    /wiki/David_Jason
    Process
    84553 list is now:
    ['/wiki/Monty_Python', '/wiki/David_Jason']
    Но есть способ обмена информацией между процессами,
    работающими на одной машине, через два типа объектов
    Python: очереди и каналы.
    Очередь похожа на рассмотренную ранее очередь потоков.
    Один процесс может поместить в нее информацию, а другой —
    удалить ее оттуда. После удаления этой информации больше нет в очереди. Поскольку очереди спроектированы как способ временной передачи данных, они не особенно хорошо подходят для хранения статических ссылок, таких как список уже посещенных веб-страниц.

    Но что если заменить этот статический список веб-страниц каким-либо делегатором веб-скрапинга? Веб-скраперы могут извлекать задачу из одной очереди в виде пути для продолжения веб-скрапинга
    (например,
    /wiki/Monty_Python
    ), а затем помещать список «найденных
    URL» в другую очередь. А она будет обрабатываться делегатором веб-скрапинга, который следит, чтобы в первую очередь задач добавлялись только новые URL:
    from urllib.request import urlopen from bs4 import BeautifulSoup import re import random from multiprocessing import Process, Queue import os import time def task_delegator(taskQueue, urlsQueue):
    # Инициализируем задачу для каждого процесса.
    visited
    =
    ['/wiki/Kevin_Bacon',
    '/wiki/Monty_Python']
    taskQueue.put('/wiki/Kevin_Bacon')
    taskQueue.put('/wiki/Monty_Python')
    while 1:
    # Проверяем, есть ли в urlsQueue
    # новые ссылки, доступные для обработки.
    if not urlsQueue.empty():
    links = [link for link in urlsQueue.get() if link not in visited]
    for link in links:
    # Добавляем в taskQueue новую ссылку.
    taskQueue.put(link)
    def get_links(bs):
    links
    = bs.find('div',
    {'id':'bodyContent'}).find_all('a',
    href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link.attrs['href'] for link in links]
    def scrape_article(taskQueue, urlsQueue):
    while 1:
    while taskQueue.empty():
    # Засыпаем на 100 мс, ожидая очередь задач.
    # Это должно происходить редко.
    time.sleep(.1)
    path = taskQueue.get()
    html
    =
    urlopen('http://en.wikipedia.org{}'.format(path
    ))
    time.sleep(5)
    bs = BeautifulSoup(html, 'html.parser')
    title = bs.find('h1').get_text()
    print('Scraping {} in process
    {}'.format(title, os.getpid()))
    links = get_links(bs)
    # Отправляем это на обработку делегатору.
    urlsQueue.put(links)
    processes = []
    taskQueue = Queue()
    urlsQueue = Queue()
    processes.append(Process(target=task_delegator,
    args=(taskQueue, urlsQueue,)))
    processes.append(Process(target=scrape_article,
    args=(taskQueue, urlsQueue,)))
    processes.append(Process(target=scrape_article,
    args=(taskQueue, urlsQueue,)))
    for p in processes:
    p.start()
    У этого веб-скрапера есть ряд структурных отличий от скриптов, созданных нами ранее. Если раньше каждый процесс или поток выполнял случайный переход от назначенной ему начальной точки, то теперь они работают совместно,
    производя полный сбор данных с сайта. Каждый процесс может извлечь из очереди любую задачу, а не только те ссылки,
    которые он сам нашел.
    Многопроцессный веб-краулинг: еще один подход
    Все обсуждаемые здесь подходы многопоточного и многопроцессного веб-краулинга предполагают необходимость некоего родительского контроля для дочерних потоков и процессов. Мы можем их все одновременно запустить и завершить, передавать сообщения между ними или выделить для них общую память.
    А если веб-скрапер спроектирован таким образом, что не нуждается в каких-либо указаниях или обмене данными? Тогда
    у вас едва ли найдутся причины, чтобы озадачиваться импортом _thread на данном этапе.
    Например, мы хотим параллельно просканировать два схожих сайта. У нас есть веб-краулер, который может просканировать любой из них, для чего потребуется небольшое изменение конфигурации или, вероятно, другие аргументы командной строки. Нет совершенно никаких причин, по которым мы не могли бы просто сделать следующее:
    $ python my_crawler.py website1
    $ python my_crawler.py website2
    И — вуаля! — мы только что запустили многопроцессный веб-краулер, сэкономив ресурсы процессора и избежав издержек на хранение родительского процесса для загрузки!
    Конечно, представленный подход имеет свои недостатки.
    Если мы хотим запустить таким образом два веб-краулера на
    одном сайте, то нам понадобится какой-то способ,
    позволяющий убедиться, что эти веб-краулеры случайно не станут выполнять веб-скрапинг одних и тех же страниц. Для решения этой проблемы можно создать URL-правило («краулер
    1 выполняет веб-скрапинг страниц блога, а краулер 2 —
    страниц товаров») или разделить сайт еще каким-либо способом.
    Кроме того, можно организовать координацию через какую-либо промежуточную базу данных. Прежде чем перейти по новой ссылке, краулер может сделать запрос к ней и спросить: «Эта страница уже просканирована?» Веб-краулер использует базу в качестве системы межпроцессного взаимодействия. Разумеется, если метод тщательно не продумать, то он может привести к состоянию гонки или задержкам в случае медленного соединения с базой данных

    (вероятно, такая проблема возможна только при подключении к удаленной БД).
    Кроме того, вы можете обнаружить, что этот метод масштабируется хуже, чем предыдущие. Использование модуля
    Process позволяет динамически увеличивать или уменьшать количество процессов, сканирующих сайт или даже хранящих данные. Для их отключения вручную понадобится либо человек, физически выполняющий скрипт, либо отдельный управляющий скрипт (будь то скрипт bash, задание cron или что-либо еще).
    Тем не менее я не раз с успехом использовала данный метод. Для небольших разовых проектов это отличный способ быстро получить много информации, особенно с нескольких сайтов.

    1   2   3   4   5   6


    написать администратору сайта