/** @brief File writer object * Записывает данные в файл асинхронно * @param Owner владелец объекта, после завершения операции записи * получает управление */ template<class Owner> class async_file_writer : public boost::enable_shared_from_this< async_file_writer<Owner> > { typedef fixed_size_buffer_queue<0x10000> queue_type;//очередь из буферов фиксированого размера typedef boost::mutex mutex_type; public: ///ctor async_file_writer (boost::asio::io_service &io_service , const char *filename , Owner& owner) //initialization list : owner_ (owner) , stream_ ( io_service , CreateFile ( filename , GENERIC_WRITE , FILE_SHARE_WRITE|FILE_SHARE_READ , NULL , OPEN_ALWAYS , FILE_FLAG_WRITE_THROUGH | FILE_FLAG_SEQUENTIAL_SCAN | FILE_FLAG_OVERLAPPED , NULL) , offset_(0) , strand_(io_service) , name_(filename) , write_in_progress_(FALSE)) //body { LARGE_INTEGER file_pos = {}; file_pos.LowPart = SetFilePointer( stream_.native(), file_pos.LowPart, &file_pos.HighPart, FILE_END); offset_ = file_pos.QuadPart; } private: void write_handler( boost::system::error_code ec , size_t transferred , queue_type::buffer_ptr b ) { offset_ += transferred;//modify shared data InterlockedExchange(&write_in_progress_, FALSE); delete b; //start_write_ method is unlocked if (ec) { owner_.on_error_(shared_from_this(), ec); } else if ( !start_write_() ) { owner_.on_write_( shared_from_this(), transferred ); } } bool start_write_() { mutex_type::scoped_lock lock(mutex_); bool write_op_is_ready = InterlockedCompareExchange(&write_in_progress_, TRUE, FALSE) == FALSE; if( write_op_is_ready ) { //start_write_ method is locked queue_type::buffer_ptr buffer = queue_.pop(); if (buffer) { boost::asio::async_write_at( stream_, offset_, boost::asio::buffer(buffer->begin(), buffer->size()), strand_.wrap( boost::bind( &async_file_writer::write_handler, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, buffer ) ) ); return true; } InterlockedExchange(&write_in_progress_, FALSE); } return false; } ///file owner Owner& owner_; ///file queue queue_type queue_; ///file pos boost::uint64_t offset_; ///handle wrapper boost::asio::windows::random_access_handle stream_; ///stand boost::asio::strand strand_; ///nonerecursive mutex, for sync. write operation volatile LONG write_in_progress_; ///file name const std::string name_; //mutex mutex_type mutex_; public: ///object type typedef async_file_writer<Owner> type; ///pointer type typedef boost::shared_ptr<type> pointer; ///add data to file template<class I> void append(I begin, I end) { mutex_type::scoped_lock lock(mutex_); queue_.append( begin, end ); } ///create new object static pointer create(boost::asio::io_service &io_service, const char *filename, Owner& owner) { return pointer( new type(io_service, filename, owner) ); } bool start_write() { return start_write_(); } };Параметр шаблона - класс, владелец файла, который должен реализовывать методы on_error_ и on_write_. Первый из них будет вызываться при возникновении ошибок, второй - после того как очередная порция данных будет записана. Метод append позволяет добавлять данные в очередь на запись. А метод start_write запускает процесс записи. При этом если операция записи в данный момент уже выполняется, он вернет false, а если нет, то он начинает асинхронную запись и возвращает true. При этом, если во время выполнения асинхронной записи в файл, будет вызван метод append и в очередь будет добавлен еще один буфер, то он то-же будет записан. Блокировка может возникнуть в случае вызова методов append и/или start_write из разных потоков, но она будет удерживаться не долго. В общем Asio очень интересная библиотека, которая может быть использована не только как сетевая библиотека. Возможно скоро я покажу как можно ее немного расширить =)
суббота, 13 декабря 2008 г.
boost::asio, асинхронная работа с файлами
Примерно полгода назад я начал использовать библиотеку Asio. Это это кроссплатформенная библиотека, предназначеная для написания сетевых приложений. Но она позволяет работать не только с сокетами и таймерами, как может показаться после чтения документации. В принципе, ее можно использовать в качестве фреймверка для приложений использующих асинхронный ввод-вывод. Например с ее помощью можно асинхронно читать и записывать файлы либо просто выполнить любую функцию асинхронно, в другом потоке. Библиотека содержит классы asio::windows::random_access_handle и asio::windows::stream_handle, с помощью которых можно делать разные кошерные вещи, например читать и записывать файлы, либо работать с именоваными пайпами и тд.
Следующий пример демонстрирует пример работы с файлами, я выдрал этот код из одного своего проекта, немного упростив. Класс async_file_writer позволяет записывать данные в файл. Метод append добавляет данные в очередь на запись, а метод start_write начинает запись. При этом, код, вызывающий этот метод, получает упраление сразу-же не дожидаясь когда данные будут записаны в файл. Сразу оговорюсь, что в коде используется класс fixed_size_buffer_queue, исходный код которого я выкладывать не хочу, так как он содержит внешние зависимости. Это просто очередь, элементами которой являются буферы фиксированого размера. В общем, вот сам код:
Это не верно. Ты пробовал тестировать под нагрузкой?
ОтветитьУдалитьПробовал, все работало. А что неверно?
ОтветитьУдалитьВ коде действительно был race condition, исправил.
ОтветитьУдалитьЭтот комментарий был удален администратором блога.
ОтветитьУдалитьУ нас в жж за такой пост бы закидали какашками в каментах:)
ОтветитьУдалитьЕлки-палки, уникальная заметка
ОтветитьУдалитьЭтот комментарий был удален администратором блога.
ОтветитьУдалитьЭто не верно. Ты пробовал тестировать под нагрузкой?
ОтветитьУдалить