Создание клиентского API REST с использованием реактивных расширений (Rx)

Я пытаюсь понять правильные варианты использования Reactive Extensions (Rx). Примеры, которые продолжают появляться, — это события пользовательского интерфейса (перетаскивание, рисование) и предположения о том, что Rx подходит для асинхронных приложений/операций, таких как вызовы веб-службы.

Я работаю над приложением, в котором мне нужно написать крошечный клиентский API для службы REST. Мне нужно вызвать четыре конечных точки REST, три для получения справочных данных (аэропорты, авиакомпании и статусы), а четвертая — основная служба, которая предоставит вам время полета для заданного аэропорта.

Я создал классы, предоставляющие три службы справочных данных, и методы выглядят примерно так:

public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)

В моем методе GetFlights я хочу, чтобы каждый рейс содержал ссылку на аэропорт, из которого он вылетает, и на авиакомпанию, выполняющую рейс. Для этого мне нужны данные из GetAirports и GetAirlines. Каждый аэропорт, авиакомпания и статус будут добавлены в словарь (то есть словарь), чтобы я мог легко установить ссылку при разборе каждого рейса.

flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]

Моя текущая реализация теперь выглядит так:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStautses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}

Текущая реализация основана на ответе Сергея и использует ForkJoin для обеспечения последовательного выполнения и того, что я ссылаюсь на данные, загружаемые перед полетами. Эта реализация намного элегантнее, чем запуск события «ReferenceDataLoaded», как в моей предыдущей реализации.


person Jonas Follesø    schedule 14.05.2010    source источник
comment
Ответ обновлен - также взгляните на эту тему: social.msdn.microsoft.com/Forums/en/rx/thread/ — показывает, как написать собственную буферизацию.   -  person Sergey Aldoukhov    schedule 16.05.2010


Ответы (2)


Я думаю, если вы получаете список сущностей из каждого REST-вызова, ваш вызов должен иметь немного другую сигнатуру — вы не наблюдаете за каждым значением в возвращаемой коллекции, вы наблюдаете за событием завершения вызова. Итак, для аэропортов он должен иметь подпись:

public IObservable<Aiports> GetAirports()

Следующим шагом будет параллельный запуск первых трех и ожидание их всех:

var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());

Третьим шагом будет составление наблюдаемого выше с помощью GetFlights():

var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;

РЕДАКТИРОВАТЬ: я до сих пор не понимаю, почему ваши услуги возвращаются

IObservable<Airport> 

вместо

IObservable<IEnumerable<Airport>>

Насколько я знаю, из вызова REST вы получаете все объекты сразу, но, может быть, вы выполняете пейджинг? В любом случае, если вы хотите, чтобы RX выполнял буферизацию, вы можете использовать .BufferWithCount() :

    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...

Затем вы можете применить ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...

ports_lines_statuses будет содержать одно событие на временной шкале, которое будет содержать все справочные данные.

РЕДАКТИРОВАТЬ: вот еще один, используя только что созданный ListObservable (только последний выпуск):

allAiports = airports.Start(); 
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 Then((p, l, s) => new Unit())); 



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }
person Sergey Aldoukhov    schedule 14.05.2010
comment
На самом деле я хочу сначала получить все авиакомпании, аэропорты и статусы в одном пакете, потому что, когда я получаю рейсы, мне нужны эти три набора справочных данных, чтобы я мог связать их с рейсом. Итак, мне нужно поместить аэропорты в словарь, подобный этой Dictionart‹string, Airport›, чтобы я мог сделать: Flight.Airport = airports[flightXml.AirportCode]. - person Jonas Follesø; 15.05.2010
comment
Я обновил вопрос новыми сигнатурами методов. Вы правы, я действительно хочу получить сразу все аэропорты, авиакомпании и статусы. Правильно ли я понимаю, что PortLinesSelector — это метод, объединяющий аэропорты и авиакомпании, а затем мне нужен второй метод для объединения предыдущего результата с новым результатом? Я попытался загрузить последнюю версию RX для Silverlight 3/4, но не смог найти метод Start() в Observable (только для начала). - person Jonas Follesø; 16.05.2010
comment
@jonas-folleso Да, PortsLinesSelector — это что-то вроде (ports, lines) => new {ports, lines}, и второй селектор присоединит к этому результату третий. Идея здесь состоит в том, чтобы максимально придерживаться функционального стиля и просто передавать данные через конвейер вместо использования локальных переменных. Start() для наблюдаемого есть только в версии .Net4, поэтому вам придется подождать, пока он не будет перенесен на другие... - person Sergey Aldoukhov; 16.05.2010
comment
Окей, круто. Вы определенно указали мне правильное направление, и, поскольку я получил что-то, что работает очень хорошо, я отмечу это как ответ. Большое спасибо! - person Jonas Follesø; 17.05.2010

Вариант использования здесь основан на вытягивании - IEnumerable в порядке. Если вы хотите сказать, уведомить, когда приходит новый рейс, то обертывание вызова REST на основе извлечения в Observable.Generate может иметь некоторую ценность.

person Scott Weinstein    schedule 15.05.2010
comment
Итак, Rx не является хорошим подходом для создания клиента REST в моем сценарии? Поскольку это WP7, я не могу сделать его синхронным, поэтому альтернативой может быть выполнение: GetAirlinesAsync и наличие события GetAirlinesCompleted. Затем мне пришлось бы вызывать GetAirlinesAsync, GetAirportsAsync и GetStatusesAsync и ждать, пока сработают все три события обратного вызова, прежде чем вызывать GetFlights..? Я также планировал расширить свой метод, чтобы он повторно вызывал службу GetFlights каждые 3 минуты для обновления. Следовательно, наблюдение за новыми летающими объектами по мере их прибытия звучит как хорошая идея..? - person Jonas Follesø; 16.05.2010
comment
Если базовый API основан только на асинхронности, то RX имеет гораздо больше смысла. Наблюдаемый.Создать... - person Scott Weinstein; 16.05.2010