суббота, 13 декабря 2008 г.

boost::asio, асинхронная работа с файлами

Примерно полгода назад я начал использовать библиотеку Asio. Это это кроссплатформенная библиотека, предназначеная для написания сетевых приложений. Но она позволяет работать не только с сокетами и таймерами, как может показаться после чтения документации. В принципе, ее можно использовать в качестве фреймверка для приложений использующих асинхронный ввод-вывод. Например с ее помощью можно асинхронно читать и записывать файлы либо просто выполнить любую функцию асинхронно, в другом потоке. Библиотека содержит классы asio::windows::random_access_handle и asio::windows::stream_handle, с помощью которых можно делать разные кошерные вещи, например читать и записывать файлы, либо работать с именоваными пайпами и тд. Следующий пример демонстрирует пример работы с файлами, я выдрал этот код из одного своего проекта, немного упростив. Класс async_file_writer позволяет записывать данные в файл. Метод append добавляет данные в очередь на запись, а метод start_write начинает запись. При этом, код, вызывающий этот метод, получает упраление сразу-же не дожидаясь когда данные будут записаны в файл. Сразу оговорюсь, что в коде используется класс fixed_size_buffer_queue, исходный код которого я выкладывать не хочу, так как он содержит внешние зависимости. Это просто очередь, элементами которой являются буферы фиксированого размера. В общем, вот сам код:
/** @brief File writer object
*  Записывает данные в файл асинхронно
*  @param Owner владелец объекта, после завершения операции записи
*  получает управление
*/
template<class Owner>
class async_file_writer : public boost::enable_shared_from_this< async_file_writer<Owner> >
{
    typedef fixed_size_buffer_queue<0x10000> queue_type;//очередь из буферов фиксированого размера
    typedef boost::mutex mutex_type;

public:

    ///ctor
    async_file_writer (boost::asio::io_service &io_service
                      , const char *filename
                      , Owner& owner)
    //initialization list
        : owner_ (owner) 
        , stream_ ( io_service 
                  , CreateFile ( filename
                               , GENERIC_WRITE
                               , FILE_SHARE_WRITE|FILE_SHARE_READ
                               , NULL
                               , OPEN_ALWAYS
                               , FILE_FLAG_WRITE_THROUGH | FILE_FLAG_SEQUENTIAL_SCAN | FILE_FLAG_OVERLAPPED
                               , NULL)
        , offset_(0)
        , strand_(io_service)
        , name_(filename)
        , write_in_progress_(FALSE))
    //body
    {
        LARGE_INTEGER file_pos = {};

        file_pos.LowPart = 
            SetFilePointer(
                stream_.native(), 
                file_pos.LowPart, 
                &file_pos.HighPart, 
                FILE_END);

        offset_ = file_pos.QuadPart;
    }

private:

    void write_handler( boost::system::error_code ec
                      , size_t transferred
                      , queue_type::buffer_ptr b )
    {
        offset_ += transferred;//modify shared data
        InterlockedExchange(&write_in_progress_, FALSE);
        delete b;
        //start_write_ method is unlocked
        if (ec)
        {
            owner_.on_error_(shared_from_this(), ec);
        }
        else if ( !start_write_() )
        {
            owner_.on_write_( shared_from_this(), transferred );
        }
    }

    bool start_write_()
    {
        mutex_type::scoped_lock lock(mutex_);
        bool write_op_is_ready = InterlockedCompareExchange(&write_in_progress_, TRUE, FALSE) == FALSE;
        if( write_op_is_ready )
        {
            //start_write_ method is locked
            queue_type::buffer_ptr buffer = queue_.pop();
            if (buffer)    
            {
                boost::asio::async_write_at(
                        stream_, 
                        offset_, 
                        boost::asio::buffer(buffer->begin(), buffer->size()), 
                        strand_.wrap(
                            boost::bind(
                                &async_file_writer::write_handler,
                                shared_from_this(),
                                boost::asio::placeholders::error,
                                boost::asio::placeholders::bytes_transferred,
                                buffer
                            )
                        )
                    );
                return true;
            }
            InterlockedExchange(&write_in_progress_, FALSE);
        }
        return false;
    }

    ///file owner
    Owner& owner_;
    ///file queue
    queue_type queue_;
    ///file pos
    boost::uint64_t offset_;
    ///handle wrapper
    boost::asio::windows::random_access_handle stream_;
    ///stand
    boost::asio::strand strand_;
    ///nonerecursive mutex, for sync. write operation
    volatile LONG write_in_progress_;
    ///file name
    const std::string name_;
    //mutex
    mutex_type mutex_;
public:

    ///object type
    typedef async_file_writer<Owner> type;
    ///pointer type
    typedef boost::shared_ptr<type> pointer;
    ///add data to file
    template<class I>
    void append(I begin, I end)
    {
        mutex_type::scoped_lock lock(mutex_);
        queue_.append( begin, end );
    }
    ///create new object
    static pointer create(boost::asio::io_service &io_service, const char *filename, Owner& owner)
    {
        return pointer( new type(io_service, filename, owner) );
    }

    bool start_write()
    {
        return start_write_();
    }
};
Параметр шаблона - класс, владелец файла, который должен реализовывать методы on_error_ и on_write_. Первый из них будет вызываться при возникновении ошибок, второй - после того как очередная порция данных будет записана. Метод append позволяет добавлять данные в очередь на запись. А метод start_write запускает процесс записи. При этом если операция записи в данный момент уже выполняется, он вернет false, а если нет, то он начинает асинхронную запись и возвращает true. При этом, если во время выполнения асинхронной записи в файл, будет вызван метод append и в очередь будет добавлен еще один буфер, то он то-же будет записан. Блокировка может возникнуть в случае вызова методов append и/или start_write из разных потоков, но она будет удерживаться не долго. В общем Asio очень интересная библиотека, которая может быть использована не только как сетевая библиотека. Возможно скоро я покажу как можно ее немного расширить =)

8 комментариев:

Анонимный комментирует...

Это не верно. Ты пробовал тестировать под нагрузкой?

Lazin комментирует...

Пробовал, все работало. А что неверно?

Lazin комментирует...

В коде действительно был race condition, исправил.

Анонимный комментирует...
Этот комментарий был удален администратором блога.
Анонимный комментирует...

У нас в жж за такой пост бы закидали какашками в каментах:)

Анонимный комментирует...

Елки-палки, уникальная заметка

Анонимный комментирует...
Этот комментарий был удален администратором блога.
Anonymous комментирует...

Это не верно. Ты пробовал тестировать под нагрузкой?