воскресенье, 2 октября 2011 г.

Очереди сообщений

Я хотел написать пост о библиотеке disruptor, начал писать введение и, совершенно случайно, написал целый пост :)
Итак, очереди бывают разными, бывают очереди, с помощью которых реализуют обход графа в ширину, а бывают такие очереди, с помощью которых можно передавать сообщения от одного потока к другому, этот пост именно о них.
Для начала договоримся, что очередь, это структура данных, поддерживающая две операции - enqueue и dequeue, добавление и извлечение элемента данных из очереди. Элементы извлекаются из очереди в FIFO порядке.
Когда речь заходит о распараллеливании каких либо вычислений, то многим на ум сразу приходит параллелизм на уровне данных, то бишь одну половину массива мы обрабатываем в одном потоке, а вторую в другом, в простейшем случае. Либо как нибудь еще разбиваем задачу на независимые части, обрабатываем их на разных процессорах и затем объединяем результаты.

Однако это возможно не всегда, некоторые вычисления представляют собой набор последовательных операций, в этом случае, мы не можем обрабатывать один элемент данных параллельно. Но, если нам нужно выполнять обработку множества последовательных элементов данных, то можно использовать принцип конвейера. Мы выполняем обработку первого элемента данных на первом процессоре, затем продолжаем обработку на втором процессоре, затем на третьем и тд. При этом, когда мы выполняем, первый этап обработки для одного сообщения, мы можем выполнять второй этап обработки предыдущего и третий этап обработки пред-предыдущего сообщений и тд., параллельно.

В итоге, мы сможем одновременно обрабатывать столько сообщений, сколько у нас есть независимых операций.
В жизни все несколько сложнее, операции выполняются с разной скоростью, поэтому, для объединения разных этапов обработки в цепочку используются очереди. Мы кладем сообщение в первую очередь, поток, выполняющий первый этап обработки вытаскивает его оттуда, выполняет обработку и кладет результат в свою выходную очередь сообщений, которая в то же время, является входной для второго этапа обработки. И так далее.

Мы можем использовать на одном из этапов обработки несколько потоков, если эта операция требует больше вычислительных ресурсов. Для этого мы должны на стороне производителя в round-robin порядке выбрать очередь, в которую будем записывать, а на стороне потребителя, в том же самом порядке выбрать очередь из которой сообщение будет извлечено и помещено в следующую очередь(это если нам нужно сохранить порядок сообщений, если не нужно, то можно сделать проще). Для этого нам нужны два счетчика, изначально их значения должны быть равны, прежде чем добавлять элемент, мы вычисляем индекс очереди как i mod N, где i - значение счетчика, N - количество очередей, добавляем элемент - queues[i mod N].enqueue(X), после этого мы увеличиваем i на единицу.
На стороне потребителя нужно извлечь последовательность элементов из множества очередей, алгоритм будет таким же, только вместо enqueue нужно вызвать dequeue.

Естественно, для решения этой задачи нельзя использовать std::queue, System.Collection.Generics.Queue, java.util.Queue или то, что есть в вашем любимом языке программирования. Эти структуры данных не потокобезопасны, доступ к ним придется синхронизировать, а это означает что два потока не смогут одновременно добавить и извлечь элемент из очереди.
Помимо этого, операции на стороне потребителя и производителя часто выполняются с разной скоростью, в случае, если производитель работает быстрее чем потребитель, достаточно долго, программа просто вылетит по OOM, так как в очередь будет помещено слишком много элементов.
Для решения всех этих проблем были созданы специальные очереди. Их можно классифицировать следующим образом: блокирующие - неблокирующие; ограниченные - неограниченные. Так же очереди классифицируют по уровню concurrency - single producer/single consumer(SPSC), single producer/multiple consumers(SPMC), multiple producers/single consumer(MPSC), multiple producers/multiple consumers(MPMC). При этом данные характеристики могут сочетаться произвольным образом, например single producer/multiple consumers очередь может быть ограниченной и неблокирующей одновременно.

Блокирующая очередь, это очередь, операции над которой могут заблокировать вызывающий поток до тех пор, пока состояние очереди не изменится. Например, в случае ограниченной очереди, попытка добавления элемента(enqueue) в переполненную очередь может быть заблокирована до тех пор, пока какой либо другой поток не извлечет элемент и не освободит место для нового элемента. Попытка извлечения элемента из пустой блокирующей очереди так же может заблокировать вызывающий поток до тех пор, пока в очередь не будет добавлен элемент. Обычно, для блокирующей очереди реализуют способ сообщить потоку потребителю, извлекающему сообщения из очереди, что производитель уже завершил работу. Данные свойства делают блокирующие очереди очень простым в использовании инструментом для организации совместной работы множества потоков.
Неблокирующие очереди, как правило просто возвращают признак того, была операция выполнена, или нет. Их преимущество, перед блокирующими очередями, состоит в том, что они могут быть реализованы без использования мьютексов, семафоров и прочих condintion variables, которые обычно применяются в блокирующих очередях для реализации ожидания.

Очереди также отличаются по внутренней реализации, существуют очереди на основе массивов и на основе списков. Общее у этих реализаций то, что в каждом случае присутствуют два указателя, Tail и Head. Tail - указатель конца списка, к которому добавляются элементы, Head - указатель на начало списка, из которого извлекаются элементы.
Таким образом, для добавления и для извлечения элементов из очереди нужно изменить одну из этих переменных и прочитать обе. Так же, очевидно, что на основе массива нельзя построить неограниченную очередь(очередь, в которую можно добавить произвольное количество элементов). Однако, очередь, реализованная на основе списка может быть ограниченной, достаточно поддерживать счетчик элементов очереди. Кстати, автор этого поста видел и гибридный вариант, очередь на основе списка, элементами которой являлись массивы фиксированного размера, это оптимизация, упрощающая жизнь сборщику мусора.

Теперь о типах очередей с точки зрения concurrency.
SPSC - самая простая и быстрая очередь, она может быть реализована вообще без использования атомарных (команды с префиксом lock, блокирующие шину памяти на время выполнения) операций, достаточно двух барьеров памяти. Мало того, операции enqueue и dequeue могут быть реализованы как wait free операции (естественно, wait free гарантия здесь условна, так как в случае списка, нужно выделить память под новый элемент, в случае массива wait free гарантия может выполняться только тогда, когда есть место в массиве).
SPMC  и MPSC очереди немного сложнее и медленнее, так как в данном случае несколько потоков могут бороться за head либо за tail указатели. По этой причине, данные очереди могут быть реализованы только с применением атомарных операций. Для такой очереди нужна как минимум одна CAS операция, с одной из сторон, для MPSC - со стороны производителя, для SPMC - со стороны потребителя.
MPMC - наиболее универсальная и наименее эффективная очередь. Она требует использования двух атомарных CAS операций, одной при добавлении элементов и одной при извлечении.

Все эти качества нужно учитывать при проектировании системы, выполняющей конвейерную обработку данных. Конвейерная обработка, это своего рода размен, мы увеличиваем общую пропускную способность(количество сообщений в сек.), но уменьшаем латентность(время обработки отдельного сообщения), ведь теперь помимо обработки сообщения, к латентности добавится время выполнения операций добавления и извлечения из всех очередей на пути сообщения. Также, не стоит забывать о балансировке нагрузки, если один из этапов конвейера работает на порядок быстрее чем все остальные, то нет смысла выделять его в отдельный этап и наоборот, если один из этапов выполняется слишком медленно, возможно стоит его разделить на части, либо выполнять его параллельно.

Комментариев нет: