Главная страница
Навигация по странице:

  • 8.3.3 Политики насыщения

  • Листинг 8.4

  • 8.3.5 Настройка экземпляра ThreadPoolExecutor после построения

  • 8.4 Расширение класса ThreadPoolExecutor

  • 8.4.1 Пример: добавление статистики в пул потоков

  • Листинг 8.9

  • Листинг 8.12

  • При участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на


    Скачать 4.97 Mb.
    НазваниеПри участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на
    Анкорjava concurrency
    Дата28.11.2019
    Размер4.97 Mb.
    Формат файлаpdf
    Имя файлаJava concurrency in practice.pdf
    ТипДокументы
    #97401
    страница17 из 34
    1   ...   13   14   15   16   17   18   19   20   ...   34
    8.3.2 Управление задачами в очереди
    Ограниченные пулы потоков устанавливают предельное количество одновременно выполняемых задач. (Однопоточные исполнители являются примечательным частным случаем: они гарантируют, что никакие задачи не будут выполняться одновременно, предлагая возможность достижения потокобезопасности через ограничение потока.)
    Ранее, в разделе 6.1.2, мы видели, что создание неограниченного количества потоков может привести к нестабильности, и решили эту проблему, используя пул потоков фиксированного размера вместо создания нового потока для каждого поступающего запроса. Тем не менее, это лишь частичное решение проблемы; приложение все еще может исчерпать ресурсы при большой нагрузке, только сделать это будет сложнее. Если скорость поступления новых запросов превышает скорость, с которой они могут быть обработаны, запросы все равно будут помещаться в очередь. В случае использования пула потоков, запросы ожидают в очереди экземпляров
    Runnable
    , управляемой экземпляром
    Executor
    , вместо того, чтобы ожидать в очереди в качестве потоков, конкурирующих за ЦП.
    Представление ожидающей задачи с помощью экземпляра
    Runnable и узла списка, безусловно, обходится намного дешевле, чем представление с помощью потока, но риск исчерпания ресурсов по-прежнему остается, так как клиенты могут отправлять запросы на сервер быстрее, чем он сможет их обрабатывать.
    Запросы часто поступают пакетами, даже если в среднем скорость поступления запросов достаточно стабильна. Очереди могут помочь в сглаживании временных переходов между всплесками задач, но если задачи продолжат поступать слишком быстро, вам в конечном итоге придется регулировать скорость прибытия, чтобы избежать нехватки памяти
    99
    . Ещё до того, как закончится доступная память, время ответа будет постепенно увеличиваться, по мере роста количества задач в очереди.
    Класс
    ThreadPoolExecutor позволяет вам использовать экземпляр
    BlockingQueue для содержания задач, ожидающих выполнения. Существует три основных подхода к организации очереди задач: неограниченная очередь, ограниченная очередь и синхронная передача. Выбор типа очереди оказывает влияние на другие параметрами конфигурации, например, на размер пула.
    По умолчанию, фабричные методы newFixedThreadPool и newSingleThreadExecutor используют неограниченную реализацию очереди
    LinkedBlockingQueue
    . Задачи будут помещаться в очередь, если все рабочие потоки будут заняты, в случае, если задачи продолжат поступать быстрее, чем они могут быть выполнены, очередь будет расти без ограничений,
    Более стабильной стратегией управления ресурсами является использование ограниченной очереди, например класса
    ArrayBlockingQueue или ограниченной версии
    LinkedBlockingQueue или
    PriorityBlockingQueue
    . Ограниченные очереди помогают предотвратить исчерпание ресурсов, но поднимают вопрос о том, что делать с новыми задачами при заполнении очереди. (Существует ряд возможных политик насыщения (saturation policies) для решения этой проблемы;
    99
    Можно провести аналогию с управлением потоком в сетях связи: вы можете захотеть буферизировать определенный объем данных, но в конечном итоге вы будете вынуждены найти способ заставить другую сторону прекратить отправку данных или начать отбрасывать лишние данные и надеяться, что отправитель позже повторно отправит их вновь, когда вы будете свободнее.
    см. раздел
    8.3.3
    .) С ограниченной рабочей очередью размер очереди и размер пула должны настраиваться вместе. Большая очередь в сочетании с пулом небольшого размера может снизить использование памяти, ЦП и переключение контекста за счет потенциального ограничения пропускной способности.
    Для очень больших или неограниченных пулов потоков также можно полностью обойтись без помещения задач в очередь и передавать задачи непосредственно от производителей рабочим потокам, с помощью класса
    SynchronousQueue
    . Класс
    SynchronousQueue
    , на самом деле, является не очередью, а механизмом управления передачей между потоками. Следуя порядку размещения элементов в экземпляре
    SynchronousQueue
    , должен существовать поток, ожидающий принятия передачи. Если нет ожидающих потоков, но текущий размер пула меньше, чем максимально заданный, экземпляр
    ThreadPoolExecutor создает новый поток; иначе задача отбрасывается согласно политике насыщения.
    Использование прямой передачи является более эффективным механизмом взаимодействия, поскольку задача может быть напрямую передана потоку, который ее выполнит, а не помещена в очередь, из которой её извлечёт рабочий поток. Класс
    SynchronousQueue является оправданным выбором только в том случае, если пул потоков является неограниченным или если отклонение избыточных задач приемлемо. Фабричный метод newCachedThreadPool использует класс
    SynchronousQueue
    Использование очередей FIFO, например, таких как
    LinkedBlockingQueue или
    ArrayBlockingQueue
    , приводит к запуску задач в том порядке, в котором они были приняты. Для большего контроля над порядком выполнения задач можно использовать класс
    PriorityBlockingQueue
    , который упорядочивает задачи в соответствии с заданным приоритетом. Приоритет может быть определен естественным порядком (если задачи реализуют интерфейс
    Comparable
    ) или с помощью интерфейса
    Comparator
    Фабричный метод newCachedThreadPool является хорошим выбором по умолчанию для реализации Executor, обеспечивая лучшую производительность очереди, чем фиксированный пул потоков
    100
    . Пул потоков фиксированного размера является хорошим выбором, когда необходимо ограничить число параллельных задач в целях управления ресурсами, как в случае серверного приложения, которое принимает запросы от клиентов по сети, и в ином случае было бы уязвимо для перегрузки.
    Решение с ограничением пула потоков или рабочей очереди подходит только в том случае, если задачи независимы. С задачами, которые зависят от других задач, ограниченные пулы потоков или очереди могут привести к взаимоблокировке потоков вызванной голоданием; вместо этого используйте конфигурацию неограниченного пула, например фабричный метод newCachedThreadPool
    101 100
    Это различие в производительности происходит от использования класса
    SynchronousQueue вместо класса
    LinkedBlockingQueue
    . В Java 6 реализация класса
    SynchronousQueue была заменена новым неблокирующим алгоритмом, который улучшил пропускную способность экземпляра
    Executor в тестах на факторизацию более чем в три раза, по сравнению с реализацией класса
    SynchronousQueue из Java
    5.0
    (Scherer et al., 2006).
    101
    Альтернативной конфигурацией для задач, отправляющих другие задачи и ожидающих результатов их выполнения, является использование ограниченного пула потоков, класса SynchronousQueue в качестве рабочей очереди и политики насыщения вызывающего объекта.

    8.3.3 Политики насыщения
    Когда ограниченная рабочая очередь заполняется, вступает в игру политика
    насыщения
    (saturation
    policy).
    Политика насыщения для класса
    ThreadPoolExecutor может быть изменена путем вызова метода setRejectedExecutionHandler
    . (Политика насыщения также используется, когда задача передается экземпляру
    Executor
    , который был выключен.) Предоставляется несколько реализаций интерфейса RejectedExecutionHandler, реализующих ту или другую политику насыщения:
    AbortPolicy
    ,
    CallerRunsPolicy
    ,
    DiscardPolicy
    , и
    DiscardOldestPolicy
    Политика по умолчанию, прерывание (abort), заставляет метод execute бросить непроверяемое исключение
    RejectedExecutionException
    ; вызывающий объект может перехватить это исключение и реализовать собственную обработку переполнения пула по своему усмотрению. Политика сброса (discard) автоматически отбрасывает только что отправленную задачу, если она не может быть поставлена в очередь на выполнение; политика сброса старейшего элемента
    (discard-oldest) сбрасывает задачу, которая в противном случае была бы выполнена следующей, и пытается повторно отправить новую задачу. (Если рабочая очередь представляет собой очередь с приоритетом, это приводит к сбросу элемента с наивысшим приоритетом, таким образом, комбинация политики насыщения путём сброса старейшего элемента и очереди с приоритетом не является хорошей идеей.)
    Политика выполнение вызывающего объекта (caller-runs) реализует форму регулирования, при которой задачи не сбрасываются, исключения не бросаются, а наоборот прилагаются все усилия по замедлению поступления потока новых задач, путём возврата некоторой части работы обратно вызывающему объекту. Только что отправленная задача выполняется не в потоке пула, а в потоке, вызвавшем метод execute
    . Если мы внесём изменения в класс
    WebServer
    , представленный в качестве примера, чтобы использовать ограниченную очередь и политику
    “выполнение вызывающего объекта”, после того, как все пулы потоков будут заполнены, и рабочая очередь также будет заполнена, следующая задача будет выполняться в главном потоке, в процессе вызова на выполнение. Поскольку обработка, вероятно, займет некоторое время, основной поток не сможет отправлять задачи, по крайней мере, некоторое время, давая рабочим потокам какое-то время на то, чтобы сократить разрыв. Основной поток также не будет вызывать метод accept в течение этого времени, поэтому входящие запросы будут попадать в очередь на уровне TCP, а не на уровне приложения. Если перегрузка будет сохраняться, в конечном счёте, уровень TCP примет решение о том, что он поставил в очередь достаточное количество запросов на подключение и начнёт отбрасывать запросы на подключение. По мере того как сервер будет перегружаться, перегрузка будет постепенно выталкиваться наружу – из пула потоков в рабочую очередь приложения, далее на уровень TCP и, в конечном счете, клиенту, обеспечивая таким образом более плавное снижение производительности под нагрузкой.
    Выбор политики насыщения или внесение других изменений в политику выполнения может быть выполнен в процессе создания экземпляра
    Executor
    . В листинге 8.3 показано создание пула потоков фиксированного размера с помощью политики “выполнение вызывающего объекта”.
    ThreadPoolExecutor executor

    = new ThreadPoolExecutor(N_THREADS, N_THREADS,
    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(CAPACITY)); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());
    Листинг 8.3 Создание пула потоков фиксированного размера с ограниченной очередью и политикой насыщения
    “выполнение вызывающего объекта”
    Не существует предопределённой политики насыщения, выполняющей блокировку метода execute
    , когда очередь заполняется. Однако, похожий эффект может быть достигнут с помощью семафора, для ограничения частоты внедрения задач, как показано в классе
    BoundedExecutor из листинга 8.4. При таком подходе используйте неограниченную очередь (нет причин ограничивать и размер очереди, и частоту внедрения) и установите границу на семафоре равную размеру пула плюс количеству поставленных в очередь задач, которые вы хотите обработать, так как семафор ограничивает количество задач, как выполняемых в данный момент, так и ожидающих выполнения.
    @ThreadSafe public class BoundedExecutor { private final Executor exec; private final
    Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound);
    } public void submitTask(final Runnable command) throws InterruptedException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run();
    } finally { semaphore.release();
    }
    }
    });
    } catch (RejectedExecutionException e) { semaphore.release();
    }
    }
    }
    Листинг 8.4 Использование класса
    Semaphore для управления частотой отправки задач

    8.3.4 Фабрики задач
    Всякий раз, когда пул потоков должен создать поток, он делает это с помощью
    фабрики потоков (thread factory, см. листинг 8.5). Фабрика потоков по умолчанию создает новый поток, не демон, без специальной конфигурации. Определяемые фабрики потоков позволяет вам настроить конфигурацию пула потоков. Интерфейс
    ThreadFactory имеет один метод, newThread
    , который вызывается всякий раз, когда пулу потоков необходимо создать новый поток. public interface ThreadFactory {
    Thread newThread(Runnable r);
    }
    Листинг 8.5 Интерфейс
    ThreadFactory
    Существует ряд причин для использования настраиваемой версии (custom) фабрики потоков. Вы можете захотеть определить для пула потоков исключение
    UncaughtExceptionHandler или создать экземпляр настраиваемой версии класса
    Thread
    , например такого, который выполняет ведение журнала отладки. Вы можете захотеть изменить приоритет (обычно не очень хорошая идея; см. раздел
    10.3.1
    ) или установить статус демона (снова, не всё в этой идее хорошо; см. раздел
    7.4.2
    ) потокам в пуле. Или, может быть, вы просто хотите дать потокам пула более осмысленные имена, чтобы упростить интерпретацию дампов потоков и логов с ошибками.
    В классе
    MyThreadFactory из листинга 8.6, иллюстрируется пример настраиваемой фабрики потоков. Фабрика создаёт новый экземпляр
    MyAppThread
    , передавая конструктору специфичное для экземпляра пула потоков имя, так, чтобы потоки от каждого пула могли быть легко различимы в дампах потоков и журналах ошибок. Класс
    MyAppThread также можно использовать и в других частях приложения, так что все потоки могут воспользоваться предоставляемыми возможностями для облегчения отладки. public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName;
    } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName);
    }
    }
    Листинг 8.6 Настраиваемая реализация фабрики потоков
    В классе
    MyAppThread показанном в листинге 8.7, выполняется интересная настройка, которая позволяет вам указать имя потока, устанавливает настраиваемое исключение
    UncaughtExceptionHandler
    , записывающее сообщение в экземпляр класса
    Logger
    , ведет статистику о том, сколько потоков было создано и уничтожено, и, опционально, записывает отладочное сообщение в лог, когда поток создается или завершается.
    public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); setUncaughtExceptionHandler( new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t,
    Throwable e) { log.log(Level.SEVERE,
    "UNCAUGHT in thread " + t.getName(), e);
    }
    });
    } public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created "+getName()); try { alive.incrementAndGet(); super.run();
    } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting "+getName());
    }
    } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; }
    }
    Листинг 8.7 Настраиваемый класс, построенный на основе потока
    Если приложение использует преимущества политик безопасности в целях предоставления разрешений определенным кодовым базам, для создания фабрики потоков вы можете использовать фабричный метод privilegedThreadFactory класса
    Executors
    . Метод создает пул потоков, имеющий те же разрешения, экземпляры
    AccessControlContext и contextClassLoader с теми же правами
    ,
    что и поток, создаваемый фабричным методом privilegedThreadFactory
    . В противном случае потоки, созданные пулом потоков, наследуют разрешения от любого клиента вызвавшего методы execute или submit во время создания
    нового потока, что может привести к путанице в исключениях, связанных с безопасностью.
    8.3.5 Настройка экземпляра ThreadPoolExecutor после
    построения
    Большинство параметров, передаваемых конструкторам ThreadPoolExecutor, также можно изменить после построения с помощью сеттеров (например, корневой размер пула потоков, максимальный размер пула потоков, время ожидания, фабрику потоков и обработчик выполнения отклонённых запросов). Если экземпляр
    Executor создается одним из фабричных методов класса
    Executors
    (исключая фабричный метод newSingleThreadExecutor
    ), вы можете привести результат к классу
    ThreadPoolExecutor
    , для доступа к сеттерам, как показано в листинге 8.8.
    ExecutorService exec = Executors.newCachedThreadPool(); if (exec instanceof ThreadPoolExecutor)
    ((ThreadPoolExecutor) exec).setCorePoolSize(10); else throw new AssertionError("Oops, bad assumption");
    Листинг 8.8 Изменение экземпляра
    Executor
    , созданного с помощью стандартной фабрики
    Класс
    Executors включает в себя фабричный метод unconfigurableExecutorService
    , который принимает существующий экземпляр
    ExecutorService и обертывает его, раскрывая только методы интерфейса
    ExecutorService
    , без возможности настройки в дальнейшем. В отличие от реализаций с пулами потоков, фабричный метод newSingleThreadExecutor возвращает экземпляр
    ExecutorService
    , обернутый подобным образом, а не
    “сырой” (
    raw)
    экземпляр
    ThreadPoolExecutor
    . В то время как однопоточный исполнитель фактически реализован как пул потоков с одним потоком, он также берёт на себя обязательство не выполнять задачи параллельно. Если какой-то ошибочный код увеличит размер пула в однопоточном исполнителе, это нарушит предполагаемую семантику выполнения.
    Вы можете использовать этот подход с собственными исполнителями, для предотвращения внесения изменений в политику выполнения. Если вы будете предоставлять экземпляр
    ExecutorService коду, которому не доверяете, без внесения изменений в него, вы можете обернуть экземпляр с помощью метода unconfigurableExecutorService
    8.4 Расширение класса ThreadPoolExecutor
    Класс
    ThreadPoolExecutor был разработан для расширения, предоставляя несколько “хуков” для переопределения в подклассах - beforeExecute
    , afterExecute
    , и terminated
    – что может быть использовано для расширения поведения класса
    ThreadPoolExecutor
    Хуки beforeExecute и afterExecute вызываются в потоке, который выполняет задачу, и могут использоваться для добавления логирования, отсчёта времени выполнения, мониторинга или сбора статистики. Хук afterExecute вызывается независимо от того, завершается ли задача обычным образом, путём возврата управления из метода run или путём возбуждения исключения
    Exception
    . (Если задача завершается путём возбуждения исключения
    Error
    , хук
    afterExecute не будет вызван.) Если хук beforeExecute бросит исключение
    RuntimeException
    , задача не будет выполняться, и хук afterExecute также не будет вызван.
    Хук terminated вызывается, когда выполнен процесс завершения работы пула потоков, после завершения всех задач и завершения выполнения всех рабочих потоков. Он может быть использован для высвобождения ресурсов, выделенных исполнителем в течение его жизненного цикла, отправки уведомлений или логирования, или завершения сбора статистики.
    8.4.1 Пример: добавление статистики в пул потоков
    В классе
    TimingThreadPool из листинга 8.9, показан настраиваемый пул потоков, который использует хуки beforeExecute
    , afterExecute
    , и terminated для добавления возможностей логирования и сбора статистики. Чтобы измерить время выполнения задачи, хук beforeExecute должен записать время начала и сохранить его в таком месте, в котором хук afterExecute сможет найти его. Поскольку выполняемые хуки вызываются в том же потоке, который выполняет задачу, значение, помещенное в экземпляр
    ThreadLocal хуком beforeExecute
    , может быть получено хуком afterExecute
    . Класс
    TimingThreadPool использует пару переменных типа
    AtomicLong
    , чтобы отслеживать общее количество обработанных задач и общее время количество времени, потраченное на обработку, и использует хук terminated
    , чтобы вывести сообщение лога, отражающее среднее время выполнения задачи. public class TimingThreadPool extends ThreadPoolExecutor { private final ThreadLocal startTime
    = new ThreadLocal(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime());
    } protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
    } finally { super.afterExecute(r, t);
    }
    }
    protected void terminated() { try { log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
    } finally { super.terminated();
    }
    }
    }
    Листинг 8.9 Пул потоков, расширенный механизмами логирования и учёта времени выполнения задач
    8.5 Распараллеливание рекурсивных алгоритмов
    Пример рендеринга страницы из раздела 6.3, подвергся серии доработок, в поисках подходящего для использования механизма параллелизма. Первая попытка была полностью последовательной; вторая использовала два потока, но все же выполняла все операции загрузки изображений последовательно; окончательная версия, для достижения большей степени параллелизма, рассматривала каждую операцию загрузки изображения как отдельную задачу. Циклы, тела которых содержат нетривиальные вычисления или выполняют потенциально подверженные блокировке операции ввода/вывода, часто являются хорошими кандидатами для распараллеливания, если итерации независимы.
    Если у нас есть цикл, итерации которого независимы, и для продолжения нам не нужно ожидать завершения выполняемых в них операций, мы можем использовать экземпляр
    Executor для преобразования последовательного цикла в параллельный, как показано в методах processSequentially и processInParallel из листинга 8.10. void processSequentially(List elements) { for (Element e : elements) process(e);
    } void processInParallel(Executor exec, List elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); }
    });
    }
    Листинг 8.10. Преобразование последовательного выполнения в параллельное
    Вызов метода processInParallel завершается быстрее, чем вызов метода processSequentially
    , потому что он возвращает управление, как только все задачи будут поставлены в очередь экземпляра
    Executor
    , вместо того чтобы ждать, пока все они будут завершены. Если вы хотите отправить набор задач и дождаться их завершения, можно использовать метод
    ExecutorService.invokeAll
    ; для получения результатов по мере их поступления, можно использовать экземпляр
    CompletionService
    , как в классе
    Renderer из раздела 6.3.6.

    Последовательные итерации цикла подходят для распараллеливания, когда каждая итерация независима от других и работа, выполняемая в каждой итерации тела цикла, достаточно значительна, чтобы компенсировать затраты на управление новой задачей.
    Распараллеливание циклов также может быть применено в случае некоторых рекурсивных дизайнов; в рекурсивном алгоритме часто встречаются последовательные циклы, к распараллеливанию которых можно применить такой же подход, как в листинге 8.10. Простейший случай - это когда итерации не ожидают возвращения результатов выполнения рекурсивных итераций, которые они вызывают. Например, метод sequentialRecursive в листинге 8.11 выполняет обход дерева “в глубину”, выполняя вычисления на каждом узле и помещая результат в коллекцию. Преобразованная версия, приведённая в методе parallelRecursive
    , также выполняет обход в глубину, но вместо вычисления результата при посещении каждого узла, она отправляет задачу для вычисления результата соответствующего узлу. public void sequentialRecursive(List> nodes,
    Collection results) { for (Node n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results);
    }
    } public void parallelRecursive(final Executor exec,
    List> nodes, final Collection results) { for (final Node n : nodes) { exec.execute(new Runnable() { public void run() { results.add(n.compute());
    }
    }); parallelRecursive(exec, n.getChildren(), results);
    }
    }
    Листинг 8.11 Преобразование последовательной хвостовой рекурсии в распараллеленную рекурсию
    К моменту возвращения управления методом parallelRecursive
    , каждый узел дерева уже был посещен (обход по-прежнему осуществляется последовательно: параллельно выполняются только вызовы метода compute
    ) и вычисления, выполняемые для каждого узла, уже были поставлены в очередь экземпляра
    Executor
    . Объекты, вызывающие метод parallelRecursive
    , могут ожидать получения всех результатов, создав специфичный, предназначенный для обхода, экземпляр
    Executor и используя методы shutdown и awaitTermination
    , как показано в листинге 8.12. public Collection getParallelResults(List> nodes) throws InterruptedException {

    ExecutorService exec = Executors.newCachedThreadPool();
    Queue resultQueue = new ConcurrentLinkedQueue(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue;
    }
    Листинг 8.12 Ожидание результатов параллельных вычислений
    8.5.1 Пример: фрэймворк для решения головоломок
    Такой подход привлекателен для поиска решений в головоломках, которые включают в себя поиск последовательности преобразований из некоторого начального состояния в некоторое целевое состояние, например, такие головоломки как «головоломки на перемещение элементов»
    102
    , «Hi-Q», «Instant
    Insanity» и прочие головоломки-пасьянсы.
    Мы определяем “головоломку” как комбинацию начальной позиции, целевой позиции и набора правил, определяющих допустимые ходы. Набор правил состоит из двух частей: вычисленного списка возможных, с заданной позиции, ходов и вычисления результата применения хода к позиции. В интерфейсе
    Puzzle из листинга 8.13, показана абстракция нашей головоломки; параметры типа P и M представляют собой классы позиции и хода. Основываясь на приведённом интерфейсе, мы можем написать простой последовательный решатель, который просматривает пространство шагов головоломки до тех пор, пока не будет найдено решение или пространство шагов головоломки не будет исчерпано. public interface Puzzle
    {
    P initialPosition(); boolean isGoal(P position);
    Set legalMoves(P position);
    P move(P position, M move);
    }
    Листинг 8.13 Абстракция для головоломки, подобной “головоломка на перемещение элементов”
    Класс
    Node из листинга 8.14, представляет собой позицию, которая была достигнута с помощью некоторой серии шагов, путём удержания ссылки на перемещение, создавшее позицию, и предыдущий узел. Переход по ссылкам назад, начиная с текущего экземпляра
    Node,
    позволяет восстановить последовательность ходов, которые привели к текущей позиции.
    @Immutable static class Node
    { final P pos; final M move; final Node prev;
    Node(P pos, M move, Node prev) {...}
    List asMoveList() {
    102
    See http://www.puzzleworld.org/SlidingBlockPuzzles

    List solution = new LinkedList(); for (Node n = this; n.move != null; n = n.prev) solution.add(0, n.move); return solution;
    }
    }
    Листинг 8.14 Связующий узел в фреймворке решения загадок
    В классе
    SequentialPuzzleSolver из листинга
    8.15, приведён последовательный решатель, используемый в фреймворке решения головоломок и выполняющий поиск решения “в глубину” в пространстве шагов головоломки. Он завершает свою работу, когда находит решение (которое не обязательно является кратчайшим решением). public class SequentialPuzzleSolver
    { private final Puzzle puzzle; private final Set seen = new HashSet
    (); public SequentialPuzzleSolver(Puzzle puzzle) { this.puzzle = puzzle;
    } public List solve() {
    P pos = puzzle.initialPosition(); return search(new Node
    (pos, null, null));
    } private List search(Node node) { if (!seen.contains(node.pos)) { seen.add(node.pos); if (puzzle.isGoal(node.pos)) return node.asMoveList(); for (M move : puzzle.legalMoves(node.pos)) {
    P pos = puzzle.move(node.pos, move);
    Node child = new Node
    (pos, move, node);
    List result = search(child); if (result != null) return result;
    }
    } return null;
    } static class Node
    { /
    *
    Listing 8.14
    *
    / }
    }
    Листинг 8.15 Последовательная версия решателя головоломок
    Переписав решатель с использованием параллелизма, мы сможем параллельно вычислять последующие шаги, а также, параллельно, проводить оценку на соответствие целевому условию, так как процесс оценки одного хода в основном
    не зависит от оценки других ходов. (Мы говорим "в основном", потому что задачи имеют некоторое совместно используемое состояние, такое как набор посещённых позиций.)
    Параллельная версия решателя головоломок, представленная в листинге 8.16, использует внутренний класс
    SolverTask расширяющий класс
    Node и реализующий интерфейс
    Runnable
    . Большая часть работы выполняется в методе run
    : оценка набора возможных следующих позиций, отсечение ранее посещённых позиций, проверка факта достижения успеха (этой задачей или какой-то другой) и отправка ранее не посещённых позиций экземпляру
    Executor public class ConcurrentPuzzleSolver
    { private final Puzzle puzzle; private final ExecutorService exec; private final ConcurrentMap seen; final ValueLatch> solution
    = new ValueLatch>(); public List solve() throws InterruptedException { try {
    P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null));
    // block until solution found
    Node solnNode = solution.getValue(); return (solnNode == null) ? null : solnNode.asMoveList();
    } finally { exec.shutdown();
    }
    } protected Runnable newTask(P p, M m, Node n) { return new SolverTask(p, m, n);
    } class SolverTask extends Node implements Runnable { public void run() { if (solution.isSet()
    || seen.putIfAbsent(pos, true) != null) return; // already solved or seen this position if (puzzle.isGoal(pos)) solution.setValue(this); else for (M m : puzzle.legalMoves(pos)) exec.execute( newTask(puzzle.move(pos, m), m, this));
    }
    }
    }
    Листинг 8.16 Параллельная версия решателя головоломок

    Чтобы избежать бесконечных циклов, последовательная версия поддерживала экземпляр
    Set
    , с набором ранее посещённый позиций; класс
    ConcurrentPuzzleSolver использует той же цели экземпляр
    ConcurrentHashMap
    Такой подход обеспечивает потокобезопасность и позволяет избежать возникновения состояния гонки, присущего условному обновлению совместно используемой коллекции, за счёт использования метода putIfAbsent для атомарного добавления позиции только в том случае, если она не была ранее посещена. Для хранения состояния поиска, параллельная версия решателя головоломок использует, вместо стека вызовов, внутреннюю рабочую очередь пула потоков.
    Параллельный подход также приводит к подмене ограничения одной формы на другую, которая может быть более подходящей для данной проблемной области.
    Последовательная версия выполняет поиск “в глубину”, поэтому поиск ограничен доступным размером стека. Параллельная версия выполняет поиск “в ширину” и поэтому свободна от ограничения размера стека (но все еще может столкнуться с проблемой нехватки памяти, если набор позиций для посещения или уже посещённых позиций превысит доступную память).
    Для того, чтобы остановить поиск, когда решение найдено, нам нужен способ определения, нашел ли какой-либо поток решение. Если мы хотим принять первое найденное решение, нам также необходимо обеспечить обновление решения только в том случае, если ни одна другая задача ранее не нашла его. Эти требования описывают некоторую разновидность защёлки (см. раздел
    5.5.1
    ) и в частности, защелку с результатом (result-bearing latch). Мы могли бы легко создать блокирующую защёлку с результатом, используя методы, описанные в главе 14, но часто проще и менее подвержено ошибкам использовать существующие библиотечные классы, а не низкоуровневые механизмы языка.
    Класс
    ValueLatch из листинга 8.17 использует класс
    CountDownLatch для обеспечения необходимого фиксирующего поведения и использует блокировку, чтобы гарантировать факт того, что решение будет установлено только один раз.
    @ThreadSafe public class ValueLatch {
    @GuardedBy("this") private T value = null; private final CountDownLatch done = new CountDownLatch(1); public boolean isSet() { return (done.getCount() == 0);
    } public synchronized void setValue(T newValue) { if (!isSet()) { value = newValue; done.countDown();
    }
    } public T getValue() throws InterruptedException { done.await(); synchronized (this) { return value;

    }
    }
    }
    Листинг 8.17 Защёлка с результатом, используемая классом
    ConcurrentPuzzleSolver
    Каждая задача сначала проверяет защёлку с решением и прекращает выполнение, если решение уже найдено. Главный поток должен ожидать, пока решение не будет найдено; метод getValue экземпляра
    ValueLatch блокируется, пока какой либо поток не установит значение. Класс
    ValueLatch обеспечивает способ хранения значения таким образом, что только первый вызов фактически устанавливает значение, вызывающие объекты могут проверить, было ли значение установлено, и вызывающие объекты могут быть заблокированы в ожидании его установки. Решение обновляется при первом вызове метода setValue
    , а счётчик экземпляра
    CountDownLatch уменьшается, освобождая, таким образом, основной поток решателя от ожидания в методе getValue
    Первый нашедший решение поток также завершает работу экземпляра
    Executor
    , для предотвращения отправки новых задач. Чтобы избежать необходимости иметь дело с исключением
    RejectedExecutionException
    , обработчик отклонённых задач должен быть настроен на то, чтобы отбрасывать отправляемые задачи. Затем все незавершенные задачи, в конечном итоге, завершают своё выполнение, и любые последующие попытки выполнить новые задачи тихо прерываются, позволяя исполнителю завершить свою работу. (Если задачи выполняются слишком долго, мы могли бы прервать их, вместо того, чтобы позволить им завершить своё выполнение.)
    Класс
    ConcurrentPuzzleSolver плохо справляется со случаем, когда у головоломки нет решения: если все возможные ходы и позиции были оценены и решение не найдено, метод solve зависнет в вечном ожидании результата вызова метода getSolution
    . Последовательная версия завершает свою работу, когда исчерпает всё пространство поиска, но процесс завершения работы параллельных программ иногда может быть сложнее. Одним из возможных решений является сохранение количества активных задач решателя и присвоение решению значения null при уменьшении количества активных задач до нуля, как показано в листинге
    8.18.
    public class PuzzleSolver extends ConcurrentPuzzleSolver
    { private final AtomicInteger taskCount = new AtomicInteger(0); protected Runnable newTask(P p, M m, Node n) { return new CountingSolverTask(p, m, n);
    } class CountingSolverTask extends SolverTask {
    CountingSolverTask(P pos, M move, Node prev) { super(pos, move, prev); taskCount.incrementAndGet();
    } public void run() { try {
    super.run();
    } finally { if (taskCount.decrementAndGet() == 0) solution.setValue(null);
    }
    }
    }
    }
    Листинг 8.18 Решатель, с механизмом распознавания отсутствия решения
    Поиск решения также может занять больше времени, чем мы готовы ожидать; есть несколько дополнительных условий завершения, которые мы могли бы наложить на решатель. Одним из них является ограничение по времени; его легко добавить, реализовав временную версию метода getValue в классе
    ValueLatch
    (который будет использовать временную версию метода await) и, завершая работу экземпляра
    Executor с объявлением сбоя в том случае, если истечёт время ожидания, установленное методу getValue
    . Другой подход - это своего рода метрика, специфичная для головоломки, такая как поиск только до определенного количества позиций. Или мы можем предусмотреть механизм отмены и позволить клиенту принять собственное решение о том, когда следует прекратить поиск.
    8.6 Итоги
    Фреймворк
    Executor представляет собой мощный и гибкий фреймворк для обеспечения параллельного выполнения задач. Он предлагает ряд настроечных параметров, таких как политика создания и завершения потоков, обработка задач в очереди и настройка действий по отношению к избыточным задачам, а также предоставляет несколько хуков, расширяющих его поведение. Однако, как и в большинстве других мощных фреймворков, существуют комбинации параметров, которые совместно работают плохо; некоторые типы задач требуют определенных политик выполнения, а некоторые комбинации настроечных параметров могут привести к странным результатам.

    1   ...   13   14   15   16   17   18   19   20   ...   34


    написать администратору сайта