Акселерометр RxJava Противодавление

Проблемы с обратным давлением. Использование темы публикации для получения события датчика при отправке, и необходимость сохранения данных в базе данных при подписке на тему в транзакции.

Я пытался использовать оператор .window(100), поэтому я могу массово вставлять каждый раз, когда я получаю 100 событий датчика подряд, но я могу получить только один элемент в .subscribe()

Не хотите отбрасывать события с помощью оператора буфера. Как правильно с этим справиться?

@Override
public void onSensorChanged(SensorEvent sevent) {

    Sensor sensor = sevent.sensor;

    switch (sensor.getType()) {
        case Sensor.TYPE_ACCELEROMETER:
            sensorEventPublishSubject.onNext(sevent);
            break;
    }
}

sensorEventPublishSubject
            .map(event ->
                    new AccModel(
                            event.values[0],
                            event.values[1],
                            event.values[2],
                            event.accuracy                           
                    )
            )
            .window(100)
            .subscribe(
                    new Action1<Observable<AccModel>>() {
                        @Override
                        public void call(Observable<AccModel> accModelObservable) {
                            //insert in db
                        }
                    }
            );

person ddog    schedule 25.05.2016    source источник


Ответы (1)


У вас есть два варианта, в зависимости от того, что вы хотите сделать с событием onError.

Во-первых, ваше решение с использованием .window правильное, просто оно генерирует Observable, и вы будете получать один Observable каждые 100 событий, и этот Observable, когда вы подпишетесь на него, будет воспроизводить эти 100 событий. Кроме того, в случае ошибки он также последовательно воспроизводит событие ошибки (AFAIK).

Если вас не волнует событие ошибки в последовательности, то есть решение с .buffer(100), перед которым вы должны поставить onErrorReturn() или onErrorResumeNext(), которые вы будете использовать для преобразования события onError в onNext. Это потому, что в случае onError, оператор buffer немедленно распространяет его, поэтому вы теряете события во временном буфере (‹100).

person Sasa Sekulic    schedule 25.05.2016