Стриланк, учитывая ваше беспокойство по поводу нежелательной активности, когда исходный поток спокоен, вас может заинтересовать этот метод стимуляции событий — в противном случае я не собирался добавлять это, так как считаю, что реализация Дж. Леннона вполне разумна (и намного проще). ), и производительность таймера не пострадает.
В этой реализации есть еще одно интересное отличие: она отличается от подхода Sample
тем, что выдает события, происходящие за пределами периода восстановления, немедленно, а не через следующий интервал выборки. Он не поддерживает таймер вне времени восстановления.
РЕДАКТИРОВАТЬ. Вот версия 3, решающая проблему, упомянутую Крисом в комментариях, — она гарантирует, что изменения, происходящие во время охлаждения, сами по себе запускают новый период охлаждения.
public static IObservable<T> LimitRate<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
return source.DistinctUntilChanged()
.GroupByUntil(k => 0,
g => Observable.Timer(duration, scheduler))
.SelectMany(x => x.FirstAsync()
.Merge(x.Skip(1)
.TakeLast(1)))
.Select(x => Observable.Return(x)
.Concat(Observable.Empty<T>()
.Delay(duration, scheduler)))
.Concat();
}
Это работает при первоначальном использовании GroupByUntil
для упаковки всех событий в одну группу на время периода охлаждения. Он отслеживает изменения и выдает окончательные изменения (если они есть) по истечении срока действия группы.
Затем результирующие события проецируются в потоки, у которых OnCompleted задерживается на период охлаждения. Затем эти потоки объединяются. Это не позволяет событиям быть ближе друг к другу, чем время охлаждения, но в противном случае они генерируются как можно скорее.
Вот модульные тесты (обновленные для редактирования версии 3), которые вы можете запустить с помощью пакетов nuget rx-testing
и nunit
:
public class LimitRateTests : ReactiveTest
{
[Test]
public void SlowerThanRateIsUnchanged()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
}
[Test]
public void FasterThanRateIsSampled()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(140, 5),
OnNext(150, 2),
OnNext(300, 3),
OnNext(350, 4));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
}
[Test]
public void DuplicatesAreOmitted()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(300, 1),
OnNext(350, 1));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1));
}
[Test]
public void CoolResetsCorrectly()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 2),
OnNext(205, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
}
[Test]
public void MixedPacingWorks()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(825, 5));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(850, 5));
}
}
person
James World
schedule
18.12.2013