Как создать IObservable‹T›, который читает из очереди сообщений MSMQ?

Я удаляю нашу систему электронной почты с нашего сайта ASP.NET, который раньше отправлял электронные письма немедленно с помощью системы для обработки запросов в отдельной службе, чтобы снизить нагрузку на веб-сайт. Я пытаюсь спроектировать его на основе набора интерфейсов, чтобы я мог поменять местами реализации, если захочу, но изначально он будет основан на очереди сообщений (MSMQ) для отправки запросов в очередь, чтобы служба получала входящие запросы а затем обработать их. В настоящее время у меня примерно определены следующие интерфейсы:

// Sends one or more requests to be processed somehow
public interface IRequestSender
{
    void Send(IEnumerable<Request> requests);
}

// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
    void Start();
    void Stop();
}

// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}

Вы заметите, что Listener и Processor используют наблюдаемый шаблон, так как я думаю, что он подходит лучше всего.

Моя проблема заключается в том, чтобы выяснить, как написать реализацию IRequestListener, которая получает от MSMQ, в основном, как мне создать подходящий IObservable<T>?

Мой первый вариант, который я нашел, — создать IObservable<T> с нуля на основе примера, приведенного Документация MSDN, но это похоже на большую работу по сантехнике.

Другим вариантом является использование Reactive Extensions, так как кажется, что они предназначены для упрощения создания наблюдаемых. Наиболее близкими, которые я нашел для использования Rx с MSMQ, являются эти страницы:

Но я не знаю, как применить эти примеры к моему интерфейсу IRequestListener.

Любые другие идеи тоже приветствуются, даже изменения в моем базовом дизайне, если они подходят.


person Peter Monks    schedule 08.02.2012    source источник


Ответы (1)


Сначала я использовал FromAsyncPattern, но потом написал для него класс, потому что он лучше справлялся с тайм-аутом и отравленными сообщениями. После запуска очереди в любом случае являются горячими наблюдаемыми. Вы также можете использовать Observable.Defer, чтобы приблизить его к Rx вместо Start/Stop.

Вот базовая реализация QueueObservable. Вы можете просто начать с звонка ListenReceive.

Subject<T> Subject = new Subject<T>();

protected void ListenReceive()
{
    Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
}

protected void OnReceive(IAsyncResult ar)
{
    Message message = null;

    try
    {
        message = Queue.EndReceive(ar);
    }
    catch (TimeoutException ex)
    {
        //retry?
    }

    if (message != null)
        Subject.OnNext((T) message.Body);

    Thread.Yield();

    if (!IsDisposed)
        ListenReceive();
}    

public IObservable<T> AsObservable()
{
        return Subject;
}
person Asti    schedule 09.02.2012
comment
Я проводил некоторые эксперименты в том же духе, прежде чем увидел этот ответ, использование Subject<T> внутри помогает отслеживать подписки, и моя реализация не в миллионе миль от ваших предложений, спасибо. - person Peter Monks; 10.02.2012