Использование времени ожидания Rx с наблюдателем FromEventPattern

Я пытаюсь реализовать тайм-аут, когда за период времени нет событий.

Сценарий:

У меня есть объект, который вызывает событие каждый раз при получении сообщения. Я хотел бы реагировать, когда нет сообщений (событий OnReceived), полученных в течение определенного периода времени (скажем, 20 секунд)

Это то, что у меня есть до сих пор

var observable =  Observable.FromEventPattern<BasicDeliverEventHandler>(
                             handler => _innerConsumer.Received += OnReceived,
                             handler => _innerConsumer.Received -= OnReceived);

var timeout = observable.Timeout(TimeSpan.FromSeconds(20));
using (timeout.Subscribe(_ => { },
              exception => 
              Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)))
{ }

Я создаю наблюдаемое из EventPattern. Затем, используя тайм-аут. Чего я не понимаю, так это того, как получить исключение из тайм-аута. Я хочу реагировать, когда это происходит.

Я не думаю, что метод подписки является правильным, но это я понял из документации. Я открыт для предложений или других альтернатив, если это не правильно.

заранее спасибо


person Marcote    schedule 19.11.2013    source источник
comment
В вашем коде не должно быть оператора using. Это немедленно отменяет вашу подписку, что не позволяет вашим уведомлениям достигать наблюдателя.   -  person cwharris    schedule 19.11.2013


Ответы (2)


Timeout проблематичен, потому что он завершает последовательность. Throttle - это то, что вам нужно, но вам также нужно вставить начальный элемент на случай, если вы вообще не получите никаких событий.

Я преобразовываю события в Unit.Default — это полезно, когда вам все равно, что произошло, просто что-то произошло, — и использую StartWith для заполнения газа:

var timeout = observable.Select(_ => Unit.Default)
                        .StartWith(Unit.Default)
                        .Throttle(TimeSpan.FromSeconds(20);

var subs = timeout.Subscribe(_ => Console.WriteLine("Timeout!"));

Из интереса у меня также есть аналогичное решение для обнаружения отключенных клиентов — на этот раз я предоставляю одно уведомление о тайм-ауте для нескольких источников: http://www.zerobugbuild.com/?p=230

person James World    schedule 19.11.2013
comment
Я думал о добавлении StartWith к своему ответу, но как потребитель я хотел бы иметь исходное событие. Я думаю, это зависит от того, как именно потребитель собирается использовать данные. - person cwharris; 19.11.2013
comment
OP не просил об этом, но вместо этого вы всегда можете заполнить Throttle нулевым объектом, например EventArgs.Empty. - person James World; 19.11.2013
comment
Я не понял обсуждения StartWith. Я хотел бы убить - person Marcote; 19.11.2013
comment
@Markust Можете ли вы перефразировать это? Я не понимаю. - person cwharris; 20.11.2013
comment
StartWith требуется для того, чтобы поток истекал по тайм-ауту в случае, если события вообще не возникают — Throttle в этом случае не будет работать, потому что не будет никакого события для подавления. - person James World; 20.11.2013
comment
Извините, нажмите Enter, прежде чем закончить свои мысли. Во всяком случае, Джеймс понял, что я имел в виду. В случае, если у меня нет никаких событий, мне нужно будет использовать StartWith. Спасибо вам, ребята.- - person Marcote; 20.11.2013
comment
Ваше решение для отключенных клиентов очень важно, что мне было нужно. Спасибо. - person Marcote; 26.11.2013

Давайте посмотрим на код, который у вас есть.

var observable =
  Observable.FromEventPattern<BasicDeliverEventHandler>(
    handler => _innerConsumer.Received += OnReceived,
    handler => _innerConsumer.Received -= OnReceived
    );

var timeout = observable.Timeout(TimeSpan.FromSeconds(20));

using (timeout.Subscribe(
  _ => { },
  exception =>
  Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)))
{

}

Мы можем переписать логику подписки следующим образом:

var subscription = timeout.Subscribe(
  _ => { }
  exception =>
    Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)
  );

subscription.Dispose(); // This is bad

Поскольку ваша подписка уничтожается немедленно, ваш наблюдатель не получает ожидаемых вами уведомлений.

Удалив subscription.Dispose() или оператор using, ваш наблюдатель должен получить TimeoutException через 20 секунд после подписки. Однако, поскольку Exception также отменяет подписку, вы получите это Exception только один раз.

Кроме того, оператор Timeout запускает тайм-аут во время подписки и не отменяет тайм-аут, пока подписка не будет отменена или не завершится исходный наблюдатель.

Вы можете попробовать использовать другой оператор, например Throttle.

observable.Throttle(TimeSpan.FromSeconds(20))
    .Subscribe(x =>
        Console.WriteLine("it has been 20 seconds since we received the last notification.")
        )
person cwharris    schedule 19.11.2013