При участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на
Скачать 4.97 Mb.
|
5.4 Методы блокирования и прерывания Потоки могут блокироваться или приостанавливаться по нескольким причинам: ожидание завершения операций ввода/вывода, ожидание получения блокировки, ожидание пробуждения из метода Thread.sleep или ожидание результатов вычислений других потоков. Когда поток блокируется, он обычно приостанавливается и переводится в одно из состояний блокировки ( BLOCKED , WAITING или TIMED_WAITING ). Различие между блокирующей операцией и обычной, только требующей длительного времени для завершения выполнения, состоит в том, что заблокированный поток должен ожидать события, которое находится вне его контроля, прежде чем сможет продолжить своё выполнение – например, завершение операции ввода/вывода, блокировка становится доступной или завершаются внешние (по отношению к потоку) вычисления. При возникновении такого внешнего события поток переходит в состояние RUNNABLE и снова становится доступен для планирования. Методы put и take интерфейса BlockingQueue бросают проверяемое исключение InterruptedException , как и несколько других библиотечных методов, таких как Thread.sleep . Когда метод может бросить исключение InterruptedException , это говорит вам о том, что это - блокирующий метод, и что в дальнейшем, если его выполнение будет прервано, это приведёт к раннему прекращению действия блокировки. Класс Thread предоставляет метод interrupt для прерывания потока и для запроса, был ли поток прерван. Каждый поток имеет свойство типа boolean , содержащее информацию о том, был ли поток прерван; прерывание потока устанавливает его состояние. Прерывание - кооперативный механизм. Один поток не может заставить другой прекратить то, что он делает, и сделать что-то иное; когда поток A прерывает поток B, поток A просто отправляет запрос, чтобы поток B остановил то, что он делает, когда достигнет удобной точки для остановки. Хотя в API или спецификации языка нет ничего, что требовало бы какой-либо конкретной семантики уровня приложения для выполнения прерывания, наиболее разумным использованием прерывания является отмена некоторого действия. Методы блокировки, реагирующие на прерывания, упрощают своевременную отмену операций, выполняющихся длительное время. Когда ваш код вызывает метод, который бросает исключение InterruptedException , ваш метод также будет являться блокирующим, и должен иметь план реакции на прерывание. Для библиотечного кода, в основном, существует два варианта: Передача InterruptedException выше по стеку. Часто, наиболее разумной политикой будет передача дальше – просто распространите исключение InterruptedException до вызывающего кода. Это может подразумевать отсутствие перехвата исключения InterruptedException , или перехват и повторное возбуждение, после выполнения некоторой краткой, специфичной дла активности, чистки. Восстановление прерывания. Иногда вы не можете бросить исключение InterruptedException , например, в том случае, когда ваш код является частью интерфейса Runnable . В такой ситуации необходимо перехватить исключение InterruptedException и восстановить состояние прерывания, путём вызова метода interrupt в текущем потоке, чтобы код выше по стеку вызовов мог увидеть, что было выполнено прерывание, как показано в листинге 5.10. Вы можете получить гораздо более сложные варианты с прерыванием, но эти два подхода должны работать в подавляющем большинстве ситуаций. Есть одна вещь, которую вы не должны делать с исключением InterruptedException – перехватывать его и ничего не делать в ответ. Это лишает код, находящийся выше в стеке вызовов, возможности действовать при прерывании, поскольку теряется доказательство того, что поток был прерван. Единственная ситуация, в которой приемлемо проглотить прерывание, - это когда вы расширяете класс Thread и, следовательно, контролируете весь код выше по стеку вызовов. Отмена и прерывание более подробно рассматриваются в главе 7. public class TaskRunnable implements Runnable { BlockingQueue processTask(queue.take()); } catch (InterruptedException e) { // restore interrupted status Thread.currentThread().interrupt(); } } } Листинг 5.10 Восстановление состояния interrupted, чтобы не “проглотить” прерывание 5.5 Синхронизаторы Блокирующие очереди уникальны среди классов коллекций: они не только действуют как контейнеры для объектов, но и могут координировать управление потоками производителя и потребителя, поскольку методы take и put блокируются до тех пор, пока очередь не перейдет в желаемое состояние (не пустая или не полная). Синхронизатор является любым объектом, который координирует поток управления (control flow) потоками на основе своего состояния. Блокирующие очереди могут действовать как синхронизаторы; другие типы синхронизаторов включают семафоры, барьеры и защёлки. В библиотеке платформы есть несколько классов синхронизатора; если они не соответствуют вашим потребностям, вы можете создать свой собственный синхронизатор, используя механизмы, описанные в главе 14. Все синхронизаторы разделяют определенные структурные свойства: они инкапсулируют состояние, которое определяет, должны ли потоки, поступающие в синхронизатор, проходить или принудительно ожидать, предоставляя методы для манипулирования этим состоянием, и предоставляя методы для эффективного ожидания синхронизатора, с целью ввода желаемого состояния. 5.5.1 Защёлки Защёлка (latches) - это синхронизатор, который может задерживать ход выполнения потоков до достижения ими конечного (terminal) состояния [CPJ 3.4.2]. Защелка действует как затвор: до тех пор, пока защелка не достигнет конечного состояния, затвор будет закрыт и никакие потоки не смогут пройти, в конечном состоянии затвор открываются, позволяя пройти всем потокам. Как только защелка достигнет конечного состояния, она более не сможет изменять своё состояние, поэтому она навсегда останется открытой. Защелки можно использовать для обеспечения того, чтобы определенные действия не выполнялись до завершения других разовых действий, таких как: • Обеспечение того, чтобы вычисление не выполнялось до тех пор, пока необходимые ему ресурсы не будут инициализированы. Простую бинарную (двухпозиционную) защелку можно использовать, чтобы указать, что “Ресурс R был инициализирован”, и любая активность, которая нуждается в ресурсе R, будет сначала ожидать на этой защелке. • Обеспечение того, чтобы служба не запускалась до тех пор, пока не запустятся другие службы, от которых она зависит. У каждой службы есть ассоциированная бинарная защелка; запуск службы S сперва, будет включать ожидание на защелках других служб, от которых зависит служба S, а затем освобождение защелки службы S после завершения запуска других служб, чтобы затем все службы, зависящие от S, могли продолжить выполнение. • Ожидание, пока все стороны, участвующие в некоторой деятельности, например игроки в многопользовательской игре, будут готовы продолжить. В этом случае защелка достигает конечного состояния после того, как все игроки готовы. Класс CountDownLatch представляет собой гибкую реализацию защелки, которая может использоваться в любой из этих ситуаций; он позволяет одному или нескольким потокам ожидать возникновения набора событий. Состояние защелки состоит из счетчика, инициализированного положительным числом, представляющего количество ожидающих событий. Метод countDown уменьшает счетчик, указывая на то, что событие произошло, и методы await ожидают достижения счетчиком значения ноль, что является следствием возникновения всех ожидаемых событий. Если на входе счетчик отличен от нуля, выполнение метода await блокируется до тех пор, пока счётчик не достигнет нуля, ожидающий поток будет прерван или истечёт время ожидания. Класс TestHarness из листинга 5.11, иллюстрирует два распространённых случая применения защелок Класс TestHarness создает несколько потоков, которые одновременно выполняют переданное задание. Он использует две защелки, «начальный затвор» и «конечный затвор». Начальный затвор инициализируется значением “один”; конечный затвор инициализируется счетчиком, равным количеству рабочих потоков. Первое, что делает каждый рабочий поток - выполняет ожидание на стартовом затворе; это гарантирует, что ни один из потоков не начнет выполнять работу, пока все они не будут готовы к запуску. Последнее, что делает каждый из них, - это уменьшение счётчика на конечном затворе; это позволяет главному потоку дождаться того момента, когда последний рабочий поток завершится, и он сможет вычислить прошедшее с момента запуска потоков время. public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end-start; } } Листинг 5.11 Использование класса CountDownLatch для запуска и остановки потоков в тестах на время Почему мы побеспокоились о защелках в классе TestHarness вместо того, чтобы просто запускать потоки сразу после их создания? По видимому, мы хотели измерить, сколько времени потребуется для параллельного выполнения задачи n раз. Если бы мы просто создали и запустили потоки, потоки, начатые ранее, имели бы "фору" перед потоками, запущенными позднее, и степень конкуренции изменялась бы с течением времени по мере увеличения или уменьшения числа активных потоков. Использование стартового затвора позволяет главному потоку сразу отпустить все рабочие потоки, а конечный затвор позволяет главному потоку ждать завершения только последнего потока, а не ждать последовательного завершения каждого потока. 5.5.2 Класс FutureTask Класс FutureTask также действует как защелка. (Класс FutureTask реализует интерфейс Future , описывающий абстрактный опорный результат (result-bearing) вычисления [CPJ 4.3.3].) Вычисление, представленное классом FutureTask , реализуется с использованием интерфейса Callable , возвращающим результат эквивалентом интерфейса Runnable и может находиться в одном из трех состояний: ожидание выполнения, выполнение или завершено. Под завершением понимаются все способы выполнения вычисления, включая обычное завершение, отмену и исключение. Как только экземпляр класса FutureTask переходит в завершенное состояние, он остается в этом состоянии навсегда. Поведение метода Future.get зависит от состояния задачи. Если она завершена, метод get немедленно возвращает результат, в противном случае метод блокируется до тех пор, пока задача не перейдет в завершенное состояние, а затем не вернет результат или не бросит исключение. Класс FutureTask передает результат потока, выполняющего вычисление, в поток (и), получающий(е) результат; спецификация класса FutureTask гарантирует, что эта передача основана на безопасной публикации результата. Класс FutureTask используется фрэймворком Executor для представления асинхронных задач и может также использоваться для представления любых потенциально длительных вычислений, которые могут быть запущены до того, как их результаты понадобятся. Класс Preloader из листинга 5.12 использует класс FutureTask для выполнения дорогостоящих вычислений, результаты которых понадобятся позже; начав вычисление на раннем этапе, вы сократите время, которое вам придется подождать, когда вам позже действительно понадобятся результаты. public class Preloader { private final FutureTask future = new FutureTask (new Callable () { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw launderThrowable(cause); } } } Листинг 5.12 Использование FutureTask для предварительной загрузки данных, котрые понадобятся позднее Класс Preloader создает экземпляр FutureTask , который описывает задачу загрузки информации о продукте из базы данных и поток, в котором будет выполняться вычисление. Он предоставляет метод start для запуска потока, так как нецелесообразно запускать поток из конструктора или статического инициализатора. Когда программе позже понадобится экземпляр ProductInfo , она может вызвать метод get , который вернёт загруженные данные, если они готовы, или будет ожидать завершения загрузки, если они ещё не готовы. Задачи, описанные интерфейсом Callable , могут бросать проверяемые и непроверяемые исключения, и любой код может бросать исключение Error Независимо от того, какое исключение может бросить код задачи, оно оборачивается исключением ExecutionException и перебрасывается из метода Future.get . Это усложняет код вызывающий метод get , и не только потому, что он должен иметь дело с возможностью возбуждения исключения ExecutionException (и непроверяемого исключения CancellationException ), но также и потому, что причина исключения ExecutionException возвращается как исключение Throwable, что неудобно в использовании. Когда метод get класса Preloader бросает исключение ExecutionException , причина исключения попадает в одну из трех категорий: проверяемое исключение брошенное интерфейсом Callable , RuntimeException или Error . Мы должны обрабатывать каждый из этих случаев отдельно, но в листинге 5.13 мы будем использовать метод-утилиту launderThrowable , чтобы инкапсулировать “грязную” часть логики обработки исключений. Перед вызовом метода launderThrowable , класс Preloader тестирует исключения на соответствие известным проверяемым исключения и, в случае успеха, пробрасывает их дальше. Таким образом, остаются только непроверяемые исключения, которые класс Preloader обрабатывает, вызывая метод launderThrowable и бросая результат. Если исключение Throwable , переданное методу launderThrowable , является экземпляром Error , метод launderThrowable пробрасывает его напрямую; если это не исключение RuntimeException , он бросает исключение IllegalStateException , чтобы указать на логическую ошибку. Таким образом, остаётся только исключение RuntimeException , которое метод launderThrowable возвращает вызывающему методу и которое вызывающий метод, как правило, пробрасывает дальше. / ** If the Throwable is an Error, throw it; if it is a * RuntimeException return it, otherwise throw IllegalStateException * / public static RuntimeException launderThrowable(Throwable t) { if (t instanceof RuntimeException) return (RuntimeException) t; else if (t instanceof Error) throw (Error) t; else throw new IllegalStateException("Not unchecked", t); } Листинг 5.13 Приведение непроверяемого исключения Throwable к типу RuntimeException 5.5.3 Семафоры Подсчет семафоров используется для управления количеством активностей, которые могут обращаться к определенному ресурсу или выполнять определенное действие в некоторый момент времени[CPJ 3.4.1]. Подсчет семафоров может использоваться для реализации пулов ресурсов или для наложения ограничений на коллекцию. Класс Semaphore управляет набором виртуальных разрешений; начальное количество разрешений передается в конструкторе класса Semaphore . Активности могут приобретать разрешения (до тех пор, пока есть в наличии) и освобождать разрешения, когда выполняют свою работу. Если разрешение не доступно, метод acquire блокируется до тех пор, пока не получит его (или до тех пор, пока не будет прерван или пока не истечёт время выполнения операции). Метод release возвращает разрешение семафору 67 Вырожденным случаем счетного семафора является бинарный семафор, то есть, семафор с начальным значением счётчика равным единице. Бинарный семафор может использоваться как мьютекс с семантикой нереентерабельной блокировки; тот, кто имеет единственное разрешение, держит мьютекс. Семафоры полезны для реализации пулов ресурсов, таких как пулы соединений с базами данных. Довольно легко построить пул фиксированного размера, который упадёт с ошибкой, если вы запросите ресурс из пустого пула, но что вы действительно хотите, так это заблокировать операцию получения, если пул пуст и вновь разблокировать её, как только появятся данные. Если вы инициализируете класс Semaphore размером пула, с помощью метода acquire приобретите 67 Реализация не имеет реальных объектов разрешений, и класс Semaphore не ассоциирует выданные разрешения с потоками, поэтому разрешение, приобретённое в одном потоке, может быть освобождено из другого потока. Вы можете думать о методе acquire как о потреблении разрешения и методе release , как о его создании; класс Semaphore не ограничен количеством разрешений, с которым он был создан. разрешение, прежде чем попытаетесь извлечь ресурс из пула, и отпустите разрешение с помощью метода release , после помещения ресурса в пул, метод acquire блокируется до тех пор, пока пул остаётся пустым. Этот подход используется в классе ограниченного буфера, в главе 12. (Более простой способ создания блокирующего пула объектов - использовать класс BlockingQueue для хранения полученных ресурсов.) Аналогичным образом, вы можете использовать класс Semaphore , чтобы превратить любую коллекцию в блокирующую ограниченную коллекцию, как показано в классе BoundedHashSet , в листинге 5.14. Семафор инициализируется желаемым максимальным значением размера коллекции. Операция add приобретает разрешение перед добавлением элемента в базовую коллекцию. Если базовая операция add фактически ничего не добавляет, она немедленно освобождает разрешение. Аналогичным образом, успешное выполнение операции remove освобождает разрешение, позволяя добавлять дополнительные элементы. Базовая реализация интерфейса Set ничего не знает об ограничении; всё это ложится на класс BoundedHashSet public class BoundedHashSet } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) sem.release(); } } public boolean remove(Object o) { boolean wasRemoved = set.remove(o); if (wasRemoved) sem.release(); return wasRemoved; } } Листинг 5.14 Использование класса Semaphore для ограничения коллекции 5.5.4 Барьеры Мы видели, как защёлки могут облегчить запуск исполнения группы связанных активностей или ожидание завершения исполнения группы связанных активностей. Защёлки - одноразовые объекты; как только защелка переходит в терминальное состояние, ее нельзя сбросить. Барьеры похожи на защелки тем, что они блокируют группу потоков до тех пор, пока не произойдет какое-либо событие [CPJ 4.4.3]. Ключевое отличие барьера от защёлки состоит в том, что все потоки должны совместно подойти к барьеру в некоторый момент времени, только после этого они смогут продолжить выполнение. Защелки используются для ожидания событий; барьеры используются для ожидания других потоков. Барьер реализует протокол, который некоторые семьи используют для встречи в течение дня в торговом центре: “все встречаются в Макдональдсе в 6:00; как только вы доберетесь туда, оставайтесь там, пока все не появятся, а затем мы решим, что будем делать дальше”. Класс CyclicBarrier позволяет фиксированному числу участников неоднократно встречаться у барьера и полезен в параллельных итерационных алгоритмах, которые разбивают задачу на фиксированное число независимых подзадач. При достижении барьера потоки вызывают метод await , и метод await блокирует выполнение потоков до тех пор, пока все из них не достигнут барьера. Если все потоки приходят к барьеру, то барьер переходит в состояние “успешно пройден”, в этом случае все потоки освобождаются и барьер сбрасывается, поэтому его можно использовать вновь. Если истекает время вызова метода await или поток заблокированный вызовом метода await прерывается, то барьер считается сломанным и все исходящие вызовы метода await будут завершены с возбуждением исключения BrokenBarrierException . Если барьер успешно пройден, метод await возвращает уникальный индекс прихода для каждого потока, который можно использовать для “выбора” лидера, выполняющего особые действия в следующей итерации. Класс CyclicBarrier также позволяет передать конструктору действие барьера (barrier action); это экземпляр интерфейса Runnable , который выполняется (в одной из подзадач потоков), когда барьер успешно пройден, но до того, как заблокированные потоки будут освобождены. Барьеры часто используются в моделировании, где работа по вычислению одного шага может выполняться параллельно, но вся работа, связанная с заданным шагом, должна завершиться до перехода на следующий шаг. Например, при n- мерном моделировании частиц, каждый шаг вычисляет изменения в местоположении каждой частицы на основе местоположений и других атрибутов прочих частиц. Ожидание у барьера гарантирует, что все обновления для шага k завершаться прежде, чем станет доступной возможность перейти к шагу k + 1. Класс CellularAutomata в листинге 5.15 демонстрируют использование барьера для компьютерного моделирования клеточного автомата, например такого, как игра Конвея в жизнь (Gardner, 1970). При распараллеливании моделирования, как правило, нецелесообразно назначать отдельный поток для каждого элемента (в случае игры “Жизнь”, это клетка); это потребует слишком много потоков, и накладные расходы на их координацию сильно замедлят вычисления. Вместо этого имеет смысл разбить проблему на несколько частей, позволить каждому потоку заниматься только своей частью, а затем объединить результаты. Класс CellularAutomata разделяет доску на N CPU частей, где N CPU - это количество доступных процессоров, и назначает каждую часть отдельному потоку 68 . На каждом шаге рабочие потоки вычисляют новые значения для всех ячеек в своей части доски. Когда все рабочие потоки достигают барьера, действие барьера фиксирует новые значения в модели данных. После выполнения действия барьера, рабочие потоки освобождаются для вычисления следующего шага вычисления, который включает в себя сверку с результатом выполнения метода isDone , с целью определения, требуются ли дальнейшие итерации. public class CellularAutomata { private final Board mainBoard; private final CyclicBarrier barrier; private final Worker[] workers; public CellularAutomata(Board board) { this.mainBoard = board; int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable() { public void run() { mainBoard.commitNewValues(); }}); this.workers = new Worker[count]; for (int i = 0; i < count; i++) workers[i] = new Worker(mainBoard.getSubBoard(count, i)); } private class Worker implements Runnable { private final Board board; public Worker(Board board) { this.board = board; } public void run() { while (!board.hasConverged()) { for (int x = 0; x < board.getMaxX(); x++) for (int y = 0; y < board.getMaxY(); y++) board.setNewValue(x, y, computeValue(x, y)); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } 68 Для вычислительных задач, подобных этой, не осуществляющих операции ввода/вывода и не обращающихся к общим данным, количество потоков равное N cpu или N cpu + 1 даёт оптимальную пропускную способность; большее количество потоков не помогает и, фактически, может ухудшить производительность, поскольку потоки начинают конкурировать за ресурсы ЦП и памяти. public void start() { for (int i = 0; i < workers.length; i++) new Thread(workers[i]).start(); mainBoard.waitForConvergence(); } } Листинг 5.15 Координация вычислений в клеточном автомате с использованием класса CyclicBarrier Другой формой барьера является класс Exchanger - двусторонний барьер, в котором стороны обмениваются данными у барьера [CPJ 3.4.3]. Обменники полезны, когда стороны выполняют асимметричные действия, например, когда один поток заполняет буфер данными, а другой поток использует данные из буфера; эти потоки могут использовать класс Exchanger для встречи и обмена полного буфера на пустой. Когда два потока обмениваются объектами через класс Exchanger , обмен представляет собой безопасную публикацию обоих объектов другой стороне. Время обмена зависит от требований к скорости отклика приложения. Простейший подход заключается в том, что задача заполнения обменивается, когда буфер заполнен, и задача очистки обменивается, когда буфер пуст; это минимизирует количество обменов, но может задержать обработку некоторых данных, если скорость прибытия новых данных непредсказуема. Другой подход заключается в том, что заполнитель обменивается, когда буфер заполнен, но также и в том случае, когда буфер частично заполнен и прошло определенное время. 5.6 Создание эффективного и масштабируемого кэша результатов Почти каждое серверное приложение использует некоторую форму кэширования. Повторное использование результатов предыдущих вычислений позволяет уменьшить задержки и увеличить пропускную способность, за счет дополнительного использования памяти. Подобно многим другим часто изобретаемым велосипедам, кэширование обычно выглядит проще, чем есть на самом деле. Наивная реализация кэша, вероятно превратит узкое место производительности в узкое место масштабируемости, даже если это улучшит однопоточную производительность. В этом разделе мы разрабатываем эффективный и масштабируемый кэш результатов для ресурсоемкой функции. Начнем с очевидного подхода – простого объекта HashMap - а затем рассмотрим некоторые недостатки его параллелизма и способы их устранения. Интерфейс Computable в листинге 5.16 описывает функцию с входными данными типа A и результатом типа V . Класс ExpensiveFunction , реализующий интерфейс Computable , тратит значительное количество времени на вычисление результатов; мы хотели бы создать обёртку для интерфейса Computable, которая бы запоминала результаты предыдущих вычислений и инкапсулировала процесс кэширования. (Эта техника называется мемоизацией (memorization)) public interface Computable { V compute(A arg) throws InterruptedException; } public class ExpensiveFunction implements Computable { public BigInteger compute(String arg) { // after deep thought... return new BigInteger(arg); } } public class Memoizer1 implements Computable { @GuardedBy("this") private final Map cache = new HashMapV>(); private final Computable c; public Memoizer1(Computable c) { this.c = c; } public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } } Листинг 5.16 Начальная попытка кэширования с использованием класса HashMap и синхронизации. В классе Memoizer1 в листинге 5.16 показана первая попытка: использование класса HashMap для хранения результатов предыдущих вычислений. Метод compute сначала проверяет, закэширован ли желаемый результат, и возвращает предварительно вычисленное значение, если это так. В противном случае результат вычисляется и перед возвратом кэшируется в экземпляре HashMap Класс HashMap не является потокобезопасным, поэтому для обеспечения того, чтобы два потока не обращались к экземпляру HashMap одновременно, класс Memoizer1 использует консервативный подход, заключающийся в синхронизации всего метода compute . Такой подход гарантирует потокобезопасность, но имеет очевидную проблему с масштабируемостью: в общем случае, только один поток за раз может выполнять вычисления. Если другой поток занят вычислением результата, другие потоки, вызывающие метод compute , могут быть заблокированы на длительное время. Если несколько потоков находятся в очереди, ожидая вычисления значений, которые еще не вычислены, вычисление может занять больше времени, чем без использования мемоизации. На рисунке 5.2 иллюстрируется, что может произойти, когда несколько потоков пытаются использовать функцию, мемоизированную с помощью такого подхода. Это явно не тот сорт повышения производительности, которого мы надеялись достичь благодаря кешированию. Рисунок 5.2 Плохой параллелизм в классе Memoizer1 Класс Memoizer2 из листинга 5.17, улучшает ужасное параллельное поведение Memoizer1 , путём замены класса HashMap на класс ConcurrentHashMap Поскольку класс ConcurrentHashMap потокобезопасен, нет необходимости обеспечивать синхронизацию при доступе к кэширующему объекту Map , тем самым устраняется сериализация, вызванная синхронизацией метода compute в классе Memoizer1 Класс Memoizer2 , безусловно, обеспечивает лучшее параллельное поведение, чем класс Memoizer1 : несколько потоков могут использовать его одновременно. Но он все еще, выступая в качестве кэша, обладает некоторыми недостатками - существует окно уязвимости, в котором два потока, одновременно вызывающие метод compute , могут вычислить одно и то же значение. В случае использования принципа мемоизации это просто неэффективно - предназначение кэша состоит в том, чтобы предотвратить многократное вычисление одних и тех же данных. Для более универсального механизма кэширования дела обстоят намного хуже; для кэша объектов, который должен обеспечивать однократную инициализацию, эта уязвимость также представляет угрозу безопасности. public class Memoizer2 implements Computable { private final Map cache = new ConcurrentHashMap(); private final Computable c; public Memoizer2(Computable c) { this.c = c; } public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } } Листинг 5.17 Замена класса HashMap классом ConcurrentHashMap Проблема с классом Memoizer2 заключается в том, что если один поток начинает выполнение дорогостоящего вычисления, другие потоки не знают, что вычисление выполняется, и поэтому могут начать то же вычисление, как показано A B L compute f(1) U L compute f(1) U L compute f(1) U C на рисунке 5.3. Мы хотели бы как-то отметить, что “поток X в данный момент вычисляет значение f (27)”, так чтобы если другой поток прибудет для поиска значения f (27), он знал, что самый эффективный способ найти значение, это встать в очередь за потоком X, дождаться завершения выполнения X, а потом спросить “Эй, что же вы получили при вычислении значения f (27)?”. Рисунок 5.3 Два потока вычисляют одно и тоже значение, используя класс Memoizer2 Мы уже ранее встречали класс, который делает почти именно то, что мы хотим получить: FutureTask . Класс FutureTask представляет собой вычислительный процесс, который может находиться в завершенном или незавершённом состоянии. Вызов FutureTask.get немедленно возвращает результат вычисления, если он доступен; в противном случае вызов блокируется до тех пор, пока результат не будет вычислен, а затем возвращает его. Класс Memoizer3 из листинга 5.18 переопределяет экземпляр интерфейса Map, используемый для хранения кэша, как ConcurrentHashMap> вместо ConcurrentHashMap . Класс Memoizer3 сначала проверяет, было ли запущено соответствующее вычисление (в отличие от принятого по умолчанию как “завершённое”, в классе Memoizer2 ). Если это не так, он создает экземпляр класса FutureTask , регистрирует его в экземпляре Map и запускает вычисление; в противном случае он ожидает результата существующего вычисления. Результат может быть доступен немедленно или находиться в процессе вычисления - но он прозрачен для объекта, вызывающего метод Future.get Реализация класса Memoizer3 почти идеальна: она демонстрирует очень хороший параллелизм (в основном, полученный за счёт превосходной реализации параллелизма в классе ConcurrentHashMap ), результат возвращается эффективно, если он уже известен, и если вычисление выполняется другим потоком, вновь прибывающие потоки терпеливо ожидают результата. У него есть только один недостаток - все еще существует небольшое окно уязвимости, в котором два потока могли бы начать вычисление одного и того же значения. public class Memoizer3 implements Computable { private final Map> cache = new ConcurrentHashMap>(); private final Computable c; public Memoizer3(Computable c) { this.c = c; } public V compute(final A arg) throws InterruptedException { Future Callable f(1) not in cache compute f(1) add f(1) to cache f(1) not in cache compute f(1) A B add f(1) to cache public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask ft.run();// call to c.compute happens here } try { return f.get(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } } Листинг 5.18 Мемоизированная обёртка использующая класс FutureTask Это окно намного меньше, чем в классе Memoizer2 , но поскольку блок if в методе compute по прежнему является неатомарной последовательностью операций проверить-затем-выполнить, два потока могут вызвать метод compute с одинаковым значением приблизительно в одно и то же время, оба могут увидеть, что кэш не содержит желаемого значения, и оба могут начать вычисление. Пример такого неудачного момента показан на рисунке 5.4. Рисунок 5.4 Неудачный момент времени, который может складываться в классе Memoizer3 и быть причиной двойного вычисления некоторого значения Класс Memoizer3 уязвим для этой проблемы, потому что составное действие (положить-если-отсутствует) выполняется над базовым 69 экземпляром Map и не может быть сделано атомарным с помощью блокировки. Класс Memoizer из листинга 5.19 использует атомарный метод putIfAbsent интерфейса ConcurrentMap , закрывая окно уязвимости в классе Memoizer3 Кэширование экземпляра Future вместо значения создает предпосылки к загрязнению кэша (cache pollution): если вычисление отменяется или завершается неудачей, дальнейшие попытки вычислить результат также будут указывать на отмену или сбой. Чтобы избежать этого, класс Memoizer удаляет экземпляр Future из кэша, если он обнаруживает, что вычисление было отменено; также может быть желательным удалять экземпляр Future при обнаружении возникновения исключения RuntimeException , в том случае, если есть шанс, что последующая попытка вычисления будет успешной. 69 Речь идёт об экземпляре класса Map , содержащемся в классе ConcurrentHashMap f(1) not in cache put Future for f(1) in cache compute f(1) f(1) not in cache put Future for f(1) in cache A B compute f(1) set result set result Класс Memoizer также не рассматривает вопросы устаревания значений кэша (cache expiration), но это может быть реализовано с помощью подкласса FutureTask , который связывает время устаревания с каждым результатом и периодически просматривает кэш на наличие устаревших записей. (Подобным образом, не рассматривается вопрос вытеснения значений кэша, при котором старые записи удаляются, чтобы освободить место для новых, чтобы кэш не потреблял слишком много памяти.) public class Memoizer implements Computable { private final ConcurrentMap> cache = new ConcurrentHashMap>(); private final Computable c; public Memoizer(Computable c) { this.c = c; } public V compute(final A arg) throws InterruptedException { while (true) { Future Callable } }; FutureTask } try { return f.get(); } catch (CancellationException e) { cache.remove(arg, f); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } } } Листинг 5.19 Финальная реализация класса Memoizer С завершением работы над нашей параллельной реализацией кэша, мы теперь можем добавить реальное кэширование к сервлету факторизации из главы 2, как и было обещано. Класс Factorizer из листинга 5.20 использует класс Memoizer для эффективного и масштабируемого кэширования ранее вычисленных значений. @ThreadSafe public class Factorizer implements Servlet { private final Computable public BigInteger[] compute(BigInteger arg) { return factor(arg); } }; private final Computable = new Memoizer ServletResponse resp) { try { BigInteger i = extractFromRequest(req); encodeIntoResponse(resp, cache.compute(i)); } catch (InterruptedException e) { encodeError(resp, "factorization interrupted"); } } } Листинг 5.20 Сервлет факторизации, кэширующий результаты с использованием класса Memoizer Итоги части I Мы рассмотрели очень много материала! В следующей “шпаргалке по параллелизму” кратко излагаются основные понятия и правила, представленные в Части 1. • Это изменчивое состояние, тупица. 70 Все проблемы параллелизма сводятся к координации доступа к изменяемому состоянию. Чем менее доступно изменяемое состояние, тем легче обеспечить потокобезопасность. • Сделайте поля final, если они не должны быть изменяемыми. • Неизменяемые объекты автоматически потокобезопасны. Неизменяемые объекты значительно упрощают параллельное программирование. Они проще и безопаснее, и могут свободно использоваться без использования блокировки или защитного копирования. • Инкапсуляция позволяет управлять сложностью. Можно написать потокобезопасную программу со всеми данными, хранящимися в глобальных переменных, но зачем вам это? Инкапсуляция данных внутри объектов упрощает сохранение их инвариантов; инкапсуляция синхронизации внутри объектов упрощает соблюдение политики синхронизации. • Защищайте каждую изменяемую переменную с помощью блокировки. • Защищайте все переменные инварианта одной и той же блокировкой. • Удерживайте блокировки во время выполнения составных действий. • Программа, которая обращается к изменяемой переменной из нескольких потоков без использования синхронизации, является поврежденной программой. • Не полагайтесь на умные рассуждения о том, почему вам не нужно использовать синхронизацию. • Учитывайте потокобезопасность в процессе проектирования или явно задокументируйте, что класс не является потокобезопасным. • Задокументируйте вашу политику синхронизации. 70 During the 1992 U.S. presidential election, electoral strategist James Carville hung a sign in Bill Clinton’s campaign headquarters reading “The economy, stupid”, to keep the campaign on message. Часть II Структурирование параллельных приложений |