Предметы Forward Rx с кулдауном, переключение на выборку, когда они приходят слишком быстро

Я ищу метод Rx, который возьмет наблюдаемое и поместит последний элемент в «перезарядку», чтобы, когда предметы поступают медленнее, чем время перезарядки, они просто перенаправлялись, но когда они поступают быстрее, вы просто получить последнее значение после каждого периода восстановления.

Другими словами, я хочу переключиться на выборку с периодом t, когда элементы разделены менее чем на t времени (и переключиться обратно, когда они разбросаны).

Это действительно похоже на то, что Observable.Throttle делает, за исключением того, что таймер не сбрасывается всякий раз, когда поступает новый элемент.

Приложение, которое я имею в виду, предназначено для отправки обновлений «последнего значения» по сети. Я не хочу сообщать значение, если оно не изменилось, и я не хочу спамить быстро меняющимся значением так сильно, чтобы заглушить другие данные.

Есть ли стандартный метод, который делает то, что мне нужно?


person Craig Gidney    schedule 18.12.2013    source источник


Ответы (4)


Стриланк, учитывая ваше беспокойство по поводу нежелательной активности, когда исходный поток спокоен, вас может заинтересовать этот метод стимуляции событий — в противном случае я не собирался добавлять это, так как считаю, что реализация Дж. Леннона вполне разумна (и намного проще). ), и производительность таймера не пострадает.

В этой реализации есть еще одно интересное отличие: она отличается от подхода Sample тем, что выдает события, происходящие за пределами периода восстановления, немедленно, а не через следующий интервал выборки. Он не поддерживает таймер вне времени восстановления.

РЕДАКТИРОВАТЬ. Вот версия 3, решающая проблему, упомянутую Крисом в комментариях, — она гарантирует, что изменения, происходящие во время охлаждения, сами по себе запускают новый период охлаждения.

    public static IObservable<T> LimitRate<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        return source.DistinctUntilChanged()
                     .GroupByUntil(k => 0,
                                   g => Observable.Timer(duration, scheduler))
            .SelectMany(x => x.FirstAsync()
                              .Merge(x.Skip(1)
                                      .TakeLast(1)))
                              .Select(x => Observable.Return(x)
                                .Concat(Observable.Empty<T>()
                                    .Delay(duration, scheduler)))
                                    .Concat();
    }

Это работает при первоначальном использовании GroupByUntil для упаковки всех событий в одну группу на время периода охлаждения. Он отслеживает изменения и выдает окончательные изменения (если они есть) по истечении срока действия группы.

Затем результирующие события проецируются в потоки, у которых OnCompleted задерживается на период охлаждения. Затем эти потоки объединяются. Это не позволяет событиям быть ближе друг к другу, чем время охлаждения, но в противном случае они генерируются как можно скорее.

Вот модульные тесты (обновленные для редактирования версии 3), которые вы можете запустить с помощью пакетов nuget rx-testing и nunit:

public class LimitRateTests : ReactiveTest
{
    [Test]
    public void SlowerThanRateIsUnchanged()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));
    }

    [Test]
    public void FasterThanRateIsSampled()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(140, 5),
            OnNext(150, 2),
            OnNext(300, 3),
            OnNext(350, 4));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));
    }

    [Test]
    public void DuplicatesAreOmitted()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(300, 1),
            OnNext(350, 1));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1));
    }

    [Test]
    public void CoolResetsCorrectly()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 2),
            OnNext(205, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));
    }

    [Test]
    public void MixedPacingWorks()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(825, 5));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(850, 5));
    }
}
person James World    schedule 18.12.2013
comment
Интересный. GroupByUntil выглядит удобным методом, о котором стоит помнить. Я, скорее всего, уберу часть «пропуск предметов интерьера», но в остальном она полностью пригодна для использования и даже проверена. Ницца. - person Craig Gidney; 18.12.2013
comment
А как насчет SelectMany(g => g.FirstAsync().Concat(g.IgnoreElements())) ? Это делает то же самое, что и ваш SelectMany ? - person cwharris; 18.12.2013
comment
Это пропустит изменения во время периода охлаждения, не так ли? Мой SelectMany заставляет сообщать о последнем изменении во время охлаждения сразу же после охлаждения, как и было запрошено. - person James World; 18.12.2013
comment
Ааа, я понимаю, что ты говоришь. Если событие происходит между периодом охлаждения, мы не получим его после периода охлаждения. - person cwharris; 18.12.2013
comment
Да, в этом вопросе есть несколько интересных тонкостей; сначала я подумал, что это должен быть дубликат, но я не думаю, что это так. - person James World; 18.12.2013
comment
Однако в вашем случае последний предмет будет получен в конце периода охлаждения, но новый предмет может быть получен сразу после этого... мы также хотим снова охладиться после получения предмета TakeLast(1)? ОП принял ответ, но рассматривается ли этот пограничный случай? - person cwharris; 18.12.2013
comment
Да, действительно... в этот момент я решил выпить чашку чая. :) Я уверен, что должен быть элегантный способ справиться с этим, я подумаю об этом. Здесь почти пора спать! - person James World; 18.12.2013
comment
Пересмотрено описание и удалена ошибочная оригинальная реализация, так как оставление ее просто запутало. - person James World; 19.12.2013
comment
Я думаю, что новая реализация, использующая Select и Delay и Concat, не будет отбрасывать промежуточные элементы, когда элементы поступают быстро, а вместо этого создает огромные задержки для последнего элемента. - person Craig Gidney; 19.12.2013
comment
Дох! Откат... Я еще подумаю над этим - пока что я восстановил версию, которая не добавляет кулдауна к изменениям, снятым в период кулдауна. - person James World; 19.12.2013
comment
Ничего себе - это так много нюансов - я думаю, что, объединив оба моих первых двух подхода, я поймал крайние случаи. Не могу отделаться от мысли, что должен быть более простой способ пройти эти модульные тесты! И на самом деле все, что делается по сравнению с ответом Дж. Леннона, - это избегание непрерывной синхронизации и немедленное испускание изменений, а не на интервале выборки - стоит ли это дополнительной сложности? - person James World; 19.12.2013
comment
v3 работает некорректно. Он выдает сообщение в начале и в конце каждого периода, которое содержит два сообщения, и задерживает каждое сообщение на период. При насыщении сообщениями со скоростью не менее 2 за период он отстает все больше и больше, и для отправки сообщений требуется примерно в 1,3 раза больше времени стены, чем затраченное время стены. - person Cirdec; 19.03.2014

Вы можете использовать Observable.DistinctUntilChanged и Observable.Sample.

Observable.DistinctUntilChanged

Этот метод будет отображать значения, только если они отличаются от предыдущего значения. (http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html)

Observable.Sample

Метод Sample просто принимает последнее значение для каждого указанного TimeSpan. (http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample)

Чтобы создать желаемый эффект, вы можете комбинировать первый сгенерированный элемент с описанными выше.

person J. Lennon    schedule 18.12.2013
comment
Не превратит ли этот метод каждое значение в постоянную фоновую работу, пропорциональную частоте дискретизации, даже если значение не меняется? Это будет работать, но это кажется сложным недостатком. - person Craig Gidney; 18.12.2013
comment
Метод Observable.Sample не связан со статистикой, если вы не создаете новое значение, ничего не запускается. - person J. Lennon; 18.12.2013
comment
Я совсем не это имел в виду. Я имел в виду, что если значение, которое я наблюдаю, не меняется, периодическая выборка и отбрасывание как не отличающихся все еще происходят в фоновом режиме. Создание ненужной постоянной работы из воздуха кажется плохой идеей. - person Craig Gidney; 18.12.2013
comment
Может быть, я не совсем то, что вы хотите, но я не знаю, как метод Observable.Sample работает внутри, я верю, что в большинстве случаев это будет работать нормально, и да, он всегда будет уважать ваши часы. и расписания (на основе TimeSpan). - person J. Lennon; 18.12.2013
comment
Дело не в том, как он работает внутри, а в том, что он на самом деле делает: пересылает последнее значение с заданной скоростью, независимо от того, изменилось оно или нет. Затем в вашем решении DistinctUntilChanged отбрасывает его. Какое внешнее поведение я хочу, но со скрытой ценой. - person Craig Gidney; 18.12.2013
comment
это не отвечает на вопрос. - person cwharris; 21.12.2013

Я понимаю, что на этот вопрос уже был дан ответ в течение некоторого времени, но я хотел бы предоставить альтернативное решение, которое, как я думаю, более точно соответствует исходному требованию. В этом решении представлены 2 пользовательских оператора.

Во-первых, это SampleImmediate, который работает точно так же, как Sample, за исключением того, что он сразу отправляет первый элемент. Это достигается с помощью ряда операторов. Materialize / Dematerialize и DistinctUntilChanged работают вместе, чтобы исключить отправку повторяющихся уведомлений. Merge, Take(1) и Sample обеспечивают базовую функциональность «Немедленная выборка». Publish и Connect связывают их вместе. GroupBy и SelectMany гарантируют, что мы дождемся первого события, прежде чем запускать наш таймер. Create помогает нам правильно распорядиться всем.

public static IObservable<T> SampleImmediate<T>(this IObservable<T> source, TimeSpan dueTime)
{
    return source
        .GroupBy(x => 0)
        .SelectMany(group =>
        {
            return Observable.Create<T>(o =>
            {
                var connectable = group.Materialize().Publish();

                var sub = Observable.Merge(
                        connectable.Sample(dueTime),
                        connectable.Take(1)
                    )
                    .DistinctUntilChanged()
                    .Dematerialize()
                    .Subscribe(o);

                return new CompositeDisposable(connectable.Connect(), sub);
            });
        });
}

Получив SampleImmediate, мы можем создать Cooldown, используя GroupByUntil для группировки всех событий, происходящих до тех пор, пока наше скользящее Throttle окно не закроется. Когда у нас есть группа, мы просто SampleImmediate все это делаем.

public static IObservable<T> Cooldown<T>(this IObservable<T> source, TimeSpan dueTime)
{
    return source
        .GroupByUntil(x => 0, group => group.Throttle(dueTime))
        .SelectMany(group => group.SampleImmediate(dueTime));
}

Я никоим образом не утверждаю, что это решение лучше или быстрее, я просто подумал, что было бы неплохо увидеть альтернативный подход.

person cwharris    schedule 21.12.2013
comment
Интересный! Мне нравится идея слияния. Пара вещей - почему Materialise/Dematerialize? Я их снял, и без них вроде все нормально. Я провел это через свои модульные тесты, и все они прошли, кроме DuplicatesAreOmitted - OP сказал, что я не хочу сообщать значение, если оно не изменилось - я думаю, что это легко исправить с помощью общего DistinctUntilChanged. - person James World; 22.12.2013
comment
Я использовал Materialize/Dematerialize, чтобы сами уведомления были Distinct (а не значения внутри них), поскольку Take(1) и Sample потенциально могут давать одно и то же уведомление, если для группы получено только одно значение. Эта реализация не связана с различимостью значений, поскольку DistinctUntilChanged можно использовать до или после представленных мной операторов. - person cwharris; 22.12.2013
comment
Проще говоря, если бы я использовал Merge(Take(1), Sample) и наблюдаемый источник дал только одно значение, Merge дал бы два значения. Если бы я просто использовал DistinctUntilChanged при слиянии, он мог бы пропустить реальные уведомления. Если я объединю это с Materialise/Dematerialize, тогда мы будем отличаться уведомлением, а не значением. - person cwharris; 22.12.2013
comment
Я вижу, что вы говорите; Я не думал о SampleImmediate изолированно. Когда в игре есть CoolDown, в конце концов все сводится на нет. Таким образом, SampleImmediate предназначен для генерации только первого события в такт, а затем продолжается как Sample. - person James World; 22.12.2013
comment
Действительно, именно так SampleImmediate предназначен для работы, но я не уверен, что вы имеете в виду, когда все это отменяется в конце. - person cwharris; 22.12.2013

Самостоятельный ответ.

Хотя я спросил с точки зрения Rx, мой фактический случай связан с его портом (ReactiveCocoa). Больше людей знают Rx, и я мог бы перевести.

В любом случае, я реализовал его напрямую, чтобы он мог удовлетворить свойства задержки/производительности, которые я хотел:

-(RACSignal*)cooldown:(NSTimeInterval)cooldownPeriod onScheduler:(RACScheduler *)scheduler {
    need(cooldownPeriod >= 0);
    need(!isnan(cooldownPeriod));
    need(scheduler != nil);
    need(scheduler != RACScheduler.immediateScheduler);

    force(cooldownPeriod != 0); //todo: bother with no-cooldown case?
    force(!isinf(cooldownPeriod)); //todo: bother with infinite case?

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        need(subscriber != nil);

        NSObject* lock = [NSObject new];
        __block bool isCoolingDown = false;
        __block bool hasDelayedValue = false;
        __block id delayedValue = nil;
        __block RACDisposable *cooldownDisposer = nil;
        void (^onCanSendValue)(void) = ^{
            @synchronized (lock) {
                // check that we were actually cooling down
                // (e.g. what if the system thrashed before we could dispose the running-down timer, causing a redundant call?)
                if (!isCoolingDown) {
                    return;
                }

                // if no values arrived during the cooldown, we do nothing and can stop the timer for now
                if (!hasDelayedValue) {
                    isCoolingDown = false;
                    [cooldownDisposer dispose];
                    return;
                }

                // forward latest value
                id valueToSend = delayedValue;
                hasDelayedValue = false;
                delayedValue = nil;
                // todo: can this be avoided?
                // holding a lock while triggering arbitrary actions cam introduce subtle deadlock cases...
                [subscriber sendNext:valueToSend];
            }
        };
        void (^preemptivelyEndCooldown)(void) = ^{
            // forward latest value AND ALSO force cooldown to run out (disposing timer)
            onCanSendValue();
            onCanSendValue();
        };

        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            bool didStartCooldown;
            @synchronized (lock) {
                hasDelayedValue = true;
                delayedValue = x;
                didStartCooldown = !isCoolingDown;
                isCoolingDown = true;
            }

            if (didStartCooldown) {
                // first item gets sent right away
                onCanSendValue();
                // coming items have to wait for the timer to run down
                cooldownDisposer = [[RACSignal interval:cooldownPeriod onScheduler:scheduler]
                                    subscribeNext:^(id _) { onCanSendValue(); }];
            }
        } error:^(NSError *error) {
            preemptivelyEndCooldown();
            [subscriber sendError:error];
        } completed:^{
            preemptivelyEndCooldown();
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            @synchronized (lock) {
                isCoolingDown = false;
                [cooldownDisposer dispose];
            }
        }];
    }] setNameWithFormat:@"[%@ cooldown:%@]", self.name, @(cooldownPeriod)];
}

Он должен почти напрямую транслироваться в .Net RX. Он прекратит выполнять какую-либо работу, когда предметы перестанут поступать, и перенаправит предметы как можно скорее, соблюдая время восстановления.

person Craig Gidney    schedule 18.12.2013