Дизайн RX — дросселирование обращений к внешним системам

Я пишу брокер сообщений, который подключается к IBM MQ и папкам в файловой системе. После сбора сообщений он затем материализует их в строго типизированные классы и подключает их к RX Subjects.

Я повысил осведомленность о сообщениях, которые позволяют мне определить, какие внешние системы должны быть затронуты для их обработки, поэтому я могу выполнять запросы к наблюдаемым RX и выбирать сообщения, которые не нацелены на внешнюю систему, и т. д.

Что я хочу сделать дальше, это дросселировать сообщения от внешней системы, например:

Если бы я попадал в CRM-систему с сообщением определенного типа, и я бы решил, что хочу поразить эту систему максимум 4 одновременными вызовами, я бы обрабатывал только 4 сообщения за раз, если бы у меня было 5-е сообщение, я бы нужно дождаться завершения одного из предыдущих 4, а затем перейти к 5-му. То же самое для других типов ресурсов, таких как внешние базы данных, другие внешние веб-сервисы и т. д.

Я начал исследовать этот вопрос, и до сих пор лучшим подходом к проектированию было бы написать собственный планировщик. Недостатком является то, что мне пришлось бы писать свои собственные внутренние структуры, которые помещают сообщения в очередь внутри планировщика после их получения, и именно здесь мне не нравится такой подход.

У кого-нибудь есть лучший способ сделать это?


person David Rodrigues    schedule 16.08.2013    source источник


Ответы (3)


То, что вы описываете, кажется максимальным параллелизмом. Оператор Merge поддерживает подобные вещи.

Вам нужно будет использовать что-то вроде GroupBy, чтобы разделить ваш поток в зависимости от того, куда он идет, затем использовать Merge с максимальным параллелизмом для каждой части разделения, а затем, наконец, Merge результаты снова вместе. Что-то вроде этого:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);
person Brandon    schedule 17.08.2013

Вы можете изучить ReactiveUI, который включает механизм регулирования скорости запросов на обслуживание. См. http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/

person Jim Wooley    schedule 16.08.2013

Я также опубликовал это тот же вопрос в MSDN и получил более подробный ответ с другой реализацией оператора слияния, чтобы не происходило потери данных при изменении максимальных значений параллелизма.

person David Rodrigues    schedule 20.08.2013