воскресенье, 2 октября 2011 г.

Disruptor

Продолжая разговор об очередях. Не так давно, Мартин Фаулер опубликовал статью с описанием архитектуры LMAX, это что-то вроде трейдинговой платформы, которая обрабатывает очень много мелких сообщений. Как мы уже знаем, сообщения можно обрабатывать с помощью конвейера, но обычные очереди сообщений оказывается не всегда хорошо подходят для этой цели.

Представим себе некую сложную систему из очередей и потоков, у нас есть N потоков, которые связаны N + 1 очередями. Пускай очереди реализованы на основе массивов. У нас есть множество массивов, каждое сообщение N + 1 раз записывается в массив и N + 1 раз читается из массива, при этом происходит N + 1 изменений переменных head и tail разных очередей. Даже если все очереди, это SPSC очереди, это несколько избыточно. Для решения этой проблемы, разработчики LMAX создали библиотеку Disruptor для java. Помимо этого, уже существуют .NET и С++ порты, примеры в этой статье будут использовать .NET порт.
В общем, disruptor это один большой кольцевой буфер, с помощью которого можно заменить целый огород из очередей. Во время создания, кольцевому буферу передается его размер и фабрика для создания объектов. Помимо этого, буфер можно немного настроить, он может использовать разные механизмы ожидания, блокирующий, yield (передает управление другому потоку) и spin wait. Буфер заполняется созданными фабрикой объектами, которые из него никогда не удаляются.

  var ringBuffer = new RingBuffer<ValueEntry>(
    () => new ValueEntry(),
    100 * 1024,
    ClaimStrategyFactory.ClaimStrategyOption.SingleThreaded,
    WaitStrategyFactory.WaitStrategyOption.Yielding);
Далее, пользователь библиотеки может создать множество потребителей, причем потребители могут работать как параллельно, при этом каждое сообщение будет обработано всеми параллельно включенными потребителями, а так же последовательно. У вас не получится сделать многие вещи, например, создать цикл или сделать так, чтобы сообщения обрабатывались только одним из параллельных потребителей.
  var cons = new ValueConsumer(COUNT);
  ringBuffer
    .ConsumeWith(new Incrementer())
    .Then(new Incrementer(), new Decrementer())
    .Then(cons);
В данном примере мы создаем конвейер из трех этапов, на пером этапе каждое сообщение будет обработано одним потребителем Incrementer, на втором двумя(Incrementer и Decrementer), и на третьем - одним потребителем типа ValueConsumer. При этом сообщение останется в массиве после того, как все потребители его обработают, в дальнейшем его перезапишет производитель. Потребители, это объекты, реализующие интерфейс IBatchHandler<T>:

  class Incrementer : IBatchHandler<ValueEntry>
  {
    public void OnAvailable(long sequence, ValueEntry data) {
      data.Value++;
    }
    public void OnEndOfBatch() {
    }
  }
Хочу обратить внимание на то, что потребитель изменяет параметр data, параметр data, это элемент кольцевого буфера, sequence - номер сообщения. После этого, пользователь библиотеки должен создать producer barrier - объект, с помощью которого он будет добавлять новые сообщения. Producer barrier может быть многопоточным, тогда внутри себя он будет использовать атомарные операции, либо однопоточным, тогда он будет работать немного быстрее в случае, если вызывается из одного потока.

  var pbarrier = ringBuffer.CreateProducerBarrier();
  ringBuffer.StartConsumers();
После того, как вы все это сделали, нужно вызвать метод StartConsumers, который запустит по одному потоку на каждого потребителя. Добавление нового сообщения выглядит следующим образом:

  ValueEntry data;
  var seq = barrier.NextEntry(out data);
  data.Value = i;
  barrier.Commit(seq);

мы должны в первую очередь вызвать метод NextEntry, который выделит в массиве место под наш новый элемент. При этом не происходит выделение памяти, элемент массива был создан заранее. Барьер использует claim factory для получения номера очередного сообщения (параметр claim factory конструктора) именно его и возвращает метод NextEntry. Далее, нужно изменить объект, ссылку на который мы получили вызвав NextEntry, например записав в него наше сообщение и затем, вызвать метод Commit, передав в него номер сообщения.
Такой хитрый метод записи позволяет добавлять элементы в буфер сразу из нескольких потоков без блокировок и синхронизации.Далее, будут вызываться методы OnAvailable всех потребителей в том порядке, который мы задали во время конфигурирования буфера. В этот метод также передается номер обрабатываемого сообщения, который можно использовать например для того, что-бы организовать обработку сообщений в round-robin манере, несколькими параллельными consumer-ами.
Обработчики содержат внутри себя свою текущую позицию и специальный объект, который создается с помощью wait factory, с помощью которого он может ждать появления доступных для обработки сообщений. Также, все обработчики знают о своих зависимостях и после обработки очередного сообщения "сигналят" следующему обработчику. Текущая позиция обработчика изменяется только из одного потока - потока этого обработчика, при этом генерируется write barrier. Перед тем как прочитать элемент, обработчик должен быть "разбужен" producer barrier-ом или другим обработчиком, он должен прочитать значение текущей позиции разбудившего (генерируется read barrier) и решить, сколько сообщений он может прочитать. Затем, для каждого из доступных сообщений будет вызван метод OnAvailable, а затем OnEndOfBatch.
OnEndOfBatch - позволяет обрабатывать сообщения не по оному а пачками, т.н. batch processing. Если ваш обработчик выполняет какой либо I/O, то лучше в методе OnAvailable формировать буфер для отправки(либо просто запомнить номера первого и последнего сообщений между вызовами OnEndOfBatch), а в OnEndOfBatch - непосредственно выполнять I/O.
Итак, disruptor может быть очень полезен, хотя-бы тем, что избавляет от необходимости городить и поддерживать огород из множества потоков и очередей вручную. Ну и как приятный бонус, это все очень быстро работает :)

Ну и в конце - код который я написал что-бы "поиграться" с этой библиотекой, может кому нибудь пригодится:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Diagnostics;
using System.Collections.Concurrent;
using Disruptor;
 
namespace QueueTest
{
	class ValueEntry
	{
		public int Value { getset; }
	}
 
	class Incrementer : IBatchHandler<ValueEntry>
	{
		public void OnAvailable(long sequence, ValueEntry data) {
			data.Value++;
		}
 
		public void OnEndOfBatch() {
		}
	}
 
	class Decrementer : IBatchHandler<ValueEntry>
	{
		public void OnAvailable(long sequence, ValueEntry data) {
			data.Value--;
		}
 
		public void OnEndOfBatch() {
		}
	}
 
	class ValueProducer
	{
		int count;
		IProducerBarrier<ValueEntry> barrier;
 
		public ValueProducer(int num, IProducerBarrier<ValueEntry> pbarrier) {
			count = num;
			barrier = pbarrier;
		}
 
		public void Run() {
			for (int i = 0; i < count; i++) {
				ValueEntry data;
				var seq = barrier.NextEntry(out data);
				data.Value = i;
				barrier.Commit(seq);
			}
		}
	}
 
	class ValueConsumer : IBatchHandler<ValueEntry>
	{
		int iter = 0;
		int count;
		AutoResetEvent evt;
 
		public ValueConsumer(int cnt) {
			count = cnt;
			evt = new AutoResetEvent(false);
		}
 
		public void OnAvailable(long sequence, ValueEntry data) {
			if (sequence == count - 1)
				evt.Set();
			if (data.Value != sequence + 1)
				Console.WriteLine("Error at {0}", iter - 1);
		}
 
		public void OnEndOfBatch() {
		}
 
		public void WaitForAll() {
			evt.WaitOne();
		}
	}
 
	class Program
	{
		const int COUNT = 100 * 1000;
		static void Main(string[] args) {
			var ringBuffer = new RingBuffer<ValueEntry>(
				() => new ValueEntry(),
				100 * 1024,
				ClaimStrategyFactory.ClaimStrategyOption.SingleThreaded,
				WaitStrategyFactory.WaitStrategyOption.Yielding);
 
			var cons = new ValueConsumer(COUNT);
			ringBuffer
				.ConsumeWith(new Incrementer())
				.Then(new Incrementer(), new Decrementer())
				.Then(cons);
			var pbarrier = ringBuffer.CreateProducerBarrier();
			var prod = new ValueProducer(COUNT, pbarrier);
			ringBuffer.StartConsumers();
			var sw = new Stopwatch();
			sw.Start();
			prod.Run();
			cons.WaitForAll();
			sw.Stop();
			Console.WriteLine("Disruptor: {0}ms", sw.ElapsedMilliseconds);
		}
	}
}

Комментариев нет: