Подробное руководство по освоению RxJava

RxJava присутствует на рынке разработки уже давно, но, общаясь с некоторыми из моих коллег, я обнаружил, что многие люди не начали использовать RxJava, потому что не осознают преимущества, которые он дает.

Хотя есть много статей, я пишу этот пост, чтобы объяснить, как легко можно привыкнуть к использованию RxJava. Вначале все новое, например, терминология и то, что использовать.

Нам нужно продолжать работать с этим, чтобы мы привыкли и понимали это. В этом посте мы обсудим основы RxJava и некоторые полезные идеи.

Что такое RxJava

RxJava - это реализация Java VM Reactive Extensions: библиотеки для создания асинхронных программ и программ, основанных на событиях, с использованием наблюдаемых последовательностей.

Просто мы можем определить его как API для асинхронного программирования с наблюдаемыми потоками. Это сочетание лучших идей из шаблона наблюдателя, шаблона итератора и функционального программирования.

Он расширяет шаблон наблюдателя для поддержки последовательностей данных / событий и добавляет операторы, которые позволяют декларативно составлять последовательности вместе, абстрагируясь от проблем, связанных с такими вещами, как низкоуровневые потоки, синхронизация, безопасность потоков и параллельные структуры данных.

Другими словами, мы можем сказать, что данные, испускаемые одним компонентом, и базовая структура, предоставляемая библиотеками Rx, будут распространять эти изменения на другой компонент, который зарегистрирован для получения этих изменений данных.

Почему мы должны использовать RxJava?

Прежде чем начать что-либо использовать, нужно проанализировать, зачем нам эта штука. Использование Executor Service для потоковой передачи и AsyncTask для других асинхронных операций может выполнить нашу задачу, но нам гораздо сложнее управлять ими должным образом.

Простая многопоточность

Для асинхронной работы решающее значение имеет управление потоками. Во многих ситуациях во время выполнения задачи мы сталкиваемся с ситуацией связи между фоновым потоком и основным потоком.

Простой пример - обновление пользовательского интерфейса во время выполнения фонового потока. Хотя это звучит просто, мы знаем, сколько проверок нам нужно выполнить перед обновлением пользовательского интерфейса.

Rx упростил нам задачу, поэтому с помощью Rx мы можем легко управлять задачей, которую мы запустили в отдельном потоке.

Простая обработка ошибок

Ошибки - это самая неприятная вещь для разработчиков. Выполняя множество сложных асинхронных операций, мы сталкиваемся с ошибками во многих местах.

Чтобы избежать этого, мы обычно используем _1 _ / _ 2_ или другой неоднозначный код для решения этой проблемы. Но Rx предоставляет стандартный подход для обработки этих ошибок. Он имеет стандартный подход к успешной и неудачной реализации.

Избегайте ада обратных вызовов

В любой момент, если вы выполняли вложенные сетевые вызовы, вы можете знать, какие трудности будут при их обработке. Но у Rx есть много операторов, которые очень легко решают подобные проблемы.

Шаблон наблюдателя

Давайте немного разберемся с паттерном наблюдателя. Шаблон наблюдателя - это шаблон разработки программного обеспечения, в котором объект, называемый субъектом, поддерживает список своих зависимых элементов, называемых наблюдателями.

Он автоматически уведомляет их о любых изменениях состояния, обычно вызывая один из их методов. Проще говоря, будет наблюдатель, который подписывается на Observable , чтобы получать уведомления о последних данных или, скажем, об изменениях состояния.

Формула

В какой-то степени RxJava - это изучение этой формулы и ее применение в соответствии с нашими требованиями.

Rx= SCHEDULERS + OBSERVABLE + OBSERVER

Давайте проанализируем и разберемся каждый компонент.

Наблюдаемый

В RxJava Observables - это источник, который передает данные наблюдателям. Чтобы наблюдатели могли слушать Observables, им сначала нужно подписаться.

Экземпляр, созданный после подписки в RxJava2, называется Disposable. Если задача выполнена или прекратить прослушивание Observables, мы можем отказаться от подписки, вызвав метод dispose() в экземпляре Disposable.

Мы можем думать о наблюдаемых как о поставщиках. Они обрабатывают и передают данные другим компонентам. Он выполняет некоторую работу и излучает некоторые значения.

Observable.OnSubscribe - это интерфейс, который определяет действие, которое должно выполняться, когда подписчик подписывается на Observable. Метод subscribe будет работать только тогда, когда Observer подписан на Observable.

Давайте создадим простой Observable.

Наблюдатель

Наблюдатели - это компоненты, которые потребляют данные, передаваемые наблюдаемым объектом. В Rx наблюдатели подписываются на наблюдаемое, используя метод subscribe для получения данных, испускаемых наблюдаемым.

Всякий раз, когда наблюдаемый передает данные, все зарегистрированные наблюдатели получают данные в обратном вызове метода onNext(). Здесь мы можем выполнять различные операции, такие как синтаксический анализ ответа JSON или обновление пользовательского интерфейса и т. Д. Если есть ошибка, вызванная наблюдаемым объектом, наблюдатель получит ее в onError().

Есть три основных метода, которые мы должны знать, чтобы понять Observer.

  • onNext(): Используется для получения данных, когда наблюдаемый испускает следующий элемент.
  • onError(): срабатывает при возникновении ошибки.
  • onComplete(): вызывается после отправки последнего элемента. В основном, когда работа закончена.

Давайте создадим простой экземпляр Observer.

Давайте подпишемся на наблюдаемое вместе с наблюдателем.

observableObject.subscribe(observerInstance);

Это создает подписку между наблюдателем и наблюдаемым. Теперь Observable будет выдавать значения, которые будут улавливаться onNext() Observer'а.

Вывод из консоли:

Output:
onSubscribe
onNext 10
onNext 20
onNext 30
onComplete

Планировщики

Планировщики используются, чтобы указать, в каком потоке должна выполняться работа, поэтому это более полезно в случае управления потоками.

В Rx планировщики - это компоненты, которые сообщают Observable или Observer, в каком потоке они должны работать.

Мы можем использовать subscribeOn(), чтобы указать наблюдаемому объекту, в каком потоке он должен работать, и мы можем использовать observeOn(), чтобы указать наблюдаемому, в каком потоке они должны передавать данные.

Планировщиков много, но мы обсудим три основных, которые мы используем в основном.

Schedulers.io ()

Обычно используется для вещей, связанных с вводом-выводом, таких как сетевые запросы, операции файловой системы и т. Д. Планировщик ввода-вывода поддерживается пулом потоков. Обычно мы используем его как:

observableInstance.subscribeOn(Schedulers.io())

Возвращает общий экземпляр планировщика по умолчанию, предназначенный для работы с привязкой к вводу-выводу. Это можно использовать для асинхронного выполнения блокирующих операций ввода-вывода.

Эта реализация работает аналогично ThreadPoolExecutor из java.util.concurrent с неограниченным пулом потоков.

Реализация поддерживается пулом однопоточных ScheduledExecutorService экземпляров, которые будут пытаться повторно использовать ранее запущенные экземпляры, используемые работником, в противном случае они запустят новый резервный ScheduledExecutorService экземпляр.

Обратите внимание, что этот планировщик может создавать неограниченное количество рабочих потоков, что может привести к замедлению работы системы или OutOfMemoryError. Следовательно, для случайного использования или при реализации оператора экземпляры Worker должны быть удалены.

Мы можем управлять определенными свойствами этого стандартного планировщика через системные свойства, которые должны быть установлены до того, как класс Scheduler будет указан в вашем коде. Поддерживаемые системные свойства System.getProperty().

Время поддержания активности io() Scheduler worker по умолчанию составляет 60 секунд, а приоритет потока io() Scheduler по умолчанию равен NORM_PRIORITY.

Следующий метод в структуре Rx вызывается, когда мы вызываем Schedulers.io() из любой части нашего приложения:

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

Schedulers.computation ()

Этот планировщик можно использовать для выполнения операций с интенсивным использованием ЦП, таких как обработка огромных данных, обработка растровых изображений и т. Д. Количество потоков, созданных с помощью этого планировщика, полностью зависит от количества ядер ЦП, доступных на мобильном устройстве.

Итак, если у вас есть два ядра на вашем мобильном телефоне, у него будет два потока в пуле. Это также означает, что если эти два потока заняты, процессу придется ждать, пока они станут доступны.

Поскольку это ограничивает количество потоков, работающих параллельно, мы должны использовать планировщик вычислений , когда задачи полностью зависят от ЦП; это когда они требуют вычислительной мощности и не имеют блокирующего кода.

observableInstance.subscribeOn(Schedulers.computation())

AndroidSchedulers.mainThread ()

Этот Планировщик предоставляется библиотекой Rx Android с именем пакета io.reactivex.android.schedulers.

Этот планировщик используется для возврата выполнения к основному потоку, чтобы обновления пользовательского интерфейса могли быть выполнены при необходимости. Обычно это используется с observeOn(), как показано ниже.

observableInstance.observeOn(AndroidSchedulers.mainThread())

Пример

Давайте лучше разберемся во всех этих компонентах на примере.

Шаг 1

Добавьте зависимости на уровень приложения build.gradle.

//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
//RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//ViewModel
implementation 'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0'

Шаг 2

Давайте сначала разберемся с некоторыми методами, прежде чем углубляться в них, например subscribeOn() и observeOn().

subscribeOn(): это метод, который сообщает Observables, в каком потоке они должны работать. Где бы вы ни поместили метод subscribeOn() в цепочку наблюдаемых объектов, он действует только на корневое наблюдаемое и контролирует, в каком потоке он должен выполняться.

Для каждого наблюдаемого объекта должен быть только один subscribeOn(), определяющий его поток выполнения. Хотя мы определяем несколько subscribeOn() методов, тот, который ближе всего к корневому наблюдаемому, будет иметь эффект, никакие другие.

Если у нас есть цепочка наблюдаемых, корневая наблюдаемая - это та, из которой происходят выбросы.

observeOn(): этот метод сообщает, в каком потоке будут выполняться все последующие операторы (пока не встретится другой observeOn), он может появляться несколько раз в цепочке, изменяя поток выполнения различных частей кода.

Это метод, который сообщает Observers, в каком потоке они должны использовать полученные значения Observable. Если мы определили subscribeOn() для наблюдаемого и не указали observeOn(), тогда Observer будет использовать элементы только в этом конкретном потоке.

Пример использования

Обычно мы выполняем сетевые запросы на Scheduler.io(), который выполняется в отдельном рабочем потоке, но нам нужно указать observeOn() с AndroidSchedulers.mainThread(), чтобы использовать данные, выпущенные для обновления пользовательского интерфейса.

Давайте создадим простое приложение с действием, в котором при нажатии кнопки вызывается метод ViewModel для извлечения данных либо из удаленного репо, либо из локальных вычислений, чтобы проверить, как выполняется многопоточность с помощью Rx.

Давайте рассмотрим логику ViewModel.

Выход:

Шаг 3

Анализ. Наблюдая за методом в ViewModel, мы можем понять, что:

  1. CompositeDisposable: Одноразовый контейнер, который может удерживать несколько других одноразовых предметов и предлагает O (1) для сложности добавления и удаления.
  2. subscribe(): Этот метод состоит из двух частей, блоков успеха и отказа, где мы можем обрабатывать каждый случай в соответствии с требованиями.
  3. subscribeOn() и observeOn(): subscribeOn() используется, чтобы указать, в каком потоке Observable должен выполнять свою работу, а observeOn() используется, чтобы указать, в каком потоке Observable должен выдавать элементы.
  4. onCleared(): Это переопределенный метод ViewModel, в котором мы можем избавиться от всех расходных материалов. Этот метод вызывается, когда ViewModel уничтожается.

Бонус

Желая узнать больше о Rx, продолжайте читать серию Полное руководство по RxJava.

Заключение

Это еще не все, что нужно знать о RxJava; есть еще много чего для изучения, например, различные способы создания Observable и типы Observable, операторов и т. д.

Я напишу о каждом из них в своих следующих публикациях. Давай продолжим учиться.

Пожалуйста, дайте мне знать ваши предложения и комментарии.

Спасибо за прочтение.