Санкт-Петербургская группа тестирования JVM


« Новый блог | Main | Sun Java Real-Time... »
20070706 пятница Июль 06, 2007

java.util.concurrent

Когда я впервые услышал о пакете java.util.concurrent (JSR 166), который предоставляет набор классов для организации межпоточного взаимодействия, я подумал, что это всего лишь усовершенствованный набор контейнерных классов, построенный на основе встроенных в язык Java механизмов межпоточного взаимодействия (synchronized методов и блоков), однако более близкое знакомство с предоставляемым этим пакетом API удивило: ряд возможностей не может быть реализован с использованием стандартных механизмов.

Например, класс ReentrantLock, который аналогичен по функциональности стандартному механизму блокировок, помимо метода захвата блокировки lock(), вызов которого аналогичен входу в synchronized блок, также предоставляет метод lockInterruptibly(), который может быть прерван с использованием Thread.interrupt. То есть имеется возможность прервать поток, пытающийся захватить ReentrantLock, что невозможно сделать при захвате обычной блокировки Java. Также ReentrantLock предоставляет такие возможности, отсутствующие у стандартных блокировок, как захват блокировки только при условии, что она не захвачена другим потоком (метод tryLock()), и попытка захвата с тайм-аутом (метод tryLock(long timeout, TimeUnit unit)).

Кроме этого, главной особенностью встроенных блокировок Java (блокировок, основанных на использовании ключевого слова synchronized) является то, что захват и освобождение происходят в рамках синтаксического блока, то есть блокировка автоматически освобождается при выходе из synchronized метода или synchronized блока. Такая реализация предотвращает потенциальные ошибки кодирования, но налагает ограничения на алгоритмы синхронизации, которые можно реализовать с их помощью, захват же и освобождение ReentrantLock происходит при помощи методов lock() и unlock(), то есть захват и освобождение не ограничены синтаксическим блоком.

Очевидно, что реализация этой функциональности невозможна без поддержки виртуальной машины и операционной системы. Большинство классов пакета java.util.concurrent, обеспечивающих межпоточную синхронизацию, созданы с использованием фреймворка, построенного на основе класса java.util.concurrent.locks.AbstractQueuedSynchronizer. Этот фреймфорк подробно описан в статье "The java.util.concurrent Synchronizer Framework", здесь я приведу краткий обзор особенностей этого фреймворка.

Прежде чем приступить к описанию особенностей реализации пакета java.util.concurrent, необходимо немного отвлечься и рассказать о методе организации межпоточного взаимодействия, основанном на неблокирующих алгоритмах, именно благодаря неблокирующим алгоритмам классы пакета java.util.concurrent, используемые для синхронизации, предоставляют расширенную по сравнению со стандартными блокировками функциональность, а также обладают лучшей производительностью и масштабируемостью.

Традиционный подход для синхронизации потоков, работающих с общими данными, заключается в использовании так называемых критических секций - участков кода, которые не могут исполняться несколькими потоками одновременно. В случае Java этот подход реализован на основе synchronized методов и блоков: поток, вошедший в synchronized метод, может в рамках этого метода спокойно работать с данными, разделяемыми с другими потоками, так как ни один другой поток не сможет в это же время войти в этот метод, а говоря точнее, другие потоки, попытавшиеся сделать это, будут заблокированы до тех пор, пока первый поток не покинет synchronized метод.

У традиционного подхода, использующего критические секции и блокирование потоков, есть несколько недостатков, и, возможно, главным из них является производительность. Если блокировка не была захвачена другим потоком, то операция захвата этой блокировки в современных виртуальных машинах выполняется очень быстро. Однако если блокировка захвачена другим потоком, то в этом случае должен быть задействован сравнительно дорогой механизм блокирования и последующего разблокирования потока. Также очень часто критические секции выполняют очень небольшой объём действий (например, просто увеличивают значение счётчика), при этом время, затрачиваемое на блокирование/разблокирование потока, оказывается гораздо больше по сравнению со временем работы критической секции, и в таких случаях было бы хорошо иметь более легковесный способ синхронизации. Помимо этого, при использовании синхронизации, основанной на блокировании, часто возникают такие проблемы, как взаимоблокировки (deadlocks) и инверсия приоритетов (priority inversion) (инверсия приоритетов возникает в том случае, если высокоприоритетный поток приостанавливается из-за того, что он не может получить блокировку, захваченную низкоприоритетным потоком, в этом случае высокоприоритетный поток не может продолжить выполнение до тех пор, пока блокировка не освобождена, то есть на время ожидания его приоритет как бы понижается до приоритета потока, удерживающего блокировку).

К счастью, существует другой способ организации межпоточного взаимодействия. Синхронизацию можно реализовать и без блокирования потоков, на основе атомарных операций сравнения и изменения значения переменной. Практически все современные процессоры предоставляют команды для обеспечения межпоточной работы, так например, в процессорах SPARC реализована команда compare-and-swap (CAS) (сравнение и замена). Команда CAS принимает три аргумента: адрес ячейки памяти (V), ожидаемое старое значение (A) и новое значение (B). Процессор записывает значение B в ячейку V, но только в том случае, если ячейка памяти содержит ожидаемое значение A, в противном случае запись произведена не будет. В обоих случаях команда возвращает значение, находящееся в ячейке V на момент вызова команды. По результату, возвращаемому командой CAS, поток может судить, успешно или нет прошла модификация переменной. Если CAS закончилась неудачно (то есть запись не была произведена), значит, какой-то другой поток успел изменить значение этой переменной после того, как текущий поток считал её значение.

Основное отличие использования команды CAS для синхронизации от использования критических секций и блокирования потоков заключается в следующем: если несколько потоков одновременно пытаются изменить значение общей переменной, используя команду CAS, то операция завершается успешно только для одного потока, однако остальные потоки, проигравшие в этом соревновании за изменение, не приостанавливаются (как это произошло бы в результате неудачной операции захвата блокировки), вместо этого поток просто получает информацию о том, что операция изменения переменной прошла неудачно, и так как поток не блокирован, то он может самостоятельно решить, какое действие ему теперь предпринять: возможно, попытаться изменить переменную ещё раз, может быть, выполнить какие-либо другие действия или же вообще ничего не делать (в алгоритмах на основе CAS выбор последнего варианта не редок, так как неудача операции CAS может значить, что какой-то другой поток уже выполнил работу, которую вы собирались сделать).

Поскольку операция CAS поддерживается аппаратно, то её использование вносит гораздо меньше накладных расходов, чем задействование более сложных механизмов операционной системы или виртуальной машины для блокирования и разблокирования потоков. Помимо этого, при использовании CAS сужается область, в которой потенциально может возникнуть состязание между потоками за использование общего ресурса, как правило, эта область ограничена использованием всего лишь одной общей переменной (при использовании традиционного подхода эта область обычно целый synchronized метод). То есть в этом случае уменьшается вероятность состязания за общ��е данные, и поток имеет гораздо больше шансов обработать общие данные без возникновения конфликтов, и, следовательно, без лишних задержек. В случае же, если конфликт всё-таки возникает, то время на обработку неудачного вызова CAS гораздо меньше времени, требуемого на блокирование и разблокирование потока.

Вот как мог бы быть реализован простой счётчик на основе CAS, без использования ключевого слова synchronized:

// класс, предоставляющий доступ к команде CAS
class IntegerValueUsingCAS {
private int value;

// возвращает 'true', если изменение прошло успешно
public boolean compareAndSet(int expectedValue, int newValue) {
// попытаться изменить значение 'value', используя CAS
}

public int getValue() {
return value;
}
}

class CasCounter {
private IntegerValueUsingCAS value;

public int getValue() {
return value.getValue();
}

public void increment() {
int oldValue = value.getValue();
while (!value.compareAndSet(oldValue, oldValue + 1))
oldValue = value.getValue();
}
}

При реализации метода CasCounter.increment используется традиционный для алгоритмов на основе CAS подход: получить старое значение общей переменной, вычислить новое значение и попытаться изменить переменную с помощью CAS. В случае неудачи команды CAS действие немедленно повторяется ещё раз.

Как правило, алгоритмы на основе CAS используют так называемый оптимистический подход - поток сначала пытается выполнить действия с общими данными, и если результат команды CAS показывает, что логика работы была нарушена из-за того, что какой-то другой поток одновременно модифицирует те же данные, то необходимо предпринять действия для восстановления логики работы. Приведённый пример со счётчиком довольно простой, и алгоритм работы метода increment очевиден, но сложность алгоритмов на основе CAS резко возрастает, если потоку необходимо атомарно модифицировать не одну, а две переменные, так как в этом случае поток должен последовательно выполнить две команды CAS. В том случае, если первая команда заканчивается успешно, а вторая нет, то общие данные оказываются в противоречивом состоянии, также надо учитывать то, что поток может начать работу с общими данными в тот момент, когда другой поток находится в середине процесса их модификации. Основная сложность алгоритмов, использующих CAS, это обеспечение непротиворечивости общих данных в процессе их модификации сразу несколькими потоками, для этого необходимо обнаруживать описанные ситуации и корректно их обрабатывать, что может быть очень непросто (в качестве примера можно посмотреть на реализацию связного списка на основе CAS, приведённую в статье "Introduction to nonblocking algorithms"). Вообще же разработка алгоритмов это довольно специфическое занятие, которое лучше всего оставить специалистам, а люди, занимающиеся решением конкретных прикладных задач, могут пользоваться уже готовыми классами пакета java.util.concurrent.

Помимо сложности программирования алгоритмов на основе CAS, у этого механизма синхронизации есть и другие недостатки. Среди них, например, возможность возникновения так называемого livelock'а. При возникновении livelock'а поток, в отличии от ситуации deadlock'а, не заблокирован, но не может продолжить работу, так как постоянно пытается выполнить операцию, которая всё время заканчивается неудачей. Например, при очень большом числе потоков, использующих CasCounter из нашего примера, и при определённом стечении обстоятелств, один из потоков может 'застрять', выполняя метод increment, так как для него операция value.compareAndSet всё время будет возвращать false из-за того, что CasCounter постоянно изменяется другими, более удачливыми, потоками.

Кроме того, операция CAS может увеличивать трафик на межпроцессорной шине. Некоторые процессоры могут от этого страдать, а некоторые -- нет, за счет разделенного L2$.

Итак, узнав, что такое неблокирующие алгоритмы и в чём их преимущества и недостатки, вернёмся в фреймворку, лежащему в основе пакета java.util.concurrent.

Важной частью пакета java.util.concurrent являются классы, используемые для обеспечения синхронизации (имеются в виду такие классы как, например, эксклюзивная блокировка (Lock) или семафор (Semaphore)). Общий вид алгоритмов работы таких классов довольно прост, так, например, операция захвата потоком объекта синхронизации происходит следующим образом:

while (состояние объекта не позволяет захват) {
поместить текущий поток в очередь потоков, ожидающих захват данного объекта
заблокировать текущий поток
}
если текущий поток был помещён в очередь ожидающих потоков, удалить его из очереди

а это алгоритм операции освобождения:

обновить состояние объекта
if (состояние объекта позволяет другому потоку захватить объект) {
разблокировать один из потоков, находящихся в очереди ожидания данного объекта
}

Поскольку общий вид этих алгоритмов для разных типов объектов синхронизации один и тот же, было принято решение создать абстрактный базовый класс (AbstractQueuedSynchronizer), инкапсулирующий эти общие алгоритмы (и скрывающий сложность их реализации от прикладных программистов), а его подклассы должны переопределить несколько относительно простых методов, отвечающих за логику работы конкретного объекта синхронизации. Хотя большинству разработчиков не придётся использовать AbstractQueuedSynchronizer напрямую, знание того, как устроены объекты синхронизации, может помочь при их использовании.

Если проанализировать приведённые выше алгоритмы, то можно обнаружить, что для реализации необходимой функциональности, классу AbstractQueuedSynchronizer требуются следующие возможности:

 Поддержка этой функциональности была реализована следующим образом:

Теперь посмотрим, как класс AbstractQueuedSynchronizer может использоваться на практике для создания объектов синхронизации. Для этого попробуем реализовать класс OneShotLatch (защёлка) на основе описанного фреймворка. Этот класс имеет два метода: await и signal, которые фактически являются методами захвата и освобождения объекта синхронизации. Изначально защёлка закрыта, и поток, вызывающий метод await, блокируется до тех пор, пока защёлка не открыта. Метод signal открывает защёлку и позволяет всем заблокированным потокам продолжить выполнение.

Обратимся ещё раз к приведённым выше алгоритмам захвата и освобождения объекта синхронизации. При выполнении операции захвата объекта синхронизации успех или неудача этой операции зависит от состояния объекта (например, в нашем случае у защёлки всего два состояния: защёлка закрыта или открыта), если объект был успешно захвачен, то состояние объекта может быть изменено, для того, чтобы запретить другим потокам захват этого же объекта. Класс AbstractQueuedSynchronizer хранит состояние объекта в переменной типа int, подклассы могут получить и модифицировать состояние, используя методы getState, setState и compareAndSetState. В рамках рассматриваемого фреймворка для реализации этого алгоритма класс OneShotLatch должен переопределить метод AbstractQueuedSynchronizer.tryAcquireShared, этот метод должен проверить состояние защёлки и, в зависимости от результата проверки, вернуть значение > 0, если захват удался и значение < 0, если захват неудачен:

protected int tryAcquireShared(int ignored) {
/*
* Состояние 1 значит, что защёлка открыта, в этом случае операция
* захвата успешна
*/
return (getState() == 1) ? 1 : -1;
}

Метод освобождения объекта синхронизации должен модифицировать состояние объекта и, возможно (в зависимости от состояния), разблокировать пото��и, находящиеся в очереди ожидания. За логику освобождения объекта отвечает метод AbstractQueuedSynchronizer.tryReleaseShared, который должен вернуть значение true, если потоки, находящиеся в очереди ожидания, могут быть разблокированы. Для класса OneShotLatch метод tryReleaseShared выглядит так:

protected boolean tryReleaseShared(int ignored) {
/*
* Устанавливаем состояние 1 - защёлка открыта
*/
setState(1);
return true;
}

Классу OneShotLatch необходимо реализовать лишь два этих метода, всё остальное будет сделано классом AbstractQueuedSynchronizer.

Вот полный листинг класса OneShotLatch (обратите внимание, что OneShotLatch не наследует напрямую от AbstractQueuedSynchronizer, это сделано для того, чтобы скрыть public методы класса AbstractQueuedSynchronizer от клиентов OneShotLatch):

class OneShotLatch {

private final Sync sync = new Sync();

/*
* Методы await() и signal() вызывают соответствующие методы класса AbstractQueuedSynchronizer
*/

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}

public void signal() {
sync.releaseShared(0);
}

static private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
/*
* Состояние 1 значит, что защёлка открыта, в этом случае операция
* захвата успешна
*/
return (getState() == 1) ? 1 : -1;
}

protected boolean tryReleaseShared(int ignored) {
/*
* Устанавливаем состояние 1 - защёлка открыта
*/
setState(1);
return true;
}
}
}

Как можно увидеть из примера, вся работа с потоками выполняется классом AbstractQueuedSynchronizer, а его подклассам необходимо лишь реализовать минимальный набор методов, отвечающих за логику работы конкретного объекта синхронизации, и именно так реализовано большинство объектов синхронизации пакета java.util.concurrent.

Чтобы убедиться, что всё сказанное о преимуществах синхронизации при помощи классов java.util.concurrent на самом деле правда, можно взглянуть на данные, приведённые в статье "More flexible, scalable locking in JDK 5.0", написанной в октябре 2004 года. Сравнивалась синхронизация с использованием класса ReentrantLock и на основе конструкции synchronized в JVM версии 5.0. На приведённых графиках очень хорошо видно, как при использовании synchronized производительность резко падает при увеличении числа потоков, в случае же с ReentrantLock производительность падает более плавно (а в случае с двуядерной машиной увеличение числа потоков может даже привести к увеличению производительности). Но, благодаря усилиям Дэйва Дайса ситуация могла измениться. Но это уже тема отдельного поста.

Думаю, что на этом примере рассказ о внутреннем устройстве java.util.concurrent можно закончить (хотя надо отметить, что некоторые важные возможности пакета, например, фреймворк Executor, не были упомянуты). Надеюсь, что этот небольшой обзор убедил вас, что JSR 166 это не просто набор оптимизированных классов коллекций, а принципиально новый для Java подход к организации межпоточного взаимодействия.

Ссылки:

Практически вся информация данной статьи взята из двух источников:

Ещё несколько статей, имеющих отношение к рассматриваемому вопросу:

Семён Бойков

опубликовал vmrobot ( июл 06 2007, 06:20:36 PM MSD ) Permalink Комментарии [1]

Trackback URL: http://blogs.sun.com/vmrobot/entry/java_util_concurrent
Комментарии:

Спасибо за статью, меня всегда интересовала как выглядит синхронизация изнутри. На этой странице есть некоторая информация о синхронизации потоков: http://www.ibm.com/developerworks/java/library/j-threads1.html
Однако, prefomance benchmark у вызова синхронизированных методов не впечатляют. Есть подозрение, что на многопроцессорных подобные вызовы происходят еще медленней.

опубликовал null Август 08, 2007 at 02:25 PM MSD #

Опубликовать комментарий:

Имя
E-Mail:
URL:

Ваш комментарий:

HTML Syntax: Отключен

Хиты страниц за сегодня: 42