Главная страница
Навигация по странице:

  • 14.1 Управление зависимостью от состояния

  • Листинг 14.2

  • 14.1.2 Пример: грубая блокировка путем опроса и сна

  • !isFull()

  • 14.1.3 Очереди условий на освобождение

  • При участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на


    Скачать 4.97 Mb.
    НазваниеПри участии Тима Перлса, Джошуа Блоха, Джозева Боубира, Дэвида Холмса и Дага Ли Параллельное программирование в java на
    Анкорjava concurrency
    Дата28.11.2019
    Размер4.97 Mb.
    Формат файлаpdf
    Имя файлаJava concurrency in practice.pdf
    ТипДокументы
    #97401
    страница27 из 34
    1   ...   23   24   25   26   27   28   29   30   ...   34
    Глава 14 Разработка собственных
    синхронизаторов
    Библиотеки классов включают в себя несколько зависящих от состояния классов
    (state-dependent classes), – то есть, имеющих операции, зависящие от предусловий
    определяемых состоянием (state-based preconditions) - таких как
    FutureTask
    ,
    Semaphore и
    BlockingQueue
    . Например, нельзя удалить элемент из пустой очереди или получить результат еще не завершенной задачи; перед выполнением этих операций необходимо дождаться, когда очередь перейдет в состояние “не пусто” или задача перейдет в состояние “завершено”.
    Самый простой способ создать класс, зависящий от состояния, это построить его поверх существующего библиотечного класса, зависящего от состояния; мы поступили аналогичным образом с классом
    ValueLatch из раздела
    8.5.1
    , используя класс
    CountDownLatch для обеспечения требуемого поведения блокировки. Но если библиотечные классы не предоставляют необходимых функциональных возможностей, можно также создать собственные синхронизаторы, используя низкоуровневые механизмы, предоставляемые языком и библиотеками, включая внутренние очереди условий (condition queues), явные объекты
    Condition и фреймворк
    AbstractQueuedSynchronizer
    . В этой главе рассматриваются различные варианты реализации зависимости от состояния, а также правила использования механизмов зависимости от состояния, предусмотренных платформой.
    14.1 Управление зависимостью от состояния
    В однопоточной программе, если предусловие на основе состояния (например, "пул соединений непустой") при вызове метода не выполняется, оно никогда не станет истинным. Поэтому классы в последовательных программах могут быть закодированы на сбой, если их предварительные условия не выполняются. Но в параллельной программе условия, основанные на состоянии, могут изменяться под влиянием других потоков: пул, который был пуст несколько инструкций назад, может стать непустым, потому что другой поток возвратил элемент. Методы, зависящие от состояния параллельных объектов, иногда могут выйти из строя, когда их предусловия не выполняются, но часто существует лучшая альтернатива: дождаться момента, когда предусловие станет истинным.
    Операции, зависящие от состояния и блокирующиеся до тех пор, пока операция не сможет продолжить выполнение, более удобны и менее подвержены ошибкам, в отличие от тех, что просто падают с ошибкой. Встроенный механизм очереди условий позволяет потокам блокироваться до тех пор, пока объект не войдёт в состояние, которое позволяет продолжить прогресс выполнения и пробуждать заблокированные потоки, когда они будут в состоянии в дальнейшем продолжить прогресс выполнения. Мы детально рассмотрим очереди условий в разделе
    14.2
    , но с целью мотивации в оценке эффективности механизма ожидания условий, мы сначала покажем, как зависимость от состояния может быть (с большими сложностями) решена с помощью опроса и сна.
    Блокирующееся зависящее от состояния действие, принимает форму, приведённую в листинге 14.1. Схема блокировки несколько необычна тем, что блокировка освобождается и вновь захватывается посреди выполнения операции.

    Переменные состояния, составляющие предварительное условие, должны быть защищены блокировкой объекта, чтобы оставаться константными во время проверки предусловия. Но если предусловие не выполняется, блокировка должна быть снята, чтобы другой поток мог изменить состояние объекта, иначе предусловие никогда не станет истинным. Блокировка должна быть захвачена перед тем, как вновь проверять выполнение предусловия. acquire lock on object state while (precondition does not hold) { release lock wait until precondition might hold optionally fail if interrupted or timeout expires reacquire lock
    } perform action release lock
    Листинг 14.1 Структура блокирующихся зависимых от состояния действий
    Ограниченные буферы, подобные классу
    ArrayBlockingQueue
    , как правило, используются в дизайнах производитель-потребитель. Ограниченный буфер предоставляет операции поместить и взять, каждая из которых обладает предусловиями: нельзя взять элемент из пустого буфера или поместить элемент в полный буфер. Операции, зависящие от состояния, могут сталкиваться со сбоями предусловий, бросая при этом исключение или возвращая статус ошибки
    (перекладывая решение проблемы на вызывающего), или блокируясь до тех пор, пока объект не перейдет в нужное состояние.
    Мы собираемся разработать несколько реализаций ограниченного буфера, которые используют различные подходы к обработке сбоев предусловий. Каждая из реализаций расширяет класс
    BaseBoundedBuffer
    , представленный в листинге
    14.2, реализующий классический кольцевой буфер на основе массива, в котором переменные состояния буфера (
    buf
    , head
    , tail
    , и count
    ) защищены внутренней блокировкой буфера. Класс предоставляет синхронизированные методы doPut и doTake
    , которые используются подклассами для реализации операций put и take
    ; базовое состояние скрыто от подклассов.
    @ThreadSafe public abstract class BaseBoundedBuffer {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count; protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity];
    } protected synchronized final void doPut(V v) { buf[tail] = v; if (++tail == buf.length) tail = 0;
    ++count;

    } protected synchronized final V doTake() {
    V v = buf[head]; buf[head] = null; if (++head == buf.length) head = 0;
    --count; return v;
    } public synchronized final boolean isFull() { return count == buf.length;
    } public synchronized final boolean isEmpty() { return count == 0;
    }
    }
    Листинг 14.2 Базовый класс для реализаций ограниченного буфера
    14.1.1 Пример: распространение сбоев предусловий на
    вызывающий код
    Класс
    GrumpyBoundedBuffer приведённый в листинге 14.3, представляет собой первую, и довольно грубую, попытку реализации ограниченного буфера. Методы put и take объявлены как synchronized
    , для обеспечения монопольного доступа к состоянию буфера, так как оба, при осуществлении доступа к буферу, используют логику проверить-затем-выполнить.
    @ThreadSafe public class GrumpyBoundedBuffer extends BaseBoundedBuffer { public GrumpyBoundedBuffer(int size) { super(size); } public synchronized void put(V v) throws BufferFullException {
    if (isFull())
    throw new BufferFullException();
    doPut(v);
    } public synchronized V take() throws BufferEmptyException {
    if (isEmpty())
    throw new BufferEmptyException();
    return doTake();
    }
    }
    Листинг 14.3 Ограниченный буфер, прерывающий свою работу, когда предварительные условия не выполняются

    Хотя этот подход достаточно прост в реализации, он вызывает раздражение.
    Исключения должны возникать при исключительных условиях [EJ пункт 39].
    Условие “Буфер заполнен” является исключительным условием для ограниченного буфера не больше, чем “красный” является исключительным условием в качестве сигнала трафику движения. Упрощение реализации буфера (принуждение вызывающего объекта к управлению зависимостью от состояния) с лихвой компенсируется существенным усложнением его использования, так как теперь вызывающий объект должен быть готов перехватывать исключения и, возможно, повторно выполнять каждую операцию буфера.
    146
    Хорошо структурированный вызов метода take показан в листинге 14.4 - не очень красиво, особенно если методы put и take вызываются по всей программе. while (true) { try {
    V item = buffer.take();
    // use item
    break;
    } catch (BufferEmptyException e) {
    Thread.sleep(SLEEP_GRANULARITY);
    }
    }
    Листинг 14.4 Клиентская логика, используемая для обращения к классу
    GrumpyBoundedBuffer
    Один из вариантов такого подхода заключается в том, чтобы возвращать значение ошибки, когда буфер находится в несогласованном состоянии. Это незначительное улучшение, по своей сути, заключается в том, что не происходит злоупотребления механизмом исключений, за счёт бросания исключения, которое в действительности означает “извините, попробуйте еще раз”, но это не решает фундаментальную проблему: вызывающие объекты должны самостоятельно разбираться со сбоями в предусловиях.
    147
    Клиентский код, приведённый в листинге 14.4, не является единственным способом реализации логики повтора. Вызывающий объект мог бы немедленно повторить вызов метода take
    , без засыпания - подход, известный как напряжённое
    ожидание (busy waiting) или ожидание с проверкой (spin waiting). Такое ожидание может отнимать довольно много процессорного времени, если состояние буфера не изменяется в течение некоторого времени. С другой стороны, если вызывающий объект решает уснуть, чтобы не потреблять так много времени CPU, он может легко “проспать”, если состояние буфера изменится вскоре после вызова метода sleep
    . Так что клиентскому коду остается выбор между плохим использованием
    CPU из-за прокручивания вызовов и плохой отзывчивостью из-за засыпания. (В каждой итерации, где-то между напряжённым ожиданием и сном, будет располагаться вызов метода
    Thread.yield
    , представляющий собой подсказку планировщику о том, что в текущий момент было бы разумно позволить работать
    146
    Передача зависимого состояния обратно вызывающему объекту, также приводит к невозможности выполнения таких действий, как сохранение порядка FIFO; вынуждая вызывающий объект повторить попытку, вы теряете информацию о том, кто обратился первым.
    147
    Интерфейс
    Queue предлагает оба варианта - метод poll возвращает значение null
    , если очередь пуста, а метод remove бросает исключение – но интерфейс
    Queue не предназначен для использования в дизайне производитель-потребитель. Интерфейс
    BlockingQueue
    , операции которого блокируются до тех пор, пока очередь, для продолжения работы, не перейдёт в согласованное состояние, является лучшим выбором, когда производители и потребители будут выполняться параллельно.
    другому потоку. Если вы ожидаете выполнения другого потока, чтобы сделать что- то, что-то могло бы произойти быстрее в том случае, если бы вы уступали процессор, а не потребляли весь выделенный при планировании квант времени.)
    14.1.2 Пример: грубая блокировка путем опроса и сна
    Класс
    SleepyBoundedBuffer представленный в листинге 14.5, пытается избавить вызывающие объекты от неудобств, вызванных реализацией логики повтора при каждом вызове, путём инкапсуляции повторяющихся вызовов грубого механизма
    “опросить и уснуть” внутрь операций put и take
    . Если буфер пуст, метод take засыпает до момента, пока другой поток не поместит некоторые данные в буфер; если буфер полон, метод put засыпает до момента, пока другой поток не освободит место, удалив некоторые данные. Этот подход инкапсулирует управление предусловиями и упрощает использование буфера – определенно, шаг в верном направлении.
    @ThreadSafe public class SleepyBoundedBuffer extends BaseBoundedBuffer { public SleepyBoundedBuffer(int size) { super(size); } public void put(V v) throws InterruptedException { while (true) { synchronized (this) { if (!isFull()) { doPut(v); return;
    }
    }
    Thread.sleep(SLEEP_GRANULARITY);
    }
    } public V take() throws InterruptedException { while (true) { synchronized (this) { if (!isEmpty()) return doTake();
    }
    Thread.sleep(SLEEP_GRANULARITY);
    }
    }
    }
    Листинг 14.5 Ограниченный буфер, использующий грубую блокировку
    Реализация класса
    SleepyBoundedBuffer сложнее, чем предыдущая попытка.
    148
    Код буфера должен проверять соответствующее условие состояния (state condition) удерживая блокировку буфера, поскольку переменные, представляющие состояние, защищены блокировкой буфера. Если проверка завершается неудачей, выполняющийся поток засыпает на некоторое время, перед этим освободив
    148
    Мы избавим вас от деталей, касающихся других пяти реализаций ограниченного буфера, особенно
    SneezyBoundedBuffer.
    блокировку, чтобы другие потоки могли получить доступ к буферу.
    149
    Когда поток проснется (wakes up), он повторно захватит блокировку и попытается снова, чередуя сон и проверку условия состояния до тех пор, пока выполнение операции не будет продолжено.
    С точки зрения вызывающего объекта, это работает отлично - если операция может быть выполнена немедленно, так и будет сделано, а иначе она будет заблокирована - и абоненту не нужно иметь дело с механикой сбоя и повтором.
    Выбор детализации сна - это компромисс между отзывчивостью и использованием
    CPU; чем меньше степень детализации сна, тем больше отзывчивость, но и тем больше потребление ресурсов процессора. На рис. 14.1 показано, каким образом детализация сна может влиять на скорость отклика: может быть задержка между моментом времени, когда пространство буфера становится доступным и моментом времени, когда поток просыпается и снова выполняет проверку.
    Рисунок 14.1 Поток проспал, потому что условие приняло значение true после того, как он захотел уснуть
    Класс
    SleepyBoundedBuffer также порождает другое требование для вызывающего объекта - работа с исключением
    InterruptedException
    . Когда метод блокируется на ожидании выполнения условия, вежливое действие заключается в предоставлении механизма отмены (см. главу
    7
    ). Подобно большинству библиотечных блокирующих методов с хорошим поведением, класс
    SleepyBoundedBuffer поддерживает отмену через прерывание, возвращая управление раньше и бросая исключение
    InterruptedException
    , если выполнение было прервано.
    Эти попытки синтезировать блокирующую операцию из опроса и сна были довольно болезненными. Было бы неплохо иметь способ приостановки потока, но гарантировать, что он быстро пробудится, как только определенное условие
    (например, буфер больше не полон) становится истинным. Это именно то, чем занимаются очереди условий.
    14.1.3 Очереди условий на освобождение
    Очереди условий похожи на звонок “тост готов” в вашем тостере. Если вы слушаете звонок, вы будете немедленно уведомлены, когда ваш тост будет готов и сможете бросить то, чем занимались (или нет, может быть, вы хотите в первую очередь закончить чтение газеты) и получить тост. Если вы не слушаете звонок
    (возможно, вы вышли на улицу, чтобы получить газету), вы можете пропустить уведомление, но по возвращении на кухню вы можете наблюдать за состоянием тостера и либо получить тост, если он готов, либо вновь начать слушать звонок, если это не так.
    149
    Как правило, для потока является плохой идеей осуществление перехода в спящий режим или блокирование иным образом совместно с удерживаемой блокировкой, но в данном случае всё еще хуже, потому что желаемое условие (буфер полон/пуст) никогда не сможет стать истинным, если блокировка не будет освобождена!
    L
    set
    condition U
    L
    condition
    not true U
    sleep
    L
    U
    condition
    true
    A
    B

    Очередь условий (condition queue) получила свое имя, поскольку она предоставила группе потоков - называемой набором ожидания (wait set) - способ ожидания выполнения определенного условия. В отличие от обычных очередей, в которых элементы являются единицами данных, элементами очереди условий являются потоки, ожидающие выполнения условия.
    Так же, как каждый объект Java может выступать в качестве блокировки, каждый объект может также выступать в качестве очереди условий, и методы wait
    , notify
    , и notifyAll класса
    Object составляют API внутренних очередей условий. Внутренняя блокировка объекта и его внутренняя очередь условий связаны: чтобы вызвать любой из методов очереди условий объекта X, необходимо удерживать блокировку на объекте X. Это связано с тем, что механизм ожидания условий на основе состояний тесно связан с механизмом сохранения согласованности состояний: не получится дождаться условия, пока не удастся проверить состояние, и нельзя освободить другой поток из состояния ожидания по условию, пока не удастся изменить состояние.
    Метод
    Object.wait атомарно освобождает блокировку и просит ОС приостановить выполнение текущего потока, позволяя другим потокам захватить блокировку и, следовательно, изменить состояние объекта. После пробуждения он повторно захватывает блокировку перед возвращением. Интуитивно, вызов wait означает “Я хочу пойти спать, но разбудите меня, когда произойдёт что-то интересное”, а вызов методов уведомления означает “что-то интересное произошло”.
    Представленный в листинге 14.6 класс
    BoundedBuffer
    , реализует ограниченный буфер с помощью методов wait и notifyAll
    . Этот вариант проще, чем в случае со “спящей” версией, и является более эффективным (просыпаться приходится реже, если состояние буфера не изменяется) и более отзывчивым
    (когда происходит интересующее изменение состояния, просыпаться получается быстро). Это значительное улучшение, но обратите внимание, что введение очередей условий не привело к внесению изменений в семантику, по сравнению со
    “спящей” версией. Это просто оптимизация в нескольких измерениях: эффективности использования ЦП, издержек переключения контекста и отзывчивости. Очереди условий не позволяют делать то, чего вы не можете сделать с помощью опроса и сна
    150
    , но они позволяют сделать всё намного проще и эффективнее выражать и управлять зависимостью от состояния.
    @ThreadSafe public class BoundedBuffer extends BaseBoundedBuffer {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer(int size) { super(size); }
    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
    while (isFull())
    wait();
    doPut(v);
    150
    Это не совсем верно; справедливая очередь условий может гарантировать относительный порядок, в котором потоки освобождаются из набора ожидания. Внутренние очереди условий, подобно внутренним блокировкам, не предлагают справедливого помещения в очередь; явные реализации интерфейса
    Condition предлагают выбор из справедливого и несправедливого помещения в очередь.

    1   ...   23   24   25   26   27   28   29   30   ...   34


    написать администратору сайта