EOF в boost::async_read с thread_pull и boost 1.54

У меня странная проблема с моим серверным приложением. Моя система проста: у меня есть более 1 устройства и одно серверное приложение, которые обмениваются данными по сети. Протокол имеет бинарные пакеты с переменной длиной, но фиксированным заголовком (содержащим информацию о текущем размере пакета). Пример пакета:

char pct[maxSize] = {}
pct[0] = 0x5a //preambule
pct[1] = 0xa5 //preambule
pct[2] = 0x07 //packet size
pct[3] = 0x0A //command
... [payload]

Протокол построен по принципу команда-ответ.

Я использую boost::asio для связи - io_service с вытягиванием потока (4 потока) + асинхронная операция чтения/записи (пример кода ниже) и создаю "цикл запроса" - каждые 200 мс по таймеру:

  • запросить одно значение с устройства
  • получить результат, запросить второе значение
  • получить результат, снова запустить таймер

Это очень хорошо работает на Boost 1.53 (Debug and Release). Но потом я переключаюсь на boost 1.54 (особенно в режиме Release) начинается волшебство. Мой сервер успешно запускается, подключается к устройству и запускает «цикл запроса». Секунд 30-60 все работает нормально (данные получаю, данные правильные), но потом начинаю получать asio::error на хендле последнего чтения (всегда в одном месте). Тип ошибки: EOF. После получения ошибки я должен отключиться от устройства.

Некоторое время гугления дает мне информацию о EOF, указывающую, что другая сторона (устройство в моем случае) инициировала процедуру отключения. Но, по логике устройства это не может быть правдой. Может кто-нибудь объяснить, что происходит? Может быть, мне нужно установить какую-то опцию сокета или определить? Я вижу две возможные причины:

  • моя сторона инициализируется отключением (по какой-то причине, которую я не знаю), и EOF является ответом на это действие.
  • некоторый тайм-аут сокета.

Моя среда:

  • ОС: Windows 7/8
  • Компилятор: MSVC 2012, обновление 3

Пример кода основного "цикла запросов". Адаптировано из официального примера чата для повышения Весь код упрощен для уменьшения места :)

  • SocketWorker — низкоуровневая оболочка для сокетов
  • DeviceWorker — класс для связи устройств
  • ERes — внутренняя структура для хранилища ошибок
  • ProtoCmd и ProtoAnswer — оболочка для команды и ответа необработанного массива (аналог сообщения chat_message из пример ускоренного чата)
  • lw_service_proto namespace — предопределенные команды и максимальные размеры пакетов

Итак, примеры кода. Оболочка сокета:

namespace b = boost;
namespace ba = boost::asio;

typedef b::function<void(const ProtoAnswer answ)> DataReceiverType;

class SocketWorker
{
private:
    typedef ba::ip::tcp::socket socketType;
    typedef std::unique_ptr<socketType> socketPtrType;
    socketPtrType devSocket;
    ProtoCmd      sendCmd;
    ProtoAnswer   rcvAnsw; 

    //[other definitions]

public:

//---------------------------------------------------------------------------
ERes SocketWorker::Connect(/*[connect settings]*/)
{
    ERes res(LGS_RESULT_ERROR, "Connect to device - Unknow Error");

    using namespace boost::asio::ip;
    boost::system::error_code sock_error;

    //try to connect
    devSocket->connect(tcp::endpoint(address::from_string(/*[connect settings ip]*/), /*[connect settings port]*/), sock_error);

    if(sock_error.value() > 0) {
        //[work with error]
        devSocket->close();
    }
    else {
        //[res code ok]
    } 

    return res;
}
//---------------------------------------------------------------------------
ERes SocketWorker::Disconnect()
{
    if (devSocket->is_open())
    {
        boost::system::error_code ec;
        devSocket->shutdown(bi::tcp::socket::shutdown_send, ec);
        devSocket->close();
    }
    return ERes(LGS_RESULT_OK, "OK");
}

//---------------------------------------------------------------------------
//query any cmd
void SocketWorker::QueryCommand(const ProtoCmd cmd, DataReceiverType dataClb)
{
    sendCmd = std::move(cmd); //store command
    if (sendCmd .CommandLength() > 0)
    {
        ba::async_write(*devSocket.get(), ba::buffer(sendCmd.Data(), sendCmd.Length()),
                        b::bind(&SocketWorker::HandleSocketWrite,
                                this, ba::placeholders::error, dataClb));
    }
    else
    {
        cerr << "Send command error: nothing to send" << endl;
    }
}

//---------------------------------------------------------------------------
// boost socket handlers
void SocketWorker::HandleSocketWrite(const b::system::error_code& error, 
                                                   DataReceiverType dataClb)
{
    if (error)
    {
        cerr << "Send cmd error: " << error.message() << endl;
        //[send error to other place]
        return;
    }

    //start reading header of answer (lw_service_proto::headerSize == 3 bytes)
    ba::async_read(*devSocket.get(),
                   ba::buffer(rcvAnsw.Data(), lw_service_proto::headerSize),
                   b::bind(&SocketWorker::HandleSockReadHeader, 
                           this, ba::placeholders::error, dataClb)); 
}
//---------------------------------------------------------------------------
//handler for read header
void SocketWorker::HandleSockReadHeader(const b::system::error_code& error, DataReceiverType dataClb)
{
    if (error)
    {
        //[error working]
        return;
    }

    //decode header (check preambule and get  full packet size) and read answer payload
    if (rcvAnsw.DecodeHeaderAndGetCmdSize())
    {
      ba::async_read(*devSocket.get(),
                     ba::buffer(rcvAnsw.Answer(), rcvAnsw.AnswerLength()),
                     b::bind(&SocketWorker::HandleSockReadBody, 
                             this, ba::placeholders::error, dataClb));
    }
}
//---------------------------------------------------------------------------
//handler for andwer payload
void SocketWorker::HandleSockReadBody(const b::system::error_code& error, DataReceiverType dataClb)
{
    //if no error - send anwser to 'master'
    if (!error){
        if (dataClb != nullptr) 
            dataClb(rcvAnsw);
    }
    else{
        //[error process]

        //here i got EOF in release mode
    }
}

};

Рабочий устройства

class DeviceWorker
{
private:
    const static int LW_QUERY_TIME = 200;
    LWDeviceSocketWorker sockWorker;
    ba::io_service&    timerIOService;
    typedef std::shared_ptr<ba::deadline_timer> TimerPtr;
    TimerPtr        queryTimer;
    bool            queryCycleWorking;

    //[other definitions]
public:

ERes DeviceWorker::Connect()
{
    ERes intRes = sockWorker.Connect(/*[connect settings here]*/);

    if(intRes != LGS_RESULT_OK) {
        //[set result to error]
    }
    else {
        //[set result to success]

        //start "query cycle"
        StartNewCycleQuery();
    }

    return intRes;
}
//---------------------------------------------------------------------------
ERes DeviceWorker::Disconnect()
{
    return sockWorker.Disconnect();
}
//---------------------------------------------------------------------------
void DeviceWorker::StartNewCycleQuery()
{
    queryCycleWorking = true;
    //start timer
    queryTimer = make_shared<ba::deadline_timer>(timerIOService, bt::milliseconds(LW_QUERY_TIME));
    queryTimer->async_wait(boost::bind(&DeviceWorker::HandleQueryTimer,
                                       this, boost::asio::placeholders::error));
}
//---------------------------------------------------------------------------
void DeviceWorker::StopCycleQuery()
{
    //kill timer
    if (queryTimer) 
        queryTimer->cancel();

    queryCycleWorking = false;
}
//---------------------------------------------------------------------------
//timer handler
void DeviceWorker::HandleQueryTimer(const b::system::error_code& error)
{
    if (!error)
    {
        ProtoCmd cmd;    
        //query for first value
        cmd.EncodeCommandCore(lw_service_proto::cmdGetAlarm, 1);
        sockWorker.QueryCommand(cmd, boost::bind(&DeviceWorker::ReceiveAlarmCycle, 
                                this, _1));    
    }
}
//---------------------------------------------------------------------------
//receive first value
void DeviceWorker::ReceiveAlarmCycle(ProtoAnswer adata)
{
    //check and fix last bytes (remove \r\n from some commands)
    adata.CheckAndFixFooter();

    //[working with answer]

    if (queryCycleWorking)
    { 
        //query for second value
        ProtoCmd cmd;
        cmd.EncodeCommandCore(lw_service_proto::cmdGetEnergyLevel, 1);
        sockWorker.QueryCommand(cmd, b::bind(&DeviceWorker::ReceiveEnergyCycle, 
                                      this, _1));
    }
}
//---------------------------------------------------------------------------
//receive second value
void DeviceWorker::ReceiveEnergyCycle(ProtoAnswer edata)
{
    //check and fix last bytes (remove \r\n from some commands)
    edata.CheckAndFixFooter();

    //[working with second value]

    //start new "query cycle"
    if (queryCycleWorking)
        StartNewCycleQuery();
}

};

Любые идеи приветствуются :)

редактировать: После нескольких тестов я вижу другое изображение:

  • эта проблема воспроизводится только на boost 1.54 (режим отладки и выпуска, выпуск - намного быстрее), с boost 1.53 больше нет ошибок (возможно, я плохо очистил свой код, а затем пересобрал в первый раз....)
  • с бустом 1.54 и 1 потоком (вместо 4) все работает хорошо

Я также провел некоторое время с отладчиком и исходным кодом Boost и сделал некоторые выводы:

  • Когда я получаю EOF, мои данные уже полностью получены.
  • Этот EOF указывает, что в этой операции нечего передавать, т. е. флаг результата сокета равен 0 (нет ошибки), но флаг операции повышения, если EOF (байты передачи == 0)

В этот момент я вынужден включить буст 1.53...


person ShaKeSPeaR    schedule 15.07.2013    source источник
comment
Признаюсь, не вникал глубоко в описание проблемы... Но на первый взгляд время жизни буферов кажется мне подозрительным. В частности, вы отправляете buffer(cmd.Data(), cmd.Length()) - где cmd — локальный объект, т.е. буфер явно не переживет асинхронную операцию. Аналогично, что такое rcvAnsw, где оно определено?   -  person Igor R.    schedule 15.07.2013
comment
@ИгорьР. Мой плохой, извините :) Локальный объект для одной команды и одного ответа, определенный в SocketWorker, и поэтому живет во времени всех асинхронных операций. А вот для локального cmd хороший вопрос. я почему-то думал, что буфер копирует данные для отправки. Попробуйте сохранить команду локально... PS: добавьте локальную команду в исходный код в основном сообщении.   -  person ShaKeSPeaR    schedule 15.07.2013
comment
Нет, функция buffer() free не копирует и не владеет базовым буфером, она просто адаптирует его к концепции ConstBufferSequence (или MutableBufferSequence). повысить. org/doc/libs/1_54_0/doc/html/boost_asio/reference/   -  person Igor R.    schedule 15.07.2013
comment
Я имею в виду буфер, хранящийся где-то внутри async_send, но перечитал руководство и понял, что ошибался :) Я исправил эту проблему в своем коде - к сожалению, с основной проблемой ничего не изменилось... Но все равно спасибо за это замечание :)   -  person ShaKeSPeaR    schedule 15.07.2013
comment
сколько потоков вызывает io_service::run()?   -  person Sam Miller    schedule 15.07.2013
comment
@SamMiller io_service вызывается по запросу из 4 потоков   -  person ShaKeSPeaR    schedule 16.07.2013
comment
Воспроизводится ли проблема, если вы вызываете io_service::run() из одного потока? Если это так, пожалуйста, включите этот код в свой вопрос.   -  person Sam Miller    schedule 16.07.2013
comment
@SamMiller хорошо, попробуй завтра, спасибо. Я также провел более глубокое исследование с повышением и обнаружил некоторые интересные вещи. Чуть позже постараюсь сделать результаты отдельным ответом.   -  person ShaKeSPeaR    schedule 18.07.2013
comment
@SamMiller Пробовал с одним потоком - все работает хорошо .... Отредактировал основной пост с этой информацией   -  person ShaKeSPeaR    schedule 19.07.2013
comment
Думаю, у меня похожая проблема   -  person Arnaud    schedule 06.09.2013


Ответы (2)


У меня была точно такая же проблема, и я совершенно уверен, что это ошибка boost:: asio 1.54.0.

Вот отчет об ошибке.

Эффективное решение состоит в том, чтобы вернуться к версии 1.53, хотя на странице отчета об ошибке доступен патч для версии 1.54.

person Arnaud    schedule 06.09.2013
comment
Спасибо, откатился уже на 1.53. Надеюсь патч есть в 1.55 :) - person ShaKeSPeaR; 08.09.2013
comment
Только что посмотрел предстоящее boost 1.55. Похоже, он нашел и исправил ошибку с io_service + multi thread pool для Windows :) - person ShaKeSPeaR; 21.10.2013

Если ваше приложение нормально работает с одним потоком, вызывающим io_service::run(), но не работает с четырьмя потоками, скорее всего, у вас есть состояние гонки. Этот тип проблемы трудно диагностировать. Вообще говоря, вы должны убедиться, что ваш devSocket имеет не более одной незавершенной операции async_read() и async_write(). Ваша текущая реализация SocketWorker::QueryCommand() безоговорочно вызывает async_write(), что может нарушить предположение об упорядочении задокументировано как таковое

Эта операция реализуется с точки зрения нулевого или более вызовов функции async_write_some потока и известна как составная операция. Программа должна гарантировать, что поток не выполняет никаких других операций записи (таких как async_write, функция потока async_write_some или любые другие составные операции, выполняющие запись), пока эта операция не завершится.

классическое решение К этой задаче относится поддержание очереди исходящих сообщений. Если предыдущая запись не завершена, добавьте следующее исходящее сообщение в очередь. Когда предыдущая запись завершится, инициируйте async_write() для следующего сообщения в очереди. При использовании нескольких потоков, вызывающих io_service::run(), вам может потребоваться использовать цепочку, как это делает связанный ответ.

person Sam Miller    schedule 19.07.2013
comment
Спасибо за ответ. Я знаю об одной асинхронной операции над сокетом во время проблемы, и мой текущий дизайн избегает этого с помощью ручного цикла управления запросом, например. timer end-write cmd-read cmd-....-start timer). Но в любом случае я попытался переписать свой код с помощью strand, но безуспешно - проблема все еще здесь. Я принудительно временно переключился на буст 1.53. - person ShaKeSPeaR; 29.07.2013
comment
Я также пытался переписать код, используя async_read_until с параметрами \r\n (только для удаления моих обработчиков). Но результат тот же - я получаю EOF и byte_transferred=0, хотя мой буфер уже обрабатывает правильный пакет, т.е. он уже передан и количество байтов больше 0.... - person ShaKeSPeaR; 29.07.2013
comment
Я только что провел выходные, устраняя аналогичную проблему с Boost 1.54. Я не мог найти ничего плохого в моем коде. Установил 1.55 и больше не видел проблемы (это происходило каждый раз, когда я запускал определенный тест, запускал один и тот же тест более 100 раз с повышением 1.55 и не видел проблемы). - person regu; 06.01.2014