При участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на
Скачать 4.97 Mb.
|
7.1.3 Ответ на прерывание Как было отмечено ранее в разделе 5.4, когда вы вызываете прерывание блокирующих методов, таких как Thread.sleep или BlockingQueue.put , существуют две практические стратегии для обработки исключения InterruptedException : • Распространить исключение (возможно, после некоторой очистки в рамках конкретной задачи), фактически сделав ваш метод прерываемым блокирующим методом; или • Восстановить статус прерывания, чтобы код, находящийся выше по стеку вызовов, мог с ним работать. Распространение исключения InterruptedException может быть так же просто, как добавление исключения InterruptedException в выражение throws , как показано в методе getNextTask , в листинге 7.6. BlockingQueue } Листинг 7.6 Распространение исключения InterruptedException до вызывающего кода Если Вы не хотите или не можете распространять исключение InterruptedException (возможно потому, что ваша задача определена с помощью интерфейса Runnable ), вам нужно найти другой способ сохранить запрос на прерывание. Стандартным способом сделать это, является восстановление статуса прерывания путем повторного вызова метода interrupt . Чего вы не должны делать, так это проглатывать исключение InterruptedException , путём его перехвата и невыполнения никаких действий в блоке catch , если только ваш код не реализует политику прерывания для потока. Класс PrimeProducer проглатывает прерывание, но делает это со знанием того, что поток собирается завершить свою работу, и что в связи с этим нет никакого кода выше по стеку вызовов, который должен знать о прерывании. Большая часть кода не знает, в каком потоке будет выполняться, поэтому статус прерывания следует сохранять. Только код, реализующий политику прерывания потока, может проглатывать запрос на прерывание. Задачи общего назначения и библиотечный код никогда не должны проглатывать запросы на прерывание. Активности, которые не поддерживают отмену, но по-прежнему вызывают прерываемые блокирующие методы, должны вызывать их в цикле, повторяя попытку при обнаружении прерывания. В этом случае они должны локально сохранить статус прерывания и восстановить его непосредственно перед возвратом, как показано в листинге 7.7, а не сразу после перехвата исключения InterruptedException . Слишком ранняя установка статуса прерывания может привести к бесконечному циклу, поскольку большинство прерываемых блокируемых методов проверяет состояние прерывания на входе и немедленно кидает исключение InterruptedException , если статус прерывания установлен. (Прерываемые методы обычно проводят опрос на предмет установки флага прерывания перед блокировкой или выполнением какой-либо значительной работы, чтобы быть максимально отзывчивыми к прерыванию.) public Task getNextTask(BlockingQueue } catch (InterruptedException e) { interrupted = true; // fall through and retry } } } finally { if (interrupted) Thread.currentThread().interrupt(); } } Листинг 7.7 Неотменяемые задачи, восстанавливающие прерывание до своего завершения Если код не вызывает прерываемые блокирующие методы, он по-прежнему может реагировать на прерывание путем опроса статуса прерывания текущего потока по всему коду задачи. Выбор частоты опроса является компромиссом между эффективностью и отзывчивостью. При наличии высоких требований к быстродействию нельзя вызывать потенциально долговременные методы, которые сами не реагируют на прерывания, что потенциально ограничивает возможности по вызову библиотечного кода. Отмена может включать в себя состояние, отличное от состояния прерывания; прерывание может использоваться для привлечения внимания потока, а информация, хранящаяся в другом месте, может быть использована для предоставления дальнейших инструкций прерванному потоку (при получении доступа к этой информации обязательно используйте синхронизацию). Например, когда рабочий поток, принадлежащий экземпляру ThreadPoolExecutor , обнаруживает прерывание, он проверяет, завершает ли пул свою работу. Если это так, он выполняет некоторую очистку пула перед завершением; в противном случае он может создать новый поток, для восстановления численности потоков в пуле до заданных значений. 7.1.4 Пример: запуск по времени Для решения многих проблем может потребоваться вечность (например, перечисление всех простых чисел); для других ответ может быть найден достаточно быстро, но также может занять вечность. Возможность сказать “потратьте до десяти минут на поиск ответа” или “перечислите все ответы, полученные за десять минут”, может быть крайне полезна в таких ситуациях. Метод aSecondOfPrimes в листинге 7.2 запускает на выполнение экземпляр PrimeGenerator и прерывает его выполнение через секунду. Несмотря на то, что остановка экземпляра PrimeGenerator может занять несколько дольше времени, чем одна секунда, в конечном счёте, прерывание будет замечено и выполнение остановится, позволив потоку завершиться. Но другой аспект выполнения задачи заключается в том, что необходимо выяснить, бросает ли задача исключение. Если экземпляр PrimeGenerator бросит непроверяемое исключение до истечения времени ожидания, это, вероятнее всего, останется незамеченным, так как генератор простых чисел работает в отдельном потоке, в котором обработка исключений в явном виде не прописана. В листинге 7.8 показана попытка выполнения произвольного экземпляра Runnable в течение заданного промежутка времени. Метод запускает задачу в вызывающем потоке и планирует запуск отменяющей задачи, чтобы прервать ее после истечения заданного интервала времени. Это решает проблему непроверяемых исключений, которые могут быть брошены задачей, поскольку они могут быть перехвачены объектами, вызвавшими метод timedRun private static final ScheduledExecutorService cancelExec = ...; public static void timedRun(Runnable r, long timeout, TimeUnit unit) { final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable() { public void run() { taskThread.interrupt(); } }, timeout, unit); r.run(); } Листинг 7.8 Планирование прерывания в заимствованном потоке. Не делайте так Такой подход умоляюще прост, но он нарушает правила: вы должны знать о политике прерывания потока, прежде чем прервать его. Так как метод timedRun может быть вызван из произвольного потока, он не может знать о политике прерывания вызывающего потока. Если задача завершается до истечения времени ожидания, то отменяющая задача, предназначенная для прерывания работы потока в котором был вызван метод timedRun , может завершиться после возвращения управления от метода timedRun вызывающему объекту. Мы не знаем, какой код будет выполняться, когда это произойдет, но результат хорошим точно не назовёшь. (Устранить риск возникновения такой ситуации возможно, хотя и на удивление сложно, если для отмены отменяющей задачи использовать экземпляр интерфейса ScheduledFuture , возвращаемый методом schedule .) Кроме того, если задача не реагирует на прерывания, метод timedRun не вернет управление до тех пор, пока задача не завершится, что может сильно превысить указанное время ожидания (или даже не завершится вовсе). Служба, ограниченная по времени, но не возвращающая управление по истечении указанного промежутка времени, скорее всего, будет вызывать раздражение у вызвавших её пользователей. В листинге 7.9 рассматриваются проблемы обработки исключений, возникающих в методе aSecondOfPrimes , а также проблемы с предыдущей попыткой. Поток, созданный для выполнения задачи, может иметь собственную политику выполнения, и даже если задача не отвечает на прерывание, метод ограниченный по времени все равно может вернуть управление вызывающему объекту. После запуска потока задачи, метод timedRun вызывает метод join с параметром ограничения по времени у вновь созданного потока. После того, как метод join возвращает управление, проверяется, было ли из задачи брошено исключение и если было, то пробрасывает его в поток, вызвавший метод timedRun Сохраненный экземпляр Throwable совместно используется двумя потоками, и поэтому для его безопасной публикации из потока задачи в поток метода timedRun , объявляется с ключевым словом volatile public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException { class RethrowableTask implements Runnable { private volatile Throwable t; public void run() { try { r.run(); } catch (Throwable t) { this.t = t; } } void rethrow() { if (t != null) throw launderThrowable(t); } } RethrowableTask task = new RethrowableTask(); final Thread taskThread = new Thread(task); taskThread.start(); cancelExec.schedule(new Runnable() { public void run() { taskThread.interrupt(); } }, timeout, unit); taskThread.join(unit.toMillis(timeout)); task.rethrow(); } Листинг 7.9 Прерывание задачи в выделенном потоке Эта версия устраняет проблемы, описанные в предыдущих примерах, но так как она основана на использовании метода join с параметром ограничения по времени, она имеет тот же недостаток, что и при использовании простого метода join : мы не знаем, было ли управление возвращено в результате нормального завершения потока или в связи с истечением времени ожидания метода join 82 7.1.5 Выполнение отмены с помощью интерфейса Future Мы уже использовали абстракцию для управления жизненным циклом задачи, обработки исключений и облегчения операции отмены - интерфейс Future . Следуя общему принципу, заключающемуся в том, что лучше использовать существующие библиотечные классы, чем разворачивать свои собственные, давайте построим метод timedRun с использованием интерфейса Future и фреймворка выполнения задач. Вызов ExecutorService.submit возвращает экземпляр Future описывающий задачу. Интерфейс Future имеет метод cancel , принимающий аргумент mayInterruptIfRunning типа boolean , и возвращает значение, указывающее, была ли попытка отмены операции успешной. (Результат выполнения метода расскажет вам только о том, удалось ли выполнить прерывание, а не о том, была ли обнаружена задача, и затронуло ли её прерывание.) Если параметр mayInterruptIfRunning имеет значение true и задача в данный момент выполняется в некотором потоке, то этот поток прерывается. Установка этого параметра в false означает “не выполнять эту задачу, если она еще не запущена” и должна применяться к задачам, которые изначально не были спроектированы для обработки прерывания. Поскольку вы не должны прерывать поток, если не знаете о его политике прерывания, каким образом приемлемо вызвать метод cancel с аргументом true ? Потоки, выполняющие задачи, созданные стандартными реализациями интерфейса Executor , реализуют политику прерывания, которая позволяет задачам быть отмененными с помощью прерывания, таким образом, вполне безопасно установить значение параметру mayInterruptIfRunning при отмене задач через их экземпляры Future , когда они выполняются стандартной реализацией интерфейса Executor . При попытке отменить задачу, вы не должны напрямую прерывать поток, находящийся в пуле, так как вы не знаете, какая задача будет выполняться в момент доставки запроса на прерывание - делайте это только через связанный с задачей экземпляр Future . Это еще одна причина для кодирования задач, рассматривающих прерывание как запрос на отмену: в таком случае они могут быть отменены через их экземпляры Future 82 Это является недостатком API потоков, потому что независимо от того, завершается ли вызов метода join успешно или нет, согласно модели памяти Java для видимости памяти наступают последствия, но метод join всё равно не возвращает статус успешности завершения выполнения. В листинге 7.10 показана версия метода timedRun , которая отправляет задачу экземпляру ExecutorService и получает результат с помощью ограниченной по времени версии Future.get . Если вызов метода get завершается возбуждением исключения TimeoutException , задача отменяется с помощью собственного экземпляра Future . (В целях упрощения кодирования, приведённая версия безусловно вызывает метод Future.cancel в блоке finally , пользуясь тем преимуществом, что отмена завершенной задачи не оказывает никакого влияния.) Если базовое вычисление 83 бросает исключение до момента отмены, оно пробрасывается дальше из метода timedRun , что позволяет вызывающему объекту иметь дело с исключением наиболее удобным способом. В листинге 7.10 также иллюстрируется другая хорошая практика: отмена задач, результат выполнения которых больше не нужен. public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // task will be cancelled below } catch (ExecutionException e) { // exception thrown in task; rethrow throw launderThrowable(e.getCause()); } finally { // Harmless if task already completed task.cancel(true);// interrupt if running } } Листинг 7.10 Отмена задачи с использованием Future Когда вызов Future.get бросает исключение InterruptedException или TimeoutException, и вы знаете, что результат программе больше не нужен, отменяйте задачу с помощью вызова метода Future.cancel. 7.1.6 Работа с непрерываемыми блокировками Многие блокирующие библиотечные методы реагируют на прерывание, возвращая управление как можно раньше и бросая исключение InterruptedException , что упрощает построение задач, реагирующих на отмену. Однако не все блокирующие методы или механизмы блокировки реагируют на прерывание; если поток блокируется при выполнении операций синхронного ввода/вывода сокета или ожидает захвата встроенной блокировки, прерывание не окажет никакого эффекта, кроме установки статуса прерывания потока. Иногда мы можем убедить потоки, заблокированные в непрерываемых действиях, чтобы они остановились, с помощью средств, аналогичных прерыванию, но это требует большего понимания того, по какой причине поток был заблокирован. 83 Код, выполняемый в экземпляре Runnable Синхронные операции ввода/вывода сокета в java.io . Распространенной формой блокировки ввода/вывода в серверных приложениях является чтение или запись в сокет. К сожалению, методы read и write в классах InputStream и OutputStream не реагируют на прерывание, но закрытие лежащего в основе сокета приводит к тому, что любые потоки, заблокированные в методах read или write , бросают исключение SocketException Синхронные операции ввода/вывода в java.nio . Прерывание потока ожидающего канал InterruptibleChannel приводит к возбуждению исключения ClosedByInterruptException и закрытию канала (а также вынуждает все прочие потоки, заблокированные на этом канале бросить исключение ClosedByInterruptException ). Закрытие экземпляра InterruptibleChannel AsynchronousCloseException заставляет потоки, заблокированные на канальных операциях, бросать исключение AsynchronousCloseException . Большая часть стандартных экземпляров Channel реализует интерфейс InterruptibleChannel Асинхронные операции ввода/вывода c классом Selector . Если поток заблокирован в вызове метода Selector.select (в языке java.nio.channels ), вызов методов close или wakeup приводит к преждевременному возврату. Захват блокировки. Если поток блокируется в ожидании встроенной блокировки, вы ничего не можете сделать для того, чтобы вскоре остановить его, за исключением того, что вы могли бы привлечь его внимание каким-то другим способом, когда он, в конечном итоге, захватит блокировку и достигнет достаточного прогресса в выполнении задачи. Однако явно определённые классы Lock предлагают метод lockInterruptibly , который позволяет ожидать захвата блокировки и, при этом, оставаться отзывчивым к прерываниям - см. главу 13 В классе ReaderThread из листинга 7.11, демонстрируется метод инкапсуляции нестандартной отмены. Класс ReaderThread управляет одиночным соединением сокета, осуществляя синхронное чтение из сокета и передавая все полученные данные методу processBuffer . Для упрощения разрыва пользовательского соединения или завершения работы сервера, класс ReaderThread переопределяет метод interrupt , используя его как для доставки стандартного прерывания, так и для закрытия нижележащего сокета; таким образом, прерывание экземпляра ReaderThread приводит к остановке того, что он делает, независимо от того, заблокирован ли он в вызове метода read или в прерываемом блокирующем методе public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } public void interrupt() { try { socket.close(); } catch (IOException ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while (true) { int count = in.read(buf); if (count < 0) break; else if (count > 0) processBuffer(buf, count); } } catch (IOException e) { / * Allow thread to exit * / } } } Листинг 7.11 Инкапсулирование нестандартной отмены в потоке путем переопределения метода interrupt 7.1.7 Инкапсуляция нестандартной отмены с помощью метода newTaskFor Подход, используемый классом ReaderThread для инкапсуляции нестандартной отмены, может быть усовершенствован с помощью метода-ловушки newTaskFor , добавленного классу ThreadPoolExecutor в Java 6. При отправке экземпляра Callable службе ExecutorService , метод submit возвращает экземпляр Future , который можно использовать для отмены задачи. Метод-ловушка newTaskFor является фабричным методом, который создает экземпляр Future , представляющий задачу. Он возвращает экземпляр RunnableFuture - интерфейс, расширяющий интерфейсы Future и Runnable (и реализуемый классом FutureTask ). Настройка экземпляра Future , связанного с задачей, позволяет переопределить метод Future.cancel . Пользовательский код отмены может выполнять ведение журнала или собирать статистику по отменам, а также может использоваться для отмены действий, которые не реагируют на прерывание. Класс ReaderThread инкапсулирует отмену работы с сокетом – используя потоки переопределяет метод interrupt ; то же самое может быть сделано для задач, путём переопределения метода Future.cancel В листинге 7.12, определяется интерфейс CancellableTask , расширяющий интерфейс Callable и добавляются методы cancel и фабричный метод newTask для построения экземпляра RunnableFuture . Класс CancellingExecutor расширяет класс ThreadPoolExecutor и переопределяет метод newTaskFor , чтобы позволить интерфейсу CancellableTask создать свой собственный экземпляр Future public interface CancellableTask RunnableFuture } @ThreadSafe public class CancellingExecutor extends ThreadPoolExecutor { protected } } public abstract class SocketUsingTask @GuardedBy("this") private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } public RunnableFuture SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } } |