вторник, 10 февраля 2009 г.

Расширение Boost.Asio

Я уже довольно давно использую библиотеку Boost.Asio в своем проекте, в основном для организации асинхронного ввода вывода и не раз ловил себя на мысли, что хотел бы использовать ее не только для этого. На самом деле это не сложно исправить, asio это не столько сетевая библиотека, сколько очень удобный фреймверк для вашего кода =) Итак, немного теории. Библиотека asio позволяет эффективно отделить реализацию от интерфейса используя принцип dependency injection, для этого существуют три сущности: io_object - интерфейс предоставляемый пользователю; service(не путать с io_service-ом) - объект реализующий взаимодействие с ОС, например с системой ввода-вывода; implementation - хранит состояние объекта(например хэндл сокета). Все вместе это выглядит так: io_object - класс унаследованный от boost::asio::basic_io_object, объекты этого класса создаются пользователем для выполнения каких либо операций, примеры: boost::asio::deadline_timer; boost::asio::ip::tcp::socket; boost::asio::windows::random_access_handle. Конструктор класса basic_io_object принимает указатель на io_service. Каждому типу io_object-а соответствует свой service. Сервис, это объект непосредственно выполняющий операции, программа пользователя должна взаимодействовать с сервисом не напрямую а через io_object соответствующего типа, который выполняет роль интерфейса. Сервис автоматически создается до создания первого экземпляра соответствующего io_object-a, например, для всех сокетов создается один сервис(под windows это будет win_iocp_socket_service по названию понятно, что он использует порты завершения:), для таймеров - другой(deadline_timer_service, котрый под windows так-же использует IOCP). io_service содержит список сервисов для всех объектов(io_object) которые были на нем созданы(при создании получили ссылку на этот io_service в качестве параметра конструктора). basic_io_object содержит два члена класса, с помощью которых программист может получить доступ к сервису и реализации самого объекта - service и implementation. service - ссылка на сервис для данного объекта, имеющего тип basic_io_object::service_type, implementation - объект типа basic_io_object::implementation_type - который является членом класса basic_io_object и представляет из себя реализацию io_object-a, временем жизни этого объекта управляет сервис(взывая методы construct и destroy). Допустим у нас есть такой код: boost::asio::io_service io; boost::asio::deadline_timer timer(io); В первой строке будет создан io_service, он не будет содержать ни одного сервиса и если мы вызовем метод io.run(), он вернет управление сразу, так как работы у него нет. Во второй строке все немного интереснее, сначала будет создан io_object, в данном случае это deadline_timer, в конструкторе basic_io_object-a(базового класса) будет вызвана ф-я use_service которая попробует найти соответствующий сервис(для таймеров он будет иметь тип deadline_timer_service<...>) если для данного io_service-a такой сервис еще не создан, то он бует создан, далее будет вызван метод construct только-что созданного сервиса, задача которого - инициализация implementation-a (имеющего тип deadline_timer_service::implementation_type), который является членом класса basic_io_object и содержит детали реализации таймера на данной платформе. Но это еще не все :), после выхода из scope-a будет вызван деструктор таймера, который вызовет метод destroy сервиса, в качестве параметра в него будет передан timer.implementation. Далее в деструкторе io_service-a будет вызван метод shutdown_service сервиса дедлайн таймеров, а затем удален экземпляр класса deadline_timer_service. Как я уже упоминал, все это ради того, что-бы разделить реализацию и интерфейс. В качестве примера я реализую класс(хотя на самом деле их будет три :), для работы с сообщениями windows, который можно будет использовать вместе в библиотекой boost::asio. Для начала нам потребуется класс реализующий обработку сообщений - implementation. Он должен позволять устанавливать обработчики сообщений, удалять их, а так-же содержать метод для обработки конкретного сообщения.
using namespace boost::tuples;
using boost::function;

class messageloop_impl
{
   typedef boost::function<void (const MSG&, boost::system::error_code)> handler_type;
   typedef boost::unordered_map<DWORD, handler_type> table_type;
   table_type table_;
   boost::asio::io_service &io_;
   boost::asio::io_service::work work_;
   boost::mutex mutex_;
public:
   messageloop_impl(boost::asio::io_service &io_service) : io_(io_service), work_(io_service)
   {
   }
   void set_handler(DWORD id, handler_type handler)
   {
       boost::mutex::scoped_lock lock(mutex_);
       table_type::iterator i;
       bool contain_handler;
       tie(i, contain_handler) = find_(id);
       if ( contain_handler )
           runhandler_(i, MSG(), boost::asio::error::eof);
       table_.insert( std::make_pair(id, handler) );
   }
   void process_message(const MSG& msg, int& /*out*/processed)
   {
       boost::mutex::scoped_lock lock(mutex_);
       table_type::iterator i;
       bool contain_handler;
       tie(i, contain_handler) = find_(msg.message);
       if ( contain_handler )
       {
           ++processed;
           runhandler_(i, msg, boost::system::error_code());
       }
   }
   void remove_handler(DWORD id)
   {
       boost::mutex::scoped_lock lock(mutex_);
       table_type::iterator i;
       bool contain_handler;
       tie(i, contain_handler) = find_(id);
       if ( contain_handler )
       {
           runhandler_(i, MSG(), boost::asio::error::eof);
           table_.erase(i);
       }
   }
   void clear()
   {
       boost::mutex::scoped_lock lock(mutex_);
       for(table_type::iterator i = table_.begin(); i != table_.end(); ++i)
           runhandler_(i, MSG(), boost::asio::error::eof);
       table_.clear();
   }
private:
   tuple<table_type::iterator, bool> find_(DWORD id)
   {
       table_type::iterator i = table_.find(id);
       return tuple<table_type::iterator, bool>(i, i != table_.end());
   }
   void runhandler_(table_type::iterator i, const MSG& m, boost::system::error_code e)
   {
       handler_type h = i->second;
       io_.post( boost::bind(h, m, e) );
   }
};
Конструктор messageloop_impl - принимает ссылку на io_service и сохраняет ее внутри объекта класса. Обработчики сообщений реализованы на основе boost::function, имеют сигнатуру void(const MSG&, boost::system::error_code). Для хранения обработчиков сообщений используется хэш таблица table_. Ф-я set_handler добавляет обработчик в хэш таблицу, ф-я remove_handler соответственно удаляет, метод clear удаляет все обработчики. Метод process_message обрабатывает сообщение, в случае, если для данного сообщения найден обработчик, значение переменной processed увеличивается на единицу, а обработчик передается в io_service с помощью метода post, далее он вызывается методом run, poll или run_one io_service-a. Это нужно для того, что-бы наш обработчик сообщений следовал правилу, согласно которому все обработчики могут вызываться только в тех потоках, в которых был вызван метод run(poll или run_one) соответствующего io_service-a, либо во время вызова деструктора io_service-a. Так-же данный класс содержит член work_ имеющий тип boost::asio::io_service::work. io_service содержит счетчик, который инкрементируется каждый раз, когда начинается какая-либо операция, и декрементируется каждый раз после завершения очередной операции. Метод run io_service-a не завершается до тех пор, пока этот счетчик не будет равен нулю. Сделав объект класса io_service::work членом класса messageloop_impl, мы гарантируем, что этот счетчик не обнулится до тех пор, пока хоть один экземпляр messageloop_impl существует. Перед удалением обработчика, он вызывается с вторым параметром равным boost::asio::error::eof. Теперь напишем код нашего сервиса для обработки сообщений.
class basic_messageloop_service : public boost::asio::io_service::service
{
public:
   static boost::asio::io_service::id id;
   typedef boost::shared_ptr<messageloop_impl> implementation_type;

private:
   boost::mutex mutex_;
   std::set<implementation_type> processors_;
   std::set<DWORD> threads_;

   static void send_WM_QUIT_to(DWORD thread)
   {
       BOOL result = ::PostThreadMessage(thread, WM_QUIT, 0, 0);
       if (!result)
       {
           boost::system::error_code e = boost::system::error_code(::GetLastError() , boost::system::system_category);
           throw boost::system::system_error(e);
       }
   }
public:
   explicit basic_messageloop_service(boost::asio::io_service &io_service) : boost::asio::io_service::service(io_service)
   {
   }

   ~basic_messageloop_service()
   {
   }

   void construct(implementation_type &impl)
   {
       impl.reset( new messageloop_impl( get_io_service() ) );
       boost::mutex::scoped_lock lock(mutex_);
       processors_.insert(impl);
   }

   void destroy(implementation_type &impl)
   {
       boost::mutex::scoped_lock lock(mutex_);
       processors_.erase(impl);
       impl.reset();
   }

   void shutdown_service()
   {
       boost::mutex::scoped_lock lock(mutex_);
       std::for_each(threads_.begin(), threads_.end(), &basic_messageloop_service::send_WM_QUIT_to);
       threads_.clear();
   }

   template <class Handler>
   void set_handler(implementation_type &impl, DWORD id, Handler handler)
   {
       impl->set_handler(id, handler);
   }

   void remove_handler(implementation_type &impl, DWORD id)
   {
       impl->remove_handler(id);
   }

   void clear(implementation_type& impl)
   {
       impl->clear();
   }

   void loop()
   {
       MSG msg;
       DWORD thread_id = ::GetCurrentThreadId();
       {
           boost::mutex::scoped_lock lock(mutex_);
           threads_.insert(thread_id);
       }
       while(::GetMessage(&msg, NULL, 0, 0))
       {
           boost::mutex::scoped_lock lock(mutex_);
           int proc_cnt = 0;
           std::for_each(  processors_.begin(),
                           processors_.end(),
                           boost::bind( &messageloop_impl::process_message, _1, msg, boost::ref(proc_cnt) )
                         );
           if ( proc_cnt == 0 )
               ::DispatchMessage(&msg);
       }
       {
           boost::mutex::scoped_lock lock(mutex_);
           threads_.erase(thread_id);
       }
   }

};

boost::asio::io_service::id basic_messageloop_service::id;
Итак, наш класс должен быть наследником класса boost::asio::io_service::service, а так-же иметь статический член id имеющий тип boost::asio::io_service::id, который является уникальным идентификатором сервиса. Так-же класс должен определять тип implementation_type, в данном случае это boost::shared_ptr, от которого зависит тип переменной класса basic_io_object::implementation, тоесть реализации объекта обработчика сообщений. Сервис должен уметь инициализировать объекты имеющие тип implementation_type с помощью метода construct и деинициализировать их методом destroy. Так-же он содержит ряд методов для управления экземплярами класса implementation_type а так-же метод loop, в котором реализован цикл обработки сообщений. Например, метод set_handler, он принимает два параметра, первый(impl) имеет тип implementation_type, второй(id) - ид-р сообщения и третий(handler) - обработчик сообщения, реализован просто как вызов impl->set_handler(id, handler), тоесть просто делегирует вызов реализации. Метод loop, реализован следующим образом, вначале он запоминает в множестве threads_ идентификатор потока в котором он вызван, затем в цикле получает сообщения и передает их обработчикам (которые сервис запоминает в множестве processors_ во время создания каждого из них) а после получения сообщения WM_QUIT он удаляет из множества thread_ идентификатор потока в котором выполнялся. Благодаря этому можно запустить несколько циклов обработки сообщений в разных потоках и один обработчик сможет получать сообщения из любого. Ну и последнее, метод shutdown_service, должен привести к завершению всех циклов обработки сообщений, поэтому он просто посылает всем им сообщение WM_QUIT. Ну и самое последнее что нам нужно сделать - реализовать io_object. Это очень просто сделать.
template <typename Service> 
class basic_messageloop 
    : public boost::asio::basic_io_object<Service> 
{ 
public: 
    explicit basic_messageloop(boost::asio::io_service &io_service) 
        : boost::asio::basic_io_object<Service>(io_service) 
    { 
    } 

    void loop()
    {
        this->service.loop();
    }

    template<class Handler>
    void set_handler(DWORD id, Handler handler)
    {
        this->service.set_handler( this->implementation, id, handler );
    }

    template<class Handler>
    void set_handler(LPCTSTR name, Handler handler)
    {
        DWORD msg_code = ::RegisterWindowMessage(name);
        this->service.set_handler( this->implementation, msg_code, handler );
    }
}; 

typedef basic_messageloop< basic_messageloop_service > messageloop;
Единственное требование - класс должен быть наследником boost::asio::basic_io_object, где Service - наш сервис обработчиков сообщений. В этом классе метод set_handler просто вызывает метод set_handler своего сервиса и передает в него свою реализацию(implementation) и дополнительные параметры. В общем этот класс - просто интерфейс, он не должен иметь состояние(хотя его родитель имеет), а просто перенаправлять все вызовы своему сервису. Юзать это можно так:
boost::asio::io_service io;
asio_aux::messageloop message_loop1(io);
asio_aux::messageloop message_loop2(io);
message_loop1.set_handler(WM_CLOSE, &message_handler);
message_loop1.set_handler(L"MyMessage", &message_handler);
message_loop2.set_handler(WM_CLOSE, &message_handler);
message_loop2.set_handler(L"MyAnotherMessage", &message_handler);
boost::thread thr( boost::bind( &boost::asio::io_service::run, &io) );
message_loop1.loop();

8 комментариев:

mrbrook комментирует...

Епт. Мир действительно тесен. Искал мануалы по boost - попал на твой блог. Тебе бы книги писать с таких подходом - хорошая статья.

Lazin комментирует...

Спасибо, но писатель из меня совершенно никакой =)

Анонимный комментирует...

Ошибка в remove_handler. Д.б.
...
if (contain_handler)
{
runhandler_(i, MSG(), boost::asio::error::eof);
table_.erase (i);
}
...

Lazin комментирует...

Да, действительно ошибка)

greg комментирует...

интересная статья!
я кроме твоего блога в инете про boost вабще ничего на русском не нашел

Alex Ott комментирует...

2greg: у меня есть две вводных статьи про boost.asio

Анонимный комментирует...

Как говорится.. Не дать не взять, зачётная статья!

Alex Ott комментирует...

2greg: у меня есть две вводных статьи про boost.asio