Главная страница
Навигация по странице:

  • 6.2.4 Жизненный цикл экземпляра Executor

  • Листинг 6.8

  • 6.3 Поиск уязвимостей параллелизма

  • 6.3.1 Пример: последовательный генератор страниц

  • 6.3.2 Задачи возвращающие результат: Callable и Future

  • 6.3.3 Пример: отрисовка страниц с помощью Future

  • 6.3.4 Ограничения распараллеливания разнородных задач

  • 6.3.5 CompletionService: Executor пересекается с BlockingQueue

  • Листинг 6.14

  • 6.3.7 Ограничение времени выполнения задач

  • Листинг 6.16

  • Листинг 6.17

  • При участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на


    Скачать 4.97 Mb.
    НазваниеПри участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на
    Анкорjava concurrency
    Дата28.11.2019
    Размер4.97 Mb.
    Формат файлаpdf
    Имя файлаJava concurrency in practice.pdf
    ТипДокументы
    #97401
    страница12 из 34
    1   ...   8   9   10   11   12   13   14   15   ...   34
    newFixedThreadPool
    .
    Пул потоков фиксированного размера, создает потоки по мере отправки задач, вплоть до максимального размера пула, а затем старается сохранять размер пула неизменным (добавляя новые потоки, если поток умирает из-за возникновения неожиданного исключения
    Exception
    ).
    newCachedThreadPool
    .
    Кэшированный пул потоков обладает большой гибкостью в утилизации простаивающих потоков, когда текущий размер пула превышает потребность в обработчиках, и в добавления новых потоков, при увеличении потребности, но не накладывает ограничений на размер пула.
    newSingleThreadExecutor
    .
    Однопоточная реализация интерфейса
    Executor создает единственный рабочий поток для обработки задач, заменяя его, если он неожиданно умирает. Гарантируется, что задачи будут обрабатываться последовательно в соответствии с порядком, определяемым очередью задач
    (FIFO, LIFO, в порядке приоритета).
    75
    newScheduledThreadPool
    .
    Пул потоков фиксированного размера, поддерживающий отложенное и периодичное выполнение задач, подобно классу
    Timer
    (см. раздел
    6.2.5
    ).
    Фабричные методы newFixedThreadPool и newCachedThreadPool возвращают экземпляры класса общего назначения -
    ThreadPoolExecutor
    , которые могут непосредственно использоваться для создания более специализированных экземпляров интерфейса
    Executor
    . Подробнее параметры конфигурации пула потоков обсудим в главе 8.
    Веб-сервер в классе
    TaskExecutionWebServer использует экземпляр интерфейса
    Executor с ограниченным пулом рабочих потоков. Вызов метода execute отправляет задачу в рабочую очередь, а рабочие потоки непрерывно забирают задачи из рабочей очереди и выполняют их.
    Переход от политики "один поток на одну задачу" к политике на основе пула потоков, оказывает большое влияние на стабильность приложения: веб-сервер больше не будет отказывать под большой нагрузкой
    76
    . Как следствие, это приводит
    75
    Однопоточные реализации интерфейса
    Executor также обеспечивают достаточную внутреннюю синхронизацию, чтобы гарантировать, что любая запись в память, сделанная задачей, будет видна последующим задачам; это означает, что объекты могут быть безопасно ограничены “потоком задачи”, даже если этот поток может время от времени заменяться другим.
    76
    Сервер не может отказать (упасть) из-за создания слишком большого количества потоков, но, если скорость поступления задач превышает скорость их обработки достаточно длительное время, все еще возможно (просто сложнее) исчерпать память из-за растущей очереди экземпляров
    Runnable
    ,
    к плавному снижению производительности, поскольку не создаются тысячи потоков, конкурирующих за ограниченные ресурсы ЦП и памяти. А использование фреймворка
    Executor открывает двери для всех видов дополнительных возможностей по настройке, управлению, мониторингу, ведению журнала, отчетах об ошибках и других, которые было бы гораздо сложнее добавить без фреймворка выполнения задач.
    6.2.4 Жизненный цикл экземпляра Executor
    Ранее мы уже рассматривали, как создать экземпляр интерфейса
    Executor
    , но не как завершить его. Реализация
    Executor
    , вероятнее всего, создаст потоки для обработки задач. Но JVM не сможет завершить свою работу до тех пор, пока все потоки (не демоны) не завершаться, поэтому неудача с закрытием экземпляра
    Executor может помешать завершению работы JVM.
    Поскольку экземпляр
    Executor обрабатывает задачи асинхронно, то есть в любой момент времени, состояние ранее переданных на обработку задач не сразу становится очевидным. Некоторые из них могут быть завершены, некоторые могут быть запущены в текущий момент, а другие могут быть поставлены в очередь ожидания выполнения. Существует целый спектр вариантов завершения работы приложения - от мягкого выключения (завершается то, что было ранее начато, но новая работа не принимается) до внезапного выключения (выключается питание в комнате с компьютером), а также различные варианты между ними. Поскольку экземпляры
    Executor предоставляют приложениям службы, они также должны иметь возможность завершать свою работу, как плавно, так и внезапно, и передавать приложения сведения о состоянии задач, на которые повлияло завершение работы.
    Рассматривая вопросы жизненного цикла службы выполнения отметим, что интерфейс ExecutorService расширяет интерфейс Executor, добавляя ряд методов для управления жизненным циклом (а также некоторые удобные методы для отправки задач). Методы интерфейса ExecutorService, используемые для управления жизненным циклом, показаны в листинге 6.7. public interface ExecutorService extends Executor { void shutdown();
    List shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    // ... additional convenience methods for task submission
    }
    Листинг 6.7 Методы управления жизненным циклом интерфейса
    ExecutorService
    Жизненный цикл, подразумеваемый интерфейсом
    ExecutorService
    , включает в себя три состояния: запущено (running), завершение работы (shutting down) и
    завершено (terminated). Экземпляры интерфейса
    ExecutorService изначально создаются в состоянии запущено. Метод shutdown инициирует мягкое завершение ожидающих выполнения. Эта проблема может быть решена в рамках фреймворка
    Executor с помощью ограниченной рабочей очереди - см. раздел
    8.3.2
    работы: новые задачи не принимаются к обработке, но ранее переданные завершаются, включая те, которые, что еще не принимались на выполнение. Метод shutdownNow производит внезапное завершение работы: он пытается отменить выполняющиеся задачи и не запускает какие-либо задачи, которые находятся в очереди, но не запущены.
    Задачи, передаваемые экземпляру
    ExecutorService после того, как его работа была завершена, обрабатываются обработчиком отклонённых задач (rejected
    execution handler, см. раздел
    8.3.3
    ), который может тихо избавляться от задач или может бросать непроверяемое исключение
    RejectedExecutionException
    . Как только все задачи будут выполнены, экземпляр
    ExecutorService перейдёт в состояние завершено. Можно дождаться, пока служба
    ExecutorService достигнет завершенного состояния с помощью метода awaitTermination
    , или опрашивать, завершилась ли она с помощью метода isTerminated
    . Как правило, следом за вызовом метода shutdown следует вызов метода awaitTermination
    , тем самым создавая эффект синхронного завершения работы экземпляра
    ExecutorService
    (Завершение работы экземпляра
    Executor и отмена задач подробно рассматриваются в главе 7).
    Класс
    LifecycleWebServer из листинга 6.8, расширяет функциональность нашего веб-сервера, добавляя поддержку управления жизненным циклом. Сервер может быть погашен двумя способами: программным путем, с помощью вызова метода stop
    , и с помощью запроса со стороны клиента, путём отправки веб- серверу специально отформатированного HTTP-запроса. class LifecycleWebServer { private final ExecutorService exec = ...; public void start() throws IOException {
    ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); }
    });
    } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e);
    }
    }
    } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) {
    Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req);
    }

    }
    Листинг 6.8 Веб-сервер с поддержкой завершения работы
    6.2.5 Отложенные и периодические задачи
    Класс
    Timer предоставляет возможность управления выполнением отсроченных
    (“выполнить эту задачу через 100мс”) и периодических (“выполнять задачу каждые
    10 мс”) задач. Однако
    Timer
    Timer имеет некоторые недостатки, и класс
    ScheduledThreadPoolExecutor следует рассматривать как его замену
    77
    . Вы можете создать экземпляр класса
    ScheduledThreadPoolExecutor с помощью его конструктора или с помощью фабричного метода newScheduledThreadPool
    Класс
    Timer создает только один поток, выполняющий задания таймера. Если задание таймера выполняется слишком долго, может пострадать точность синхронизации других экземпляров
    TimerTask
    . Если экземпляр
    TimerTask запланирован для повторного выполнения каждые 10 мс, а другой экземпляр
    TimerTask на повтор каждые 40 мс, повторяющаяся задача либо (в зависимости от того, была ли она запланирована с фиксированной скоростью или с фиксированной задержкой) вызывается четыре раза подряд после завершения длительной задачи, либо “пропускает” четыре вызова полностью. Запланированные пулы потоков устраняют это ограничение, позволяя предоставлять несколько потоков для выполнения отложенных и периодических задач.
    Другая проблема с классом
    Timer заключается в том, что он ведет себя плохо, если экземпляр
    TimerTask бросает непроверяемое исключение. Поток таймера не перехватывает исключение, поэтому непроверяемое исключение, брошенное экземпляром
    TimerTask
    , завершает поток таймера. Класс
    Timer также не воскрешает поток в этой ситуации; вместо этого он ошибочно предполагает, что весь экземпляр
    Timer был отменен. В этом случае задачи таймера, которые уже были запланированы, но еще не выполнены, никогда не выполняются, и новые задачи не смогут быть запланированы (эта проблема, называемая "утечкой потоков", описана в разделе 7.3 вместе с методами ее предотвращения).
    Класс
    OutOfTime из листинга 6.9 иллюстрирует, каким образом класс
    Timer может запутаться, и как путаница любит компанию, как класс
    Timer разделяет своё замешательство со следующим незадачливым вызывающим объектом, который пытается отправить экземпляр
    TimerTask
    . Можно ожидать, что программа будет работать в течение шести секунд и завершит работу, но на самом деле происходит так, что она завершается через одну секунду с возбуждением исключения
    IllegalStateException
    , текст которого звучит, как “Таймер уже отменен”. Класс
    ScheduledThreadPoolExecutor должным образом имеет дело с некорректными задачами; существует мало причин для использования класса
    Timer в Java 5.0 и выше. public class OutOfTime { public static void main(String[] args) throws Exception {
    Timer timer = new Timer(); timer.schedule(new ThrowTask(), 1);
    SECONDS.sleep(1); timer.schedule(new ThrowTask(), 1);
    77
    Класс
    Timer осуществляет планирование на основе абсолютного, а не относительного времени, так что на задачи может оказываться влияние при изменении в системных часах; класс
    ScheduledThreadPoolExecutor использует в своей работе только относительное время.

    SECONDS.sleep(5);
    } static class ThrowTask extends TimerTask { public void run() { throw new RuntimeException(); }
    }
    }
    Листинг 6.9 Класс, иллюстрирующий запутанное поведение класса
    Timer
    Если вам необходимо создать свою собственную службу планировщик, вы все равно можете воспользоваться библиотекой, используя класс
    DelayQueue - реализацию
    BlockingQueue
    , обеспечивающую функциональность планировщика с помощью класса
    ScheduledThreadPoolExecutor
    . Класс
    DelayQueue управляет коллекцией объектов
    Delayed
    . Класс
    Delayed содержит время задержки: класс
    DelayQueue позволяет получить элемент, только если его задержка истекла.
    Объекты возвращаются из экземпляра
    DelayQueue в порядке, определённом временем их задержки.
    6.3 Поиск уязвимостей параллелизма
    Фрэймворк
    Executor упрощает определение политики выполнения, но для того, чтобы его использовать, вы должны быть в состоянии описать свою задачу как экземпляр интерфейса
    Runnable
    . В большинстве серверных приложений существует очевидная граница задачи: один запрос от клиента. Но иногда хорошие границы задач не так очевидны, как во многих десктопных приложениях. В серверных приложениях также может использоваться параллелизм в рамках одного запроса клиента, как это иногда бывает в серверах баз данных (для дальнейшего обсуждения конкурирующих тенденций проектирования при выборе границ задач см. [CPJ 4.4.1.1]).
    В этом разделе мы разрабатываем несколько версий компонента, допускающих различную степень параллелизма. Наш пример компонента - это часть отрисовки страницы в приложении браузера, которая берет страницу HTML и отображает ее в буфер изображений. Для простоты мы предполагаем, что HTML состоит только из размеченного текста с элементами изображения с заранее заданными размерами и
    URL.
    6.3.1 Пример: последовательный генератор страниц
    Простейший подход заключается в последовательной обработке HTML-документа.
    При обнаружении текстовой разметки отрисуйте ее в буфер изображений; при обнаружении ссылок на изображения извлеките изображения по сети и отрисуйте их в буфер изображений. Этот подход легко реализовать и он требует касания каждого элемента ввода только один раз (буферизации документа даже не требуется), но, вероятнее всего он будет раздражать пользователя, которому возможно придется долго ждать, прежде чем весь текст будет отображён.
    Менее раздражающий, но все же последовательный подход включает в себя сначала отрисовку текстовых элементов, оставляя прямоугольные заполнители для изображений, а после завершения начального прохода по документу, возврат, загрузку изображений и их отрисовку в соответствующие заполнители. Пример такого подхода представлен в классе
    SingleThreadRenderer
    , в листинге 6.10.
    public class SingleThreadRenderer { void renderPage(CharSequence source) { renderText(source);
    List imageData = new ArrayList(); for (ImageInfo imageInfo : scanForImageInfo(source)) imageData.add(imageInfo.downloadImage()); for (ImageData data : imageData) renderImage(data);
    }
    }
    Листинг 6.10 Последовательная отрисовка элементов страницы
    В основном, процесс загрузки изображения включает в себя ожидание завершения операций ввода/вывода, и в течение этого времени процессор выполняет достаточно мало работы. Таким образом, последовательный подход может привести к недоиспользованию процессора, а также заставляет пользователя ожидать дольше, чем необходимо, для того, чтобы увидеть готовую страницу. Мы можем добиться более эффективного использования и отзывчивости, разбив проблему на независимые задачи, которые могут выполняться одновременно.
    6.3.2 Задачи возвращающие результат: Callable и Future
    Фрэймворк
    Executor использует интерфейс
    Runnable в качестве базового представления задачи. Интерфейс
    Runnable представляет собой довольно ограниченную абстракцию; метод run не может возвращать значения или бросать проверяемые исключения, однако он может оказывать сторонние воздействия, такие как запись в файл журнала или размещение результата в общей структуре данных.
    Многие задачи, по своей сути, являются отложенными вычислениями - выполнение запроса к базе данных, получение ресурса по сети или вычисление сложной функции. Для этих типов задач интерфейс
    Callable является лучшей абстракцией: он ожидает, что основная точка входа, метод call
    , вернет значение и ожидает, что он может бросить исключение
    78
    . Класс
    Executors включает в себя несколько методов-утилит, предназначенных для упаковки других типов задач, в том числе экземпляров
    Runnable и java.security.PrivilegedAction
    , в экземпляры интерфейса
    Callable
    Интерфейсы
    Runnable и
    Callable описывают абстрактные вычислительные задачи. Задачи обычно конечны: у них есть четкая отправная точка, и они в конечном итоге завершаются. Жизненный цикл задачи, выполняемой экземпляром
    Executor
    , состоит из четырех фаз: создано (created), отправлено (submitted),
    запущено (started) и завершено (completed). Поскольку выполнение задач может занимать много времени, мы также хотим иметь возможность отменять задачу. В фреймворке
    Executor задачи, которые были отправлены, но еще не запущены, всегда могут быть отменены, а выполняющиеся задачи иногда могут быть отменены, если они реагируют на прерывание. Отмена ранее завершенной задачи не оказывает никакого эффекта (отмена более подробно рассматривается в главе 7).
    78
    Для описания задачи, не возвращающей значения, с помощью интерфейса
    Callable
    , используйте
    Callable

    Интерфейс
    Future отражает жизненный цикл задачи и предоставляет методы для проверки того, была ли задача завершена или отменена, получения результата выполнения и отмены задачи. Интерфейсы
    Callable и
    Future представлены в листинге 6.11. В спецификации интерфейса
    Future неявно подразумевается, что жизненный цикл задачи может продвигаться только вперед, а не назад - как и жизненный цикл интерфейса
    ExecutorService
    . Как только задача выполнена, она остается в этом состоянии навсегда. public interface Callable {
    V call() throws Exception;
    } public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone();
    V get() throws InterruptedException, ExecutionException,
    CancellationException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
    CancellationException, TimeoutException;
    }
    Листинг 6.11 Интерфейсы
    Callable и
    Future
    Поведение метода get зависит от состояния задачи (еще не запущено, выполняется, завершено). Если задача уже выполнена метод, немедленно возвращает результат или бросает исключение
    Exception
    , иначе блокируется до завершения задачи. Если задача завершается путем возбуждения исключения, метод get оборачивает его в экземпляр
    ExecutionException и пробрасывает дальше; если задача была отменена, метод get бросает исключение
    CancellationException
    Если метод get бросает исключение
    ExecutionException
    , базовое исключение может быть получено с помощью метода getCause
    Существует несколько способов создания экземпляра
    Future
    , описывающего задачу. Все методы submit интерфейса
    ExecutorService возвращают экземпляры
    Future
    , так что вы можете отправить экземпляры
    Runnable или
    Callable исполнителю и получить назад экземпляр
    Future
    , который может быть использован для получения результата или отмены задачи. Можно также явно создать экземпляр
    FutureTask для переданного экземпляра
    Runnable или
    Callable
    (поскольку класс
    FutureTask реализует интерфейс
    Runnable
    , он может быть передан экземпляру
    Executor для выполнения или непосредственно выполнен, путем вызова метода run
    ).
    Начиная с Java 6, реализации интерфейса
    ExecutorService имеют возможность переопределять метод newTaskFor класса
    AbstractExecutorService
    , и таким образом получают возможность контролировать инстанциирование
    (instantiation) экземпляров
    Future
    , соответствующих представленным экземплярам
    Callable или
    Runnable
    Реализация по умолчанию просто создает новый экземпляр
    FutureTask
    , как показано в листинге 6.12.
    protected RunnableFuture newTaskFor(Callable task) { return new FutureTask(task);
    }
    Листинг 6.12 Реализация “по умолчанию” метода newTaskFor класса
    ThreadPoolExecutor
    Отправка экземпляров
    Runnable или
    Callable экземпляру
    Executor основывается на безопасной публикации (см. раздел
    3.5
    ) экземпляров
    Runnable или
    Callable от отправляющего потока к потоку, который, в конечном счете, выполнит задачу. Аналогично, установка значения результата экземпляру
    Future представляет собой безопасную публикацию результата из потока, в котором он был вычислен, в любой поток, который получает результат через вызов метода get
    6.3.3 Пример: отрисовка страниц с помощью Future
    В качестве первого шага в повышении степени параллелизма отрисовщика страниц, давайте разделим его на две задачи, одна из которых будет отрисовывать текст, а другая будет загружать все изображения (поскольку одна задача в значительной степени ограничена ЦП, а другая задача в большей степени ограничена операциями ввода/вывода, такой подход может привести к улучшению производительности даже в системах с одним ЦП).
    Интерфейсы
    Callable и
    Future могут помочь нам отразить взаимодействие между этими кооперирующимися задачами. В классе
    FutureRenderer в
    листинге
    6.13, мы создаем экземпляр интерфейса
    Callable для загрузки всех изображений и отправляем его экземпляру
    ExecutorService
    . Он возвращает экземпляр
    Future
    , описывающий выполнение задачи; когда основная задача доходит до точки, в которой ей нужны изображения, она ожидает результата вызова
    Future.get
    . Если нам повезет, результаты к моменту запроса уже будут готовы; в противном случае, мы, по крайней мере, получили фору при загрузке изображений. public class FutureRenderer { private final ExecutorService executor = ...; void renderPage(CharSequence source) { final List imageInfos = scanForImageInfo(source);
    Callable> task = new Callable>() { public List call() {
    List result
    = new ArrayList(); for (ImageInfo imageInfo : imageInfos) result.add(imageInfo.downloadImage()); return result;
    }
    };
    Future> future = executor.submit(task); renderText(source); try {

    List imageData = future.get(); for (ImageData data : imageData) renderImage(data);
    } catch (InterruptedException e) {
    // Re-assert the thread’s interrupted status
    Thread.currentThread().interrupt();
    // We don’t need the result, so cancel the task too
    future.cancel(true);
    } catch (ExecutionException e) { throw launderThrowable(e.getCause());
    }
    }
    }
    Листинг 6.13 Ожидание загрузки изображений с использованием экземпляра
    Future
    По своей природе метод get зависим от состояния, это значит, что вызывающий объект не должен ничего знать о состоянии задачи, а безопасная публикация, свойственная отправке задачи и извлечению результата, делает этот подход потокобезопасным. Код обработки исключений, окружающий метод
    Future.get
    , может столкнуться с двумя возможными проблемами: задача в процессе выполнения породила исключение
    Exception
    , или поток вызванный методом get был прерван, прежде чем стали доступны результаты (см. разделы
    5.5.2
    и
    5.4
    ).
    Класс
    FutureRenderer позволяет тексту отрисовываться одновременно с загрузкой данных изображения. Когда все изображения загружены, они отображаются на странице. Это улучшение заключается в том, что пользователь быстрее видит результат и в том, что используется некоторый параллелизм, но мы можем всё сделать значительно лучше. Нет никакой необходимости в том, чтобы заставлять пользователей ожидать загрузки всех изображений; они, вероятнее всего, предпочтут видеть отдельные изображения, отрисовывающиеся по мере доступности.
    6.3.4 Ограничения распараллеливания разнородных задач
    В предыдущем примере, мы попытались параллельно выполнить два разных типа задач - загрузку изображений и отрисовку страницы. Но получить значительные улучшения производительности, пытаясь распараллелить последовательные гетерогенные задачи, может быть сложно.
    Два человека могут разделить работу по уборке посуды достаточно эффективно: один человек моет её, а другой сушит. Тем не менее, назначение различных типов задач каждому работнику масштабируется не очень хорошо; если появятся еще несколько человек, не очевидно, как они смогут помочь, не вставая на пути или без проведения значительной реструктуризации разделения труда.
    Если не найти меньших параллелизуемых частей среди подобных задач, такой подход приведёт к снижению отдачи.
    Еще одна проблемой связанная с разделением разнородных задач между несколькими работниками заключается в том, что задачи могут иметь несопоставимые размеры. Если вы разделите задачи A и B между двумя работниками, но A выполняется в десять раз больше времени, чем B, вы ускорите общий процесс только на 9%. Наконец, разделение задачи между несколькими
    работниками всегда сопряжено с определенными накладными расходами на координацию; для того чтобы разделение было оправданным, эти накладные расходы должны быть более чем компенсированы повышением производительности за счет параллелизма.
    Класс
    FutureRenderer использует две задачи: одну для отрисовки текста, а другую для загрузки изображений. Если отрисовка текста происходит гораздо быстрее, чем загрузка изображений, как это часто бывает на практике, результирующая производительность будет не сильно отличаться от последовательной версии, но код станет намного сложнее. Лучшее, что мы можем сделать с двумя потоками, это получить ускорение в два раза. Таким образом, попытка увеличить параллелизм за счет распараллеливания разнородных действий может быть очень трудоемкой, и существуют пределы того, сколько дополнительного параллелизма можно получить за счёт распараллеливания. (См. разделы
    11.4.2
    и
    11.4.3
    , в них приводится другой пример того же явления.)
    Реальная отдача от разделения рабочей нагрузки программы на задачи возникает тогда, когда существует большое количество независимых однородных задач, которые могут обрабатываться параллельно.
    6.3.5 CompletionService: Executor пересекается с
    BlockingQueue
    Если у вас есть пакет вычислений для отправки экземпляру
    Executor и вы хотите получать результаты по мере их доступности, вы можете сохранить экземпляры интерфейса
    Future
    , связанные с каждой задачей, и периодически опрашивать их на предмет завершения выполнения, с помощью вызова метода get с нулевым таймаутом. Так делать можно, но это утомительно. К счастью, есть лучший способ:
    служба завершения (completion service).
    Интерфейс
    CompletionService объединяет в себе функционал интерфейсов
    Executor и
    BlockingQueue
    . Вы можете отправлять ему задачи для выполнения в виде экземпляров
    Callable и использовать методы подобные тем, что имеются у очередей - методы take и poll
    - для получения завершенных результатов, упакованных в экземпляры интерфейса
    Future
    , по мере их доступности. Класс
    ExecutorCompletionService реализует интерфейс
    CompletionService
    , делегируя вычисления интерфейсу
    Executor.
    Реализация класса
    ExecutorCompletionService довольно проста. Конструктор создает экземпляр
    BlockingQueue для хранения завершенных результатов. Класс
    FutureTask имеет метод done
    , который вызывается после завершения вычислений.
    Когда задача отправляется, она оборачивается классом
    QueueingFuture
    , являющимся подклассом
    FutureTask
    , переопределяющим метод done с целью помещать результат выполнения в экземпляр
    BlockingQueue
    , как показано в листинге 6.14. Методы take и poll делегируются экземпляру
    BlockingQueue
    , блокируясь, если результаты еще недоступны. private class QueueingFuture extends FutureTask {
    QueueingFuture(Callable c) { super(c); }
    QueueingFuture(Runnable t, V r) { super(t, r); }
    protected void done() { completionQueue.add(this);
    }
    }
    Листинг 6.14 Класс
    QueueingFuture используемый классом
    ExecutorCompletionService
    6.3.6 Пример: рендер страниц с использованием
    CompletionService
    Мы можем использовать интерфейс
    CompletionService для повышения производительности рендера страниц двумя способами: сократив общее времени выполнения и улучшив отзывчивость. Мы можем создавать отдельные задачи для загрузки каждого изображения и выполнять их в пуле потоков, превращая последовательную загрузку в параллельную: такой подход сократит общее время загрузки всех изображений.
    Извлекая результаты из экземпляра
    CompletionService и отрисовывая каждое изображение, как только оно станет доступно, мы можем предоставить пользователю более динамичный и отзывчивый пользовательский интерфейс. Пример реализации показан в классе
    Renderer
    , в листинге 6.15. public class Renderer { private final ExecutorService executor;
    Renderer(ExecutorService executor) { this.executor = executor; } void renderPage(CharSequence source) {
    List info = scanForImageInfo(source);
    CompletionService completionService = new ExecutorCompletionService(executor); for (final ImageInfo imageInfo : info)
    completionService.submit(new Callable() {
    public ImageData call() { return imageInfo.downloadImage();
    }
    }); renderText(source); try { for (int t = 0, n = info.size(); t < n; t++) {
    Future f = completionService.take();
    ImageData imageData = f.get(); renderImage(imageData);
    }
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    } catch (ExecutionException e) { throw launderThrowable(e.getCause());
    }

    }
    }
    Листинг 6.15 Использование интерфейса
    CompletionService для рендеринга элементов страницы по мере того, как они становятся доступны.
    Несколько экземпляров
    ExecutorCompletionService s могут совместно использовать один и тот же экземпляр
    Executor
    , так что будет вполне разумно создавать экземпляры
    ExecutorCompletionService s, закрытыми для конкретных вычислений, пока экземпляр
    Executor используется совместно. Когда интерфейс
    CompletionService используется подобным образом, он выступает в качестве обработчика для пакета вычислений, во многом повторяя поведение, интерфейса
    Future
    , выступающего в качестве обработчика для отдельного вычисления.
    Запомнив, сколько задач было отправлено экземпляру
    CompletionService
    , и подсчитывав, сколько завершенных результатов было получено, можно узнать, когда были получены все результаты для данного пакета, даже если экземпляр
    Executor используется совместно.
    6.3.7 Ограничение времени выполнения задач
    Иногда, если активность не завершает свою работу в течение определенного периода времени, результат становиться больше не нужен, и работа активности может быть прервана. Например, веб-приложение может получать объявления с внешнего сервера объявлений, но если объявление недоступно в течение двух секунд, оно отображает объявление по умолчанию, чтобы отсутствие объявлений не нарушало требований к отзывчивости сайта. Аналогично, сайт портала может параллельно получать данные из нескольких источников, но может быть готов ожидать, пока данные станут доступны, только определенное время, прежде чем отобразить страницу без них.
    Основной проблемой, при выполнении задач в рамках выделенного бюджета времени, является возможность убедиться, что для получения ответа или выяснения его отсутствия, вам не придётся ожидать дольше, чем позволяет бюджет времени. Версия
    Future.get с параметром “время ожидания” поддерживает это требование: она возвращает результат, как только он готов, в случае если результат не был подготовлен в течение заданного периода ожидания - бросает исключение
    TimeoutException
    Вторичной проблемой при использовании ограниченных временем задач является их остановка по истечении выделенного периода времени, чтобы они не тратили вычислительные ресурсы впустую, продолжая вычислять результат, который не будет использован. Этого можно добиться сделав так, чтобы задача строго управляла собственным бюджетом времени и прерывала своё выполнение, когда у нее заканчивается время, или, отменяя задачу по истечении времени ожидания. Опять же, интерфейс
    Future может помочь и в этом случае; если ограниченная по времени (timed) версия get завершается возбуждением исключения
    TimeoutException
    , вы можете отменить задачу с использованием экземпляра
    Future
    . Если задача написана как “отменяемая” (см. главу
    7
    ), ее можно завершить досрочно, чтобы не использовать излишние ресурсы. Такой подход используется в Листингах 6.13 и 6.16.
    В листинге 6.16 показано типичное применение “временного” метода
    Future.get
    . Метод renderPageWithAd генерирует составную веб-страницу, содержащую запрошенное содержимое и рекламу, получаемую с сервера объявлений. Он отправляет задачу, извлекающую рекламу, исполнителю, обрабатывает остальную часть содержимого страницы, а затем ожидает получения
    рекламы, пока не закончится выделенный ему бюджет времени
    79
    . Если время ожидания вызова метода get истекло, задача извлечения рекламы отменяется
    80
    и вместо нее используется объявление по умолчанию.
    Page renderPageWithAd() throws InterruptedException { long endNanos = System.nanoTime() + TIME_BUDGET;
    Future f = exec.submit(new FetchAdTask());
    // Render the page while waiting for the ad
    Page page = renderPageBody();
    Ad ad; try {
    // Only wait for the remaining time budget
    long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS);
    } catch (ExecutionException e) { ad = DEFAULT_AD;
    } catch (TimeoutException e) { ad = DEFAULT_AD; f.cancel(true);
    } page.setAd(ad); return page;
    }
    Листинг 6.16 Получение рекламы в рамках выделенного на процедуру бюджета времени
    6.3.8 Пример: портал бронирования путешествий
    Подход к составлению бюджета времени, описанный в предыдущем разделе, можно легко обобщить на произвольное число задач. Рассмотрим портал бронирования путешествий: пользователь вводит даты поездки и требования, а портал извлекает и отображает предложения от ряда авиакомпаний, отелей или компаний по прокату автомобилей. В зависимости от компании, получение предложения может включать использование вызова веб-службы, консультации с базой данных, выполнение транзакции EDI или использование другого механизма.
    Вместо того чтобы время отклика страницы привязывать к скорости ответа самого медленного запроса, может быть предпочтительнее представлять только ту информацию, которая была получена в рамках выделенного бюджета времени.
    Провайдеров, которые не ответили вовремя на запрос, страница могла бы полностью опустить или отобразить заставку, вроде такой “ответ от Air Java не получен”.
    Получение предложения от одной компании не зависит от получения предложения от другой, поэтому получение одного предложения является разумной границей задачи, которая позволяет получать предложения параллельно.
    Было бы достаточно легко создать n задач, отправить их в пул потоков, сохранить
    79
    Значение тайм-аута, переданное методу get
    , вычисляется путем вычитания текущего времени из крайнего срока (deadline); на практике может быть вычислено отрицательное число, но все “временные” методы в java.util.concurrent рассматривают отрицательные значения тайм-аутов как ноль, поэтому никакой дополнительный код для обработки этого случая не требуется.
    80
    Параметр true в вызове метода
    Future.cancel означает, что поток задачи может быть прерван, если задача в данный момент выполняется; см. главу 7.
    экземпляры
    Future и использовать “временную” версию метода get для последовательной выборки каждого результата с помощью экземпляра
    Future
    , но есть еще более простой способ – вызов метода invokeAll
    В листинге 6.17, ограниченная по времени версия метода invokeAll используется для отправки нескольких задач экземпляру
    ExecutorService и получения результатов. Метод invokeAll принимает коллекцию задач и возвращает коллекцию экземпляров
    Future
    . Обе коллекции имеют идентичные структуры; метод invokeAll добавляет экземпляры
    Future к возвращаемой коллекции в порядке, установленном итератором коллекции задач, позволяя, таким образом, вызывающему объекту связывать экземпляры
    Future с экземплярами
    Callable
    , который он представляет. Ограниченная по времени версия метода invokeAll возвратит управление тогда, когда будут завершены все задачи, будет прерван вызывающий поток или когда истечёт время ожидания. Все задачи, которые не были завершены по истечении тайм-аута, отменяются. При возврате из метода invokeAll
    , каждая задача будет либо выполнена в обычном режиме, либо отменена; клиентский код может вызвать метод get или метод isCancelled
    , чтобы выяснить, в каком состоянии находится задача. Listing 6.17 uses the timed version of invokeAll to submit multiple tasks to an
    ExecutorService and retrieve the results. The invokeAll method takes a collection of tasks and returns a collection of
    Future s. The two collections have identical structures; invokeAll adds the
    Future s to the returned collection in the order imposed by the task collection’s iterator, thus allowing the caller to associate a
    Future with the
    Callable it represents. The timed version of invokeAll will return when all the tasks have completed, the calling thread is interrupted, or the timeout expires. Any tasks that are not complete when the timeout expires are cancelled.
    On return from invokeAll
    , each task will have either completed normally or been cancelled; the client code can call get or isCancelled to find out which. private class QuoteTask implements Callable { private final TravelCompany company; private final TravelInfo travelInfo; public TravelQuote call() throws Exception { return company.solicitQuote(travelInfo);
    }
    } public List getRankedTravelQuotes(
    TravelInfo travelInfo, Set companies,
    Comparator ranking, long time, TimeUnit unit) throws InterruptedException {
    List tasks = new ArrayList(); for (TravelCompany company : companies) tasks.add(new QuoteTask(company, travelInfo));
    List> futures =
    exec.invokeAll(tasks, time, unit);
    List quotes =
    new ArrayList(tasks.size());
    Iterator taskIter = tasks.iterator(); for (Future f : futures) {
    QuoteTask task = taskIter.next(); try { quotes.add(f.get());
    } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause()));
    } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e));
    }
    }
    Collections.sort(quotes, ranking); return quotes;
    }
    Листинг 6.17 Запрос расценок на путешествия в рамках операции с ограниченным временем
    6.4 Итоги
    Структурирование приложений вокруг выполнения задач может упростить разработку и облегчить использование параллелизм. Фрэймворк Executor позволяет отделить отправку задач от политики выполнения и поддерживает множество различных политик выполнения; всякий раз, когда вы создаете потоки для выполнения задач, рассмотрите возможность использования фреймворка Executor.
    Чтобы извлечь максимальную пользу из разбиения приложения на задачи, необходимо определить разумные границы задачи. В некоторых приложениях очевидные границы задач работают хорошо, в то время как в других может потребоваться некоторый анализ для выявления деталей эксплуатируемого параллелизма.

    1   ...   8   9   10   11   12   13   14   15   ...   34


    написать администратору сайта