Главная страница

Java. Полное руководство. 8-е издание. С. Н. Тригуб Перевод с английского и редакция


Скачать 25.04 Mb.
НазваниеС. Н. Тригуб Перевод с английского и редакция
АнкорJava. Полное руководство. 8-е издание.pdf
Дата28.02.2017
Размер25.04 Mb.
Формат файлаpdf
Имя файлаJava. Полное руководство. 8-е издание.pdf
ТипДокументы
#3236
страница81 из 90
1   ...   77   78   79   80   81   82   83   84   ...   90
void l o c k ()
void l o c k l n t e r r u p t i b l y ()
throws InterruptedException
Condition n e w C o n d i t i o n ()
boolean tryLock()
boolean tryLock(long ожидать
tu)
throws
InterruptedException
void u n l o c k ()

8 9 8 Часть II. Библиотека Пакет j a v a . u t i l . c o n c u r r e n t . l o c k s содержит реализацию интерфейсов
Lock и R e e n tra n tL o c k , которые реализуют реентерабельную блокировку, представляющую собой блокировку, в которую поток, удерживающий на данный момент эту блокировку, может входить повторно. (Естественно, если поток входит в блокировку повторно, все вызовы метода l q c k () должны быть смещены на равное количество вызовов метода u n lo c k ().) В противном случае поток, пытающийся запросить блокировку, перейдет в режим ожидания до тех пор, пока она не будет освобождена.
В следующей программе демонстрируется пример использования блокировок. В ней создается два потока, которые обращаются к общему ресурсу, переменной
S h a r e d , c o u n t. Прежде чем поток сможет обратиться к переменной S h a r e d , c o u n t, он должен получить блокировку. После получения блокировки значение переменной S h a r e d , c o u n t увеличивается, после чего, не снимая блокировки, поток входит в режим простоя. Вследствие этого другой поток будет пытаться получить блокировку. Однако поскольку блокировка все еще удерживается первым потоком, другой поток будет ожидать до тех пор, пока первый поток не выйдет из режима простоя и не снимет блокировку. Результаты выполнения показывают, что доступ к переменной S h a r e d , c o u n t синхронизируется с помощью блокировки Простой пример блокировки java.util.concurrent.locks.*;

class LockDemo {
public static void main(String a r g s []) {
ReentrantLock lock = new ReentrantLock();
new LockThread(lock, "A");
new LockThread(lock, "B");
}
}
// Общий ресурс Shared {
static int count = 0;
}
// Поток выполнения, увеличивающий значение счета
class LockThread implements Runnable {
String name;
ReentrantLock lock;
LockThread(ReentrantLock lk, String n) {
lock = lk;
name = n ;
new Thread(this).start();
}
public void r u n () Запуск " + name);
try {
// Сначала блокируется счетчик + " ожидает блокирования счетчика
lock.l o c k ();
System.out.println(name + " блокирует счетчик
+
+;
System.out.printlji (name + ": " + Shared.count);
// Теперь, если это возможно, разрешается контекстное
Глава 27. Параллельные утилиты
8 9 9
// переключение + " простаивает catch (InterruptedException exc) {
System.out.println(exc);
} finally {
// Снятие блокировки + " разблокирует счетчик
lock.u n l o c k (Ниже показаны результаты выполнения. (У вас порядок выполнения потоков может быть другим.)
Запуск А
А ожидает блокирования счетчика.
А блокирует счетчик.
А: А простаивает.
Запуск В
В ожидает блокирования счетчика.
А разблокирует счетчик.
В блокирует счетчик.
В: В простаивает.
В разблокирует счетчик.
Пакет j ava.
util.
concurrent.
locks также реализует интерфейс
Read-
Wri teLock, определяющий реентерабельную блокировку, которая поддерживает отдельные блокировки для доступа на чтение и запись. Это позволит предоставлять читателям ресурса несколько блокировок до его записи. Класс rant
ReadWri teLock предлагает реализацию интерфейса
ReadWri Атомарные операции

Пакет j ava.
ut il.
concurrent.
atomic предлагает альтернативный вариант другим функциональным возможностям синхронизации при чтении или записи значения некоторых типов переменных. В этом пакете доступны методы, которые получают, задают или сравнивают значение переменной вовремя одной непрерывной (те. атомарной) операции. А это значит, что теперь не будет нужна ни блокировка, ни любой другой механизм синхронизации.
Атомарные операции выполняются с помощью классов
Atomic
Integer nAtomicLomg, а также методов get
( )
,
set
( )
,
compareAndSet
( )
,
decrementAnd-
Get
() и getAndSet
( ), которые реализуют действия, соответствующие их именам.
Ниже показан пример, показывающий, как можно синхронизировать доступ к общему ресурсу с помощью класса
Atomic
Integer.
// Простой пример атомарных операций j ava.util.concurrent.atomic.*;
class AtomicDemo {
public static void main(String a r g s []) {
new AtomThread("A " );
new AtomThread("B");
new С

9 0 Часть II. Библиотека Java
}
class Shared {
static Atomiclnteger ai = new Atomiclnteger(0);
}
// Поток выполнения, при котором увеличивается значение счета
class AtomThread implements Runnable {
String name;
AtomThread(String n) {
name = n ;
new T h read(this).start();
}
public void run() Запуск " + name);
for(int i=l; i <= 3; i
+
+)
System.out.println(name + " получено " +
Shared.a i .getAndSet(i В этой программе при помощи класса
S h a r e d создается статический объект a i класса
A t o m i c l n t e g e r . Затем создаются три потока класса
A to m T h read . В методе r u n (
) объект
S h a r e d , a i изменяется вызовом метода g e tA n d S e t ( )
. Этот метод возвращает предыдущее значение и устанавливает то значение, которое было передано в качестве параметра. Благодаря классу
A t o m i c l n t e g e r исключается вероятность того, что два потока будут одновременно осуществлять запись в объект a i В общем случае атомарные операции предлагают удобную (а, возможно, ибо лее эффективную) альтернативу другим механизмам синхронизации при работе с одной переменной.
Параллельное программирование при помощи инфраструктуры Fork/Join Fram ew В последние годы появилась новая важная тенденция в разработке программного обеспечения — параллельное программирование (parallel program ming). Параллельное программирование — это общее название технологий, которые используют в своих интересах многоядерные процессоры, содержащие два или больше ядер. Как известно большинству читателей, ныне многоядерные компьютеры — вполне обычное явление. Преимуществом многопроцессорных систем является возможность значительного увеличения производительности программы. В результате возросла потребность в механизме, который предоставит программистам Java простой, но эффективный способ использования нескольких процессоров ясными масштабируемым способом. В ответ на эту потребность в комплект
JDK 7 было добавлено несколько новых классов и интерфейсов поддержки параллельного программирования. Обычно они упоминаются как инфраструктура
F o rk /Jo in Framework. Это одно из наиболее важных добавлений, внесенных комплектом JDK 7 в библиотеку классов Java. Инфраструктура F o rk /Jo in Framework определена в пакете j a v a . u t i l . c o n c u r r e n t Инфраструктура F ork/Join Framework улучшает многопоточное программирование двумя важными способами. Во-первых, она упрощает создание и использование нескольких потоков. Во-вторых, это автоматизирует использование несколь­
}
Глава 27. Параллельные утилиты
9 0 1
ких процессоров. Другими словами, при использовании инфраструктуры F ork/Join
Framework вы позволяете вашим приложениям автоматически масштабировать количество доступных для использования процессоров. Эти две возможности позволяют считать инфраструктуру Fork/Join Framework рекомендованным подходом многопоточного программирования, когда желательна параллельная обработка.
Прежде чем продолжить, следует указать на различие между традиционным мно­
гопоточным и параллельным программированием. В прошлом большинство компьютеров имело один процессор, и многопоточность применялась, прежде всего, для использования периодов ожидания, таких как при ожидании программой пользовательского ввода. При этом подходе один поток может выполняться, в то время как другой ожидает. Другими словами, в системе с одним процессором многопоточ­
ность используется для того, чтобы позволить двум или более задачам совместно использовать процессор. Этот тип многопоточности, как правило, поддерживается объектом класса
T h r e a d как описано в главе 11). Хотя этот тип многопоточности будет оставаться весьма полезным всегда, он не был оптимизирован для ситуаций, когда у вас есть два или больше процессоров (многоядерный компьютер).
Когда доступно несколько процессоров, необходим другой тип многопоточно­
сти, который обеспечивает истинное параллельное выполнение. С двумя или больше процессорами можно выполнять части программы одновременно — каждая часть выполняется на собственном процессоре. Это применяется для значительного ускорения выполнения операций некоторых типов, таких как сортировка, преобразование или поиск в большом массиве. Во многих случаях такие операции могут быть разделены на меньшие части (каждая из них воздействует на часть массива, и каждая часть может быть запущена на собственном процессоре. Как можно догадаться, выигрыш в эффективности может быть огромным. Просто представьте в будущем каждый программист будет применять параллельное программирование, поскольку это открывает путь к существенному повышению производительности программы.
Основные классы инфраструктуры
Fork/Join Fram ew Инфраструктура F o rk /Jo in Framework расположена в пакете j a v a . u t i 1 . c o n ­
c u r r e n t . Ее ядро составляют следующие классы n T a s k Абстрактный класс, определяющий задачу
ForkJoinPool
Управляет выполнением объекта класса Производный от класса
ForkJoinTask< класс для задач,
которые не возвращают значений Производный от класса
ForkJoinTask< класс для задач,
возвращающих значения
Рассмотрим отношения между ними. Класс
F o r k J o i n P o o l управляет выполнением объекта класса
F o r k J o i n T a s k . Класс
F o r k J o i n T a s k
— это абстрактный класс, который расширяется двумя другими абстрактными классами
R e c u r s i v e A c t i o n и
R e c u r s iv e T a s k . Как правило, ваш код расширяет эти классы, чтобы создать задачу. Прежде чем перейти к подробному рассмотрению процесса, сделаем краткий обзор ключевых аспектов каждого класса.
Класс F o r k J o in T a s k < v Класс
F o rk J o in T a s k < V > является абстрактными определяет задачу, которой может управлять объект класса
F o r k J o i n P o o l . Параметр типа У опреде­

9 0 Часть II. Библиотека Java
ляет тип результата задачи. Класс
ForkJoinTask отличается от класса
Thread тем, что представляет облегченную абстракцию задачи, а не поток выполнения. Класс
ForkJoinTask выполняется потоками, управляемыми пулом потока класса
ForkJoinPool. Этот механизм позволяет управлять большим количеством задач небольшим количеством фактических потоков. Таким образом, класс
ForkJoinTask весьма эффективен, по сравнению с потоками.
В классе
ForkJoinTask определено много методов. Основные) и join ()
, представлены ниже ForkJoinTask<1/> fork()

final
V
j o i n (Метод fork () передает вызывающую задачу для асинхронного выполнения. Это значит, что поток, который вызывает метод fork ()
, продолжает выполняться. После того как задача запланирована для выполнения, метод fork () возвращает this. Это может быть сделано только внутри вычислительной части другого объекта класса
ForkJoinTask, который выполняется в пределах объекта класса
ForkJoinPool. Вскоре вы узнаете, как это делается) Метод j oin
() ожидает завершения задачи, для которой он вызван. Возвращается результат задачи. Таким образом, с помощью методов fork() ивы можете запустить одну или несколько новых задача затем ждать их завершения.
Другой важный метод класса
ForkJoinTask
— это метод invoke ()
. Он объединяет операции ветвления и объединения в единый вызов, поскольку запускает задачу, а затем ждет ее завершения. Это продемонстрировано здесь V Возвращается результат вызывающей задачи.
При помощи метода invokeAll
() вы можете вызвать несколько задач за один раз. Две его формы представлены здесь void invokeAll(ForkJoinTask

задачаА,
ForkJoinTask
задачаВ)
static void invokeAll(ForkJoinTask ...
списокЗадач)
В первом случае выполняются задачи задача Аи задача В
, во втором случае — все определенные задачи. В обоих случаях вызывающий поток ожидает завершения всех определенных задач. Метод invokeAll
() может быть вызван только внутри вычислительной части другого объекта класса
ForkJoinTask, который выполняется в пределах объекта класса Класс Rec Этот класс происходит от класса
ForkJoinTask и инкапсулирует задачу, которая не возвращает результат. Как правило, ваш код расширяет класс
Recurs iveAct ion, чтобы создать задачу, типом возвращаемого значения которого является void. В классе
Re curs iveAct ion определено четыре метода, но только один обычно представляет интерес — абстрактный метод по имени compute (). Когда вы расширите класс
Recurs iveAct ion, чтобы создать реальный класс, помещаете код, который определяет задачу, в метод compute
(). Метод compute
() представляет вычислительную (com putational) часть задачи.
Класс
Recurs iveAct ion определяет метод compute
() так abstract void Обратите внимание на то, что метод compute
() защищенный. Это означает, что он может быть вызван только другими методами данного класса или класса, производного от него. Кроме того, поскольку метод абстрактный, его следует реализовать в производном классе (если этот производный класс тоже не абстрактный
Глава 27. Параллельные утилиты 0 Как правило, класс
Recurs
iveAct
ion используется для реализации рекурсивной стратегии для задач, которые не возвращают результатов. (См. раздел Стратегия разделяй и властвуй далее в этой главе.)
Класс Еще одним производным от класса
ForkJoinTask является класс
RecursiveTask<
V>. Данный класс инкапсулирует задачу, которая возвращает результат. Тип результата определяет параметр У. Как правило, ваш код расширит класс
RecursiveTask< У, чтобы создать задачу, которая возвращает значение. Как ив классе
Recurs
iveAct
ion, здесь определено четыре метода, но обычно используется только абстрактный метод
compute ()
, который представляет вычислительную часть задачи. Когда вы расширяете класс
RecursiveTask для создания реального класса, в метод
compute
() помещают код, который представляет задачу. Этот код также должен возвратить результат задачи.
Класс
RecursiveTaskc
V> определяет метод
compute () так abstract
V
compute О
Обратите внимание на то, что метод
compute
() защищенный. Это означает, что он может быть вызван только другими методами данного класса или класса, производного от него. Кроме того, поскольку метод абстрактный, его следует реализовать в производном классе. Будучи реализованным, он должен возвращать результат задачи.
Как правило, класс
RecursiveTask используется при реализации рекурсивной стратегии для задач, которые возвращают результат. (См. раздел Стратегия разделяй и властвуй далее в этой главе.)
Класс Выполнение объекта класса
ForkJoinTask происходит в пределах объекта класса
ForkJoinPool, который управляет также выполнением задач. Поэтому, чтобы запустить объект класса
ForkJoinTask, сначала необходим объект класса
ForkJoinPool.
В классе
ForkJoinPool определено несколько конструкторов. Вот два наиболее популярных уровень Пара л л Первый создает стандартный пул, обеспечивающий уровень параллелизма, равный количеству процессоров, доступных в системе. Второй позволяет задать уровень параллелизма. Его значение должно быть больше нуля и не больше предела для реализации. Уровень параллелизма определяет количество потоков, которые могут выполняться одновременно. В результате уровень параллелизма фактически определяет количество задач, которые могут выполняться одновременно. (Конечно, количество одновременно выполняемых задач не может превышать количество процессоров) Следует однако уяснить, что уровень параллелизма не ограничивает количество задач, которыми может управлять пул. Объект класса
ForkJoinPool может управлять существенно большим количеством задач, чем его уровень параллелизма. Кроме того, уровень параллелизма — это только цель, а не гарантия.
После того как вы создали экземпляр класса
ForkJoinPool, можете запустить задачу многими способами. Задачу, запущенную первой, обычно называют основной. Эта задача нередко запускает подчиненные задачи, которыми также управляет пул. Наиболее распространенный способ запуска основной задачи подразумевает вызов метода
invoke
() класса
ForkJoinPool, как показано далее.
<Т> Т invoke(ForkJoinTask задача Часть II. Библиотека Этот метод запускает задачу, определенную параметром задача, и возвращает результат ее выполнения. Это значит, что вызывающий код ожидает завершения метода invoke (Чтобы запустить задачу и не ждать ее завершения, вы можете использовать метод execute ()
. Вот одна из его форм execute(ForkJoinTask

задача)
В данном случае задача запускается, но вызывающий код не ждет ее завершения. Вместо этого вызывающий код продолжает выполнение асинхронно.
Класс
ForkJoinPool управляет выполнением своих потоков, используя подход под названием захват задачи (work-stealing). Каждый рабочий поток поддерживает очередь задач. Если очередь одного рабочего потока окажется пуста, то она возьмет задачу от другого рабочего потока. Это способствует повышению общей производительности и помогает поддерживать баланс нагрузки. (Из-за запросов процессорного времени другими процессами в системе, даже два рабочих потока с одинаковыми задачами в их очередях не могут завершиться одновременно.)
Еще один момент класс
ForkJoinPool использует потоки-демоны (daem on thread). Поток-демон автоматически заканчивается, когда заканчиваются все пользовательские потоки. Таким образом, нет никакой необходимости явно завершить работу объекта класса
ForkJoinPool. Однако это можно сделать при вызове метода shutdown (Стратегия разделяй и властвуй"

Как правило, инфраструктура F o rk /Jo in Framework применяет стратегию разделяй и властвуй , лежащую в основе рекурсии. Вот почему два класса, производных от класса
ForkJoinTask, называются
Recurs iveAct ion и
Recurs iveTask. Ожидается, что вы расширите один из этих классов при создании собственной задачи ветвления/объединения.
Стратегия разделяй и властвуй, лежащая в основе рекурсии, подразумевает разделение задачи на подзадачи, пока их размер не станет достаточно маленьким для последовательной обработки. Например, задача, которая применяет преобразование к каждому из N элементов в массиве целых чисел, может быть разделена на две подзадачи, каждая из которых преобразует половину элементов в массиве. Таким образом, одна подзадача преобразует элементы от 0 до N / 2 , а другая — элементы от N /2 до N. В свою очередь, каждая подзадача может быть сведена к набору подзадач, каждая из которых преобразует половину остальных элементов. Этот процесс деления массива продолжится до тех пор, пока не будет достигнуто пороговое значение, при котором последовательное решение оказывается быстрее, чем создание следующего разделения.
Преимущество стратегии разделяй и властвуй заключается в том, что обработка может осуществляться параллельно. Поэтому, вместо того чтобы циклически перебирать весь массив, используя один поток, части массива могут быть обработаны одновременно. Конечно, подход разделяй и властвуй работает во многих ситуациях и без массива (или коллекции, но наиболее распространенная область применения подразумевает некоторый тип массива, коллекции или группы данных.
Одним из ключевых моментов наилучшего использования стратегии разделяй и властвуй является правильное определение порогового значения, после которого используется последовательная обработка (а не дальнейшее деление. Как правило, оптимальное пороговое значение получается при профилировании характеристик выполнения. Однако даже при использовании порогового значения меньше оптимального, все равно произойдет весьма существенное ускоре­
Глава 27. Параллельные утилиты
9 0 5
ние. Однако лучше избегать чрезмерно больших или маленьких пороговых значений. На момент написания этой книги документация по API Java для метода
ForkJoinTask утверждает, что эмпирически выведено правило, согласно которому задача должна выполняться где-то за 100-10 ООО этапов вычисления.
Важно также понимать, что на оптимальное пороговое значение влияет также период времени, занимаемый вычислением. Если каждый этап вычисления достаточно продолжителен, то предпочтительнее меньшие пороговые значения, и, наоборот, если каждый вычислительный этап очень короткий, то большие пороговые значения могут обеспечить лучшие результаты. Для приложений, которые должны быть запущены на известной системе с известным количеством процессоров, вы можете использовать информацию о количестве процессоров для обоснования решения о пороговом значении. Но для приложений, которые будут выполняться на множестве систем, возможности которых неизвестны заранее, вы не можете сделать предположение о среде выполнения.
Еще один момент хотя в системе может быть доступно несколько процессоров, другие задачи (и сама операционная система) будут конкурировать с вашим приложением за процессорное время. Таким образом, не стоит полагать, что у вашей программы будет неограниченный доступ ко всем процессорам. Кроме того, разные процессы той же программы могут показать разные характеристики времени выполнения из-за различий в нагрузке.
Первый простой пример ветвления/объединения
Теперь рассмотрим простой пример, демонстрирующий инфраструктуру F o rk /
Join Framework и стратегию разделяй и властвуй в действии. Вот программа, которая преобразует элементы массива типа double в их квадратные корни. Для этого используется производный класс
Recurs iveAct ion.
// Простой пример фундаментальной стратегии "разделяй и властвуй В данном случае используется класс RecursiveAction.
import java.util.concurrent.*;
import java.util.*;
// Класс ForkJoinTask (через RecursiveAction) преобразует элементы
// массива double в их квадратные корни
class SqrtTransform extends RecursiveAction {
// В этом примере пороговое значение произвольно устанавливается
// равным 1 ООО. В реальном коде его оптимальное значение может
// быть определено при профилировании и экспериментально
final int seqThreshold = 1000;
// Обрабатываемый массив
doublet] data;
// Определяет часть обрабатываемых данных
int start, end;
SqrtTransform(double[] vals, int s, int e )
{
data = vals;
start = s;
end = e;
}
// Этот метод осуществляет параллельное вычисление
protected void compute() {
// Если количество элементов ниже порогового значения, то

9 0 Часть II. Библиотека Java
// выполнять обработку последовательно
if((end - start) < seqThreshold) {
// Преобразовать каждый элемент в его квадратный корень
for(int i = start; i < end; i++) {
data[i] = M a t h .sqrt(data[i ]);
}
}
else {
// В противном случае продолжить разделение данных на
// меньшие части Найти середину middle = (start + end) / 2;
// Запустить новые задачи, используя разделенные на
// части данные SqrtTransform(data,
start, middle),
new SqrtTransform(data,
middle, end));
}
}
}
// Демонстрация параллельного выполнения
class ForkJoinDemo {
public static void main(String a r g s []) {
// Создать пул задач fjp = new ForkJoinPool();
doublet] nums = new d o uble[100000];
// Присвоить некие значения
for(int i = 0; i < nums.length; i++)
nums[i] = (double) i;
System.out.println("A portion of the original sequence:");
for(int i=0; l < 10; i++)
System.out.print(nums[l] + " ");
System.out.println("\n");
SqrtTransform task = new SqrtTransform(nums,
0, n u m s .length);
// Запустить главный ForkJoinTask.
fjp.invoke(task);
System.out.printIn("A portion of the transformed sequence" +
" (to four decimal places):");
for(int i=0; i < 10; i++)
System.out.format("%.4f ", Вот вывод программы portion of the original sequence:
0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0
A portion of the transformed sequence (to four decimal places):
0.0000 1.0000 1.4142 1.7321 2.0000 2.2361 2.4495 2.6458 2.8284 3.0000
Глава 27. Параллельные утилиты
9 0 Как можно заметить, значения элементов массива были преобразованы в их квадратные корни.
Давайте внимательнее рассмотрим работу этой программы. В первую очередь обратите внимание на то, что класс
SqrtTransf orm расширяет класс
RecursiveAction. Как упоминалось, класс
RecursiveAction расширяет класс
ForkJoinTask для задач, которые не возвращают результатов. Затем обратите внимание на финальную переменную seqThreshold. Ее значение определяет, когда будет иметь место последовательная обработка. Это значение устанавливается (несколько произвольно) равным 1 ООО. Далее обратите внимание на то, что ссылка на обрабатываемый массив сохраняется в переменной data и что поля start и end используются для указания границ элементов для обращения.
Основное действие программы осуществляется в методе compute ()
. Все начинается с проверки количества подлежащих обработке элементов на предмет соответствия нижнему пороговому значению для последовательной обработки. Если это так, то эти элементы обрабатываются (в данном примере вычисляется их квадратный корень. Если пороговое значение для последовательной обработки не достигнуто, то вызов метода invokeAll
() запускает две новые задачи. В данном случае каждая подзадача обрабатывает половину элементов. Как объяснялось ранее, метод invokeAll
() ожидает завершения обеих задач. После завершения всех рекурсивных вызовов каждый элемент в массиве будет изменен, причем большая частью действий осуществляется параллельно (если доступно несколько процессоров).
Влияние уровня параллелизма
Прежде чем завершить тему, важно рассмотреть влияние уровня параллелизма на эффективность выполнения задачи ветвления/объединения, а также взаимосвязь между параллелизмом и пороговым значением. Программа, приведенная в этом разделе, позволяет вам экспериментировать с различными уровнями параллелизма и пороговыми значениями. С учетом использования многоядерного компьютера, вы сможете в интерактивном режиме наблюдать результат изменения этих значений.
В предыдущем примере, поскольку там использовался стандартный конструктор класса
ForkJoinPool, по умолчанию был задан уровень параллелизма, равный количеству процессоров в системе. Однако вы можете задать уровень параллелизма по своему желанию. Один из продемонстрированных ранее способов подразумевает его определение при создании объекта класса
ForkJoinPool с использованием следующего конструктора

уровеньПаралл)
Здесь параметру ровен ь Пара л л определяет уровень параллелизма. Его значение должно быть больше нуля, но меньше предела, определенного реализацией.
Следующая программа создает задачу ветвления/объединения, которая преобразует массив типа double. Преобразование произвольно, но оно задумано так, чтобы использовалось несколько циклов процессора. Это сделано для большей наглядности отображения эффекта от изменения порогового значения или уровня параллелизма. При использовании программы укажите пороговое значение и уровень параллелизма в командной строке. Затем программа запускает задачи. Она отображает также период времени, занятый выполнением задачи. Для этого используется метод
System. nanoTime
(), возвращающий значение высокоточного таймера JVM.
// Пример программы, позволяющей экспериментировать с эффектом от
// изменения порогового значения и параллелизма класса ForkJoinTask.
import java.util.concurrent.*;

9 0 Часть II. Библиотека Java
// Класс ForkJoinTask (через RecursiveAction) преобразует элементы массива double.
class Transform extends RecursiveAction {
// Порог последовательного выполнения, устанавливаемый конструктором
int seqThreshold;
// Обрабатываемый массив
doublet] data;
// Определяет часть обрабатываемых данных
int start, end;
Transform(double[] vals, int s, int e, int t )
{
data = vals;
start = s;
end = e;
seqThreshold = t;
}
// Этот метод осуществляет параллельное вычисление
protected void compute() {
// Если количество элементов ниже порогового значения, то
// выполнять обработку последовательно
if((end - start) < seqThreshold) {
// Следующий код присваивает элементу почетному индексу
// квадратный корень его первоначального значения. Элементу
/ / по нечетному индексу присваивается кубический корень его
// первоначального значения Этот код предназначен только для использования
// процессорного времени, чтобы эффект от параллельного
// выполнения был нагляднее i = start; i < end; i
+
+) {
i f ((data[i ]
% 2) == 0)
data[i] = M a t h .sqrt(data[i ]);
else
data[i] = Math.cbrt(data[i]);
}
}
else {
I I В противном случае продолжить разделение данных на
// меньшие части Найти середину middle = (start + end) / 2;
// Запустить новые задачи, используя разделенные на
// части данные Transform(data,
start, middle, seqThreshold),
new Transform(data,
middle, end, seqThreshold));
}
}
}
// Демонстрация параллельного выполнения
class FJExperiment {
public static void main(String a r g s []) {
int pLevel;
int threshold;
Глава 27. Параллельные утилиты
9 0 9
if(args.length != 2) {
System.out.println("Usage:
FJExperiment parallelism
threshold ");
return;
}
pLevel = Integer.parselnt(args[0]);
threshold = Integer.parselnt(args[1]);
// Эти переменные используются для замера времени задачи
long beginT, endT;
// Создать пул задач Обратите внимание на установку уровня параллелизма fjp = new ForkJoinPool(pLevel);
doublet] nums = new double[1000000];
for(int i = 0; i < nums.length; i
+
+)
nums[i] = (double) i;
Transform task = new Transform(nums,
0, nums.length, threshold);
// Старт хронометража = System.nanoTime();
// Запустить главный ForkJoinTask.
fjp.invoke(task);
// Конец хронометража = System.nanoTime();
System.out.println("Level of parallelism: " + pLevel);
System.out.printIn("Sequential threshold: " + threshold);
System.out.println("Elapsed time: " + (endT - beginT) + " Чтобы использовать программу, укажите уровень параллелизма, а затем пороговое значение. Вы можете экспериментально опробовать различные значения для каждого параметра и понаблюдать за результатами. Помните, вы должны запускать код на компьютере, по крайней мере, с двумя процессорами. Кроме того, выполнение на двух разных компьютерах почти наверняка приведет к разным результатам, из-за наличия в системе других процессов, использующих процессорное время.
Чтобы получить общее представление о значении, которое имеет параллелизм, проделайте такой эксперимент. Сначала запустите программу так.
java FJExperiment 1 Это запросит уровень параллелизма 1 (совершенно последовательное выполнение) при пороговом значении 1000. Вот пример выполнения, полученный на двухъядерном компьютере of parallelism: 1
Sequential threshold: 1000
Elapsed time: 259677487 Теперь укажите уровень параллелизма 2 так.
java FJExperiment 2 1000

9 1 0 Часть II. Библиотека Вот пример вывода, полученный на том же двухъядерном компьютере of parallelism: 2
Sequential threshold: 1000
Elapsed time: 169254472 Вполне очевидно, что применение параллелизма существенно уменьшает время выполнения, увеличивая таким образом скорость программы. Поэкспериментируйте с изменением порогового значения и параллелизма на собственном компьютере. Результаты могут удивить вас.
Есть еще два метода, которые вы могли бы найти полезными при экспериментировании с характеристиками выполнения программы ветвления/объединения.
Во-первых, вы можете получить уровень параллелизма при вызове метода get-
Parallelism ()
, определенного в классе
ForkJoinPool. Вот его форма,
int Он возвращает текущий уровень параллелизма. Напомню, что по умолчанию он будет равен количеству доступных процессоров. Во-вторых, вы можете получить количество процессоров, доступных в системе, при вызове метода avail
- ableProcessors ()
, который определяется классом
Runtime. Вот его форма В связи с наличием других системных запросов, возвращаемое значение может изменяться от вызова к вызову.
Пример применения класса
R e c u r s i v e T a s k < v Два приведенных выше примера основаны на классе
Recurs iveAct ion, и это значит, что они одновременно выполняют задачи, которые не возвращают результатов. Чтобы создать задачу, которая возвращает результат, используйте класс
RecursiveTask. Сами решения разработаны также, как ив предыдущем примере. Основное отличие в том, что метод compute
() возвращает результат. Таким образом, следует объединить результаты так, чтобы при завершении первого запроса он возвращал общий результат. Другое отличие в том, что обычно выбудете запускать подзадачу при вызове метода fork
() или j oin
() явно (а не неявно, при вызове метода invokeAll ()
, например).
Следующая программа демонстрирует применение метода
Recurs iveTask. Она создает задачу по имени
Sum, которая возвращает сумму значений в массиве типа double. В этом примере массив состоит из 5 ООО элементов. Однако каждое второе значение отрицательное. Таким образом, первыми значениями массива будут 0 ,-1 , 2 ,-3 , 4 и т.д.
// Простой пример использования Recursi«veTask .
<
import java.util.concurrent.*;
// Класс RecursiveTask,
используемый для вычисления суммы массива
// типа double.
class Sum extends RecursiveTask {
// Пороговое значение последовательного выполнения
final int seqThresHold = 500;
// Обрабатываемый массив
doublet] data;
// Определяет часть обрабатываемых данных
int start, end;
Глава 27. Параллельные утилиты vals, int s, int e )
{
data = vals;
start = s;
end = e ;
}
// Поиск суммы массива типа double,
protected Double compute() {
double sum = 0;
// Если количество элементов ниже порогового значения, то
// выполнять обработку последовательно
if((end - start) < seqThresHold) {
// Сумма элементов i = start; i < end; i++) sum += data[i];
}
else {
// В противном случае продолжить разделение данных на
// меньшие части Найти середину middle = (start + end) / 2;
// Запустить новые задачи, используя разделенные на
// части данные subTaskA = new Sum(data, start, middle);
Sum subTaskB = new Sum(data, middle, end);
// Запустить каждую подзадачу при разветвлении
subTaskA.fork();
subTaskB.fork();
// Подождать завершения подзадачи агрегировать результаты
sum = subTaskA.join() + subTaskB.j o i n ();
}
// Вернуть конечную сумму
return sum;
}
}
// Демонстрация параллельного выполнения
class RecurTaskDemo {
public static void main (String a r g s U ) {
// Создать пул задач fjp = new ForkJoinPool();
double[] nums = new double[5000];
// Инициализировать nums значениями, знаки которых чередуются
for(int i=0; i < nums.length; i++)
nums[i] = (double) (
(
(i%2)
== 0) ? i :
-i) ;
Sum task = new Sum(nums, 0, n u m s .length);
// Запуск ForkJoinTasks.
Обратите внимание, что в данном
// случае метод invoke() возвращает результат
double summation = fjp.invoke(task);
System.out.println("Summation " + summation);
}

9 1 Часть II. Библиотека Вот вывод этой программы -В данной программе есть несколько интересных моментов. В первую очередь обратите внимание на то, что эти две подзадачи выполняются при вызове метода f o r k ( ), как показано далее. f o r k (); subTaskB. f o r k () В данном случае метод f o r k () используется потому, что он запускает задачу, ноне ждет ее завершения. (Таким образом, он запускает задачу асинхронно) Результат каждой задачи получается при вызове метода
j o i n ()
, как показано далее = subTaskA.join() + subTaskB.j o i n (Эти операторы ожидают завершения каждой задачи. Затем все результаты суммируются и присваиваются переменной sum. Таким образом, суммирование всех подзадач добавляется к вычислению суммы. И наконец, метод compute
() заканчивает работу, возвращая значение переменной sum, которое будет окончательным общим количеством для первого запроса.
Существуют и другие подходы асинхронного выполнения подзадач. Например, следующая последовательность использует метод f or к
() для запуска задачи sub­
TaskA и метод invoke (
) — для запуска и ожидания задачи subTaskB.
subTaskA.for k () ;
sum = subTaskA.join() + Еще одним вариантом может быть непосредственный вызов метода compute
() задачи subTaskB, как показано далее = subTaskA.join() + Асинхронное выполнение задач

Для инициализации задачи приведенные выше программы вызвали метод in­
voke
() класса
ForkJoinPool. Этот подход является общепринятым, когда вызывающий поток должен ждать завершения задачи (что зачастую имеет место, поскольку метод invoke (
) не завершается, пока задача не закончится. Однако вы можете запустить задачу асинхронно. При этом подходе вызывающий поток продолжает выполняться. Таким образом, и вызывающий потоки задача выполняются одновременно. Чтобы запустить задачу асинхронно, используйте метод ex­
ecute ()
, который также определяет класс
ForkJoinPool. Две его формы представлены ниже execute(ForkJoinTask задача execute(Runnable

задача)
В обеих формах выполняемую задачу определяет параметр задача Обратите внимание на то, что вторая форма позволяет определить задачу типа
Runnable, а не
ForkJoinTask. Таким образом, это создает мост между традиционным подходом Java к многопоточности и новой инфраструктурой F o rk /Jo in Framework. Важно помнить, что потоки, используемые классом
ForkJoinPool, являются демоном. Таким образом, они завершаются по завершении основного потока. В результате вам, возможно, придется поддерживать основной поток, пока задачи не закончатся
Глава 27. Параллельные утилиты
9 1 Отмена задачи
Задача может быть отменена при вызове метода c a n c e l ( ), который определен в классе F o r k J o in T a s k . Его общая форма такова o o le a n c a n c e l ( b o o l e a n успеш нП рерыв)
Метод возвращает значение t r u e , если задача, для которой он был вызван, успешно отменена, и значение f a l s e — если задача уже отменена, закончена или не может быть отменена. В настоящее время параметр успешнПрерыв не используется стандартной реализацией. Обычно метод c a n c e l () предназначается для вызова из кода вне задачи, поскольку задача может легко отменить себя при выходе.
При вызове метода i s C a n c e l l e d () вы можете определить, была ли задача отменена, как показано далее boolean Метод возвращает значение t r u e , если вызывающая задача была отменена до завершения, и значение f a l s e — в противном случае.
Определение состояния завершения задачи
Кроме только что описанного метода i s C a n c e l l e d (), класс F o r k J o in T a s k включает два других метода, которые вы можете использовать для определения состояния завершения задачи. Первый из них, метод is C o m p le te d N o r m a lly (), представлен ниже boolean Метод возвращает значение t r u e , если вызывающая задача закончилась нормально, те. не передала исключения, и не была отменена вызовом метода c a n ­
c e l ( ), а в противном случае — значение f a l s e Второй метод, is C o m p le te d A b n o rm a lly ( ), представлен здесь boolean Метод возвращает значение t r u e , если вызывающая задача закончила работу вследствие ее отмены или передачи исключения, а в противном случае — значение f a l s e Перезапуск задачи
Обычно вы не можете перезапустить задачу Другими словами, как только задача заканчивает выполнение, она не может быть перезапущена. Однако вы можете повторно инициализировать состояние задачи (после ее завершения, таким образом, она может быть запущена снова. Для этого используется вызов метода r e i n i t i a l i z e (), как показано далее Этот метод сбрасывает состояние вызывающей задачи. Однако любая модификация, внесенная в любые постоянные данные, которые используются задачей, не будет отменена. Например, если задача изменяет массив, то эта модификация не будет отменена при вызове метода r e i n i t i a l i z e d Что исследовать
Приведенное выше обсуждение затронуло основные принципы инфраструктуры
Fork/Join Framework и наиболее часто используемые методы. Однако инфраструк­

9 Н
Часть II. Библиотека тура Fork/Join Framework — это богатая среда выполнения, включающая передовые возможности, обеспечивающие дополнительный контроль над параллельностью. Хотя изучение всех проблем и нюансов параллельного программирования и инфраструктуры Fork/Join Framework выходит за рамки этой книги, некоторые из средств, предоставляемых классами
ForkJoinTask и
ForkJoinPool, здесь упоминаются.
Некоторы е иные средства класса Как уже упоминалось, такие методы, как invokeAll (
) и fork ()
, могут быть вызваны только внутри класса
ForkJoinTask. Обычно соблюсти это правило просто, нов некоторых случаях у вас может быть код, способный выполняться как внутри, таки вне задачи. Вы можете определить, выполняется ли ваш код внутри задачи, вызвав метод inForkJoinPool
( Вы можете преобразовать объект интерфейса или
Callable в объект класса
ForkJoinTask при помощи метода adapt ()
, определенного в классе
ForkJoinTask. У этого метода есть три формы для преобразования объекта интерфейса для объекта интерфейса, который не возвращает результат и для объекта интерфейса, который действительно возвращает результат. В случае интерфейса выполняется метода в случае интерфейса метод run (При вызове метода getQueuedTaskCount
() вы можете получить приблизительное количество задач, находящихся в очереди вызывающего потока. При вызове метода getSurplusQueuedTaskCount
() можно получить приблизительное количество задач, имеющихся у вызывающего потока в очереди, которое превышает количество других потоков в пуле, который мог бы захватить их. Помните, захват задачи в инфраструктуре F o rk /Jo in Framework — это единственный путь получения высокой эффективности. Хотя этот процесс является автоматическим, в некоторых случаях информация о нем может оказаться полезной при оптимизации производительности.
Класс
ForkJoinTask определяет два метода, которые начинаются с префикса quietly. Они представлены ниже void quietlyJoin

() Присоединяет задачу, ноне возвращает результат и не передает исключений
t
f
inal void quiet lyInvoke
() Вызывает задачу, ноне возвращает результат и не передает исключений____________________________
В основном, эти методы подобны своим упрощенным аналогам, кроме того, что они не возвращают значений и не передают исключений.
Вы можете попытаться отменить вызов задачи (другими словами, исключить ее из расписания) при вызове метода tryUnf or к (Класс
ForkJoinTask реализует интерфейс
Serializable. Таким образом, он разрешает сериализацию. Однако сериализация не используется вовремя выпол­
нения.
Некоторы е иные средства класса Одним из методов, весьма полезных при настройке приложений ветвления и объединения, является переопределенный метод toString
() класса
ForkJoinPool. Он отображает дружественный отчет о состоянии пула. Чтобы увидеть его в действии, используйте приведенную ниже последовательность для запуска и последующего ожидания задачи в классе
FJExperiment в представленной ранее программе экспериментов с задачами
Глава 27. Параллельные утилиты 1 5
// Запустить главный ForkJoinTask асинхронно
fjp.execute(task);
// Отобразить состояние пула при ожидании
w h i l e (!task.isDone()) Запустив программу вы увидите на экране серию сообщений, описывающих состояние пула. Вот один из примеров. Конечно, ваш вывод может быть другим, из- за различия в количестве процессоров, пороговых значений, нагрузки задачи т.д.
java.u t i l .concurrent.ForkJoinPool@141d683[Running,
parallelism = 2, size
= 2, active = 0, running = 2, steals = 0, tasks = 0, submissions = Вы можете определить, не бездействует ли пул в настоящее время при вызове метода isQuiescent(). Он возвращает значение true, если у пула нет никаких активных потоков, и значение false
— в противном случае.
Вы можете получить количество рабочих потоков, находящихся в пуле в настоящее время, при вызове метода get
Pool
Size ()
. При вызове метода getAc- tiveThreadCount
() можно получить приблизительное количество активных потоков в пуле.
Чтобы завершить работу пула, вызовите метод sh u td o w n (). Текущие задачи все еще будут выполняться, но никаких новых задач запущено быть не может. Чтобы остановить пул немедленно, вызовите метод shutdownNow (). В данном случае делается попытка отменить текущие задачи. При вызове метода isS h u td o w n () вы можете определить, закрывается ли пул. Он возвращает значение t r u e , если пул был закрыт, и значение f a l s e — в противном случае. Чтобы определить, был ли пул закрыт и всели задачи закончены, вызовите метод i s T e r m i n a t e d (Некоторые советы относительно
ветвления/объединения
Вот несколько советов, способных помочь вам избежать некоторых из наиболее неприятных проблем, связанных с использованием инфраструктуры F o rk /
Jo in Framework. Во-первых, избегайте использования слишком низкого порогового значения последовательного выполнения. Обычно ошибки допущение на высоком уровне лучше, чем ошибки допущение на низком. Если пороговое значение слишком низко, на создание и переключение задач может уйти времени больше, чем на их обработку. Во-вторых, обычно лучше использовать уровень параллелизма, заданный по умолчанию. Если вы определяете меньшее значение, это может значительно уменьшить преимущество использования инфраструктуры F o rk /Jo in Обычно класс
ForkJoinTask не должен использовать синхронизированные методы или блоки кода. Кроме того, вы обычно не будете использовать метод compute
() с другими типами синхронизации, такими как семафор. (Однако новый класс
Phaser может быть использован, поскольку он совместим с механизмом ветвления/объединения.) Помните, что основная идея класса
ForkJoinTask состоит в стратегии разделяй и властвуй. Такой подход обычно не применяется в ситуациях, где необходима внешняя синхронизация. Кроме того, избегайте ситуаций, в которых ввод-вывод способен вызвать существенную блокировку. Поэтому класс
ForkJoinTask обычно не будет обслуживать ввод-вывод. Проще говоря, чтобы лучше использовать инфраструктуру F o rk /Jo in Framework, задача должна выполнять вычисление, которое может осуществляться без внешней блокировки или синхронизации

9 1 Часть II. Библиотека Последний момент за исключением необычных обстоятельств, не делайте никаких предположений о среде выполнения, в которой будет работать ваш код. Это значит, что вы не должны подразумевать, что будет доступно некое количество процессоров или что на характеристики выполнения вашей программы не будут влиять другие процессы, выполняющиеся в тоже время.
Параллельные утилиты в сравнении с традиционным подходом в Если принять во внимание ту мощность и гибкость, которую предлагают новые параллельные утилиты, то возникает следующий резонный вопрос могут ли они служить заменой традиционному для Java подходу к реализации многопоточности и синхронизации Однозначно нет Первоначальная поддержка многопоточно­
сти и встроенные функциональные возможности синхронизации по-прежнему следует реализовать во множестве программ Java, аплетах и сервлетах. Например, методы s y n c h r o n i z e d , w a i t () и n o t i f y () предлагают элегантные решения широкого круга задач. Однако если вам понадобится расширить возможности управления, то для этих целей можно воспользоваться параллельными утилитами' Кроме того, новая инфраструктура F o rk /Jo in Framework предлагает мощнейший способ интеграции технологий параллельного программирования в ваши более сложные приложения

1   ...   77   78   79   80   81   82   83   84   ...   90


написать администратору сайта