RxJava onErrorResumeNext вызывает java.io.InterrupedIOException

У меня проблема с оператором RxJava onErrorResumeNext. Я хочу получить местоположение, а затем получить данные с сервера (с доработкой) зависит от местоположения, но если нет местоположения (ошибка: последовательность не содержит элементов), я хочу получить данные с сервера с другим Observable (который не зависит от местоположения ). Я пытался использовать оператор onErrorResumeNext, но получаю «java.io.InterruptedIOException: поток прерван».

Код перед добавлением onErrorResumeNext — работает хорошо

LocationService.getUpdatedOrLastKnownLocation(getContext()))
            .flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);

Код с onErrorResumeNext — выдает исключение

LocationService.getUpdatedOrLastKnownLocation(getContext()))
            .flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10;
            .onErrorResumeNext(RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);

Трассировки стека:

07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: java.io.InterruptedIOException: thread interrupted
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err:     at okio.Timeout.throwIfReached(Timeout.java:145)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:     at okio.Okio$1.write(Okio.java:77)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:     at okio.AsyncTimeout$1.write(AsyncTimeout.java:155)
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:     at okio.RealBufferedSink.flush(RealBufferedSink.java:221)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.internal.http.Http1xStream.finishRequest(Http1xStream.java:159)
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.internal.http.HttpEngine.readNetworkResponse(HttpEngine.java:721)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.internal.http.HttpEngine.access$200(HttpEngine.java:81)
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:708)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:563)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.RealCall.getResponse(RealCall.java:241)
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:198)
07-27 22:33:58.391 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:     at okhttp3.RealCall.execute(RealCall.java:57)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:     at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:     at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request(RxJavaCallAdapterFactory.java:171)
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:     at rx.Subscriber.setProducer(Subscriber.java:211)
07-27 22:33:58.393 18632-18632/com.blacksea.plamobi W/System.err:     at rx.Subscriber.setProducer(Subscriber.java:205)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:     at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:152)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:     at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:138)
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err:     at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:65)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:37)
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:     at rx.Observable.unsafeSubscribe(Observable.java:8460)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:141)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:276)
07-27 22:33:58.401 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMap$MapSubscriber.onError(OperatorMap.java:85)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810)
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:844)
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorSubscribeOn$1$1.onError(OperatorSubscribeOn.java:59)
07-27 22:33:58.405 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:264)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:207)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:     at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:     at java.lang.Thread.run(Thread.java:818)

person Viktor Sinelnikov    schedule 27.07.2016    source источник
comment
Можете ли вы опубликовать, как это создает Observable? RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion() наблюдает ли он за новой нитью On()?   -  person Prakash    schedule 28.07.2016
comment
Нет, это не так. Но если я его добавлю, ничего не изменится. Наблюдаемый, созданный модернизацией   -  person Viktor Sinelnikov    schedule 28.07.2016


Ответы (3)


Если используется retrofit2, его наблюдаемые объекты не меняют поток, с которым они работают, поэтому вся цепочка до subscribeOn() работает в одном потоке из планировщика ввода-вывода. По-видимому, ошибка из RestService.getPostsAround устанавливает флаг прерывания для этого потока, и okio задыхается от этого в RestService.getPostsByMapProjection . Вы можете попробовать добавить subscribeOn(Schedulers.io()) после RestService.getPostsByMapProjection в onErrorResumeNext().

person m.ostroverkhov    schedule 28.07.2016
comment
отлично, проблема .subscribeOn(Schedulers.io()) решена. Большое спасибо. - person Viktor Sinelnikov; 28.07.2016

По моему опыту, java.io.InterruptedIOException — отвлекающий маневр, так как обычно это означает, что какой-то другой поток отписался, и все запущенные потоки (например, сетевые запросы) прерываются.

В вашем случае RestService.getPostsByMapProjection вызывается каждый раз, независимо от того, есть ли ошибка, что может не соответствовать вашему поведению; подумайте о том, чтобы обернуть его в Observable.defer()

person Tassos Bassoukos    schedule 27.07.2016
comment
Я изменил код на: .onErrorResumeNext(Observable.defer(() -> RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion()))), но ничего не изменилось - person Viktor Sinelnikov; 28.07.2016

Вы можете использовать максимальный параллелизм на своей flatMap, чтобы убедиться, что у вас нет параллельных проблем.

@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func), maxConcurrent);
}

В вашем коде

    LocationService.getUpdatedOrLastKnownLocation(getContext()))
        .flatMap(location -> RestService.getPostsAround(location,0,10),1) //offset = 0, limit = 10;
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);
person paul    schedule 28.07.2016