Самый простой способ понять, что такое канал и как его использовать.

Я представил Основы Kotlin Coroutines и Функции приостановки в предыдущих сообщениях. Сегодня мы узнаем кое-что о Channel.
Как мы узнали из Kotlin Coroutines in Android - Basics, Deferred передает одно значение между сопрограммами. Channel можно рассматривать как предоставление потока значений между сопрограммами.
Оглавление
- Основы работы с каналом
- Channel.send () и Channel.receive ()
- Channel.offer () и Channel.poll ()
- SendChannel.close ()
- ReceiveChannel.cancel (причина: CancellationException? = Null)
- Политика пропускной способности канала
(1) Канал рандеву
(2) Буферизованный канал
(3) Неограниченный канал
(4) Объединенный канал
1. Основные сведения о канале
Согласно официальной документации, мы могли знать, что Channel очень похож на BlockingQueue. Самая большая разница в том, что Channel приостанавливает сопрограмму, а не блокирует ее, а Channel может быть закрыт, когда больше не нужно добавлять элементы. Мы можем отправлять элементы в канал от производителя и получать их от потребителя.
Канал интерфейса реализует как SendChannel, так и ReceiveChannel. Здесь я перечисляю наиболее часто используемые функции:
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {}
public interface SendChannel<in E> {
public val isClosedForSend: Boolean
public val isFull: Boolean
public suspend fun send(element: E)
public fun offer(element: E): Boolean
public fun close(cause: Throwable? = null): Boolean
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
public interface ReceiveChannel<out E> {
public val isClosedForReceive: Boolean
public val isEmpty: Boolean
public suspend fun receive(): E
public fun poll(): E?
public fun cancel(cause: CancellationException? = null)
}
Есть две важные функции приостановки - Channel.send () и Channel.receive (). Имя функции легко читается, одна сопрограмма может использовать Channel.send() для отправки некоторых значений в канал, а другая может использовать Channel.receive() для получения значений из канала.
Мы можем думать о channel посередине как о типичном Queue, он упорядочивает элементы в порядке очереди. Приведем простой пример:
Мы отправляем целые числа 1 ~ 5 на канал в Coroutine # 1 и получаем их в Coroutine # 2. Значения будут распечатаны последовательно:
D/demo: 1 D/demo: 2 D/demo: 3 D/demo: 4 D/demo: 5 D/demo: done
2. Channel.offer () и Channel.poll ()
Offer () и poll () похожи на send () и receive (), но эти две функции не являются функциями приостановки.
Offer() немедленно добавляет элемент в очередь и возвращает истину в случае успеха. В противном случае он возвращает false или выдает исключение, если isClosedForSend истинно.
Poll() немедленно получает элемент из канала. Если канал пуст или isClosedForReceive истинно без причины, то Poll() возвращает ноль. Но если канал закрыт по причине, например channel.close(IOException()), то генерирует исключение.
3. SendChannel.Close ()
Close() - это специальная функция для закрытия канала. По сути, он отправляет близкий токен по каналу, а затем последующие вызовы этой функции не имеют никакого эффекта и возвращают false. Для получения состояний канала можно использовать два значения:
- Channel.isClosedForSend
После вызоваChannel.close()это значение начинает возвращать истину. - Channel.isClosedForReceive
После вызоваChannel.close()и получения всех элементов в канале это значение начинает возвращать истину.
Итак, что, если мы по-прежнему вызываем функции Channel после закрытия канала? Официальная документация объясняет это следующим образом:
Канал, который был закрыт без причины, выдает ClosedSendChannelException при попытках отправки и ClosedReceiveChannelException при попытках приема. Канал, который был закрыт по ненулевой причине, называется отказавшим каналом. Попытки отправить или получить на отказавшем канале вызывают исключение с указанной причиной.
Чтобы предотвратить получение превышающих элементов после Channel.close(). Мы можем использовать for loop или Channel.consumeEach(), чтобы убедиться, что все элементы потребляются без сбоя приложения.
Обратите внимание, что эта функция все еще находится в эксперименте. Но он может быть выпущен в Kotlin Coroutines версии 1.4.


4. ReceiveChannel.cancel (причина: CancellationException? = Null)
После того, как мы вызываем cancel(), isClosedForReceive и isClosedForSend сразу начинают возвращать истину. Все попытки отправить на этот канал или получить с этого канала будут вызывать CancellationException.
5. Политика пропускной способности канала
Иногда нам нужно ограничить количество элементов, помещаемых в канал одновременно. Можно выбрать четыре политики емкости.

(1) Канал рандеву (без буфера)
Как показано в приведенном выше коде, канал рандеву является политикой пропускной способности по умолчанию. У этого канала нет буфера. Элемент передается от отправителя к получателю только тогда, когда два вызова встречаются во времени, что означает, что channel.send() приостанавливается, пока не будет вызван другой channel.receive(), и channel.receive() также приостановится, пока не будет вызван другой channel.send(). Изучив исходный код, мы обнаруживаем, что isBufferAlwaysEmpty и isBufferAlwaysFull возвращают true в канале Rendezvous.

Итак, как это работает? Мы пытаемся отправить 0 и 1, затем дважды вызываем receive ():
Журнал будет:
D/demo: send 0 and suspend D/demo: invoke receive() D/demo: get: 0 immediately D/demo: invoke receive() and suspend D/demo: send 1 and meet a receive() immediately D/demo: sender done D/demo: get: 1 D/demo: receiver done

Я хотел бы объяснить процесс с помощью изображения выше:
- Сопрограмма №1 вызывает
channel.send(0), затем приостанавливает работу, пока кто-нибудь не позвонитchannel.receive(). - Сопрограмма №2 вызывает
channel.receive()и сразу встречаетchannel.send(0). Итак, Coroutine # 2 немедленно получает элемент 0. - Coroutine # 2 продолжает свою работу, вызывая следующий
channel.receive(). Он приостанавливается, поскольку больше нет соответствующихchannel.send(). - Когда Coroutine # 2 приостанавливается, процесс изменяется на выполнение и возобновление Coroutine # 1. Затем вызывается
channel.send(1), и все задачи в Coroutine # 1 выполняются. - Вернитесь к Coroutine # 2 и получите 1. Все задачи Coroutine # 2 выполнены.
(2) Буферизованный канал
Дайте каналу буферную емкость. Когда буфер заполнен, channel.send() приостанавливает выполнение сопрограммы до тех пор, пока не будут израсходованы некоторые элементы в канале. Вы можете определить емкость буфера самостоятельно, например val channel = Channel<Int>(10), или использовать емкость по умолчанию: val channel = Channel<Int>(Channel.BUFFERED), которая по умолчанию равна 64 и может быть отменена параметром kotlinx.coroutines.channels.defaultBuffer в JVM. Вот пример, мы определяем канал с емкостью буфера 2. Попробуйте отправить элементы 0 ~ 5 и получить их:
Журнал будет:
D/demo: send 0 D/demo: send 1 D/demo: send 2 D/demo: receive 0 D/demo: receive 1 D/demo: receive 2 D/demo: send 3 D/demo: send 4 D/demo: send done D/demo: receive 3 D/demo: receive 4 D/demo: receive done

- Coroutine # 1 повторяет отправку элементов в канал, пока буфер не заполнится, а затем Coroutine # 1 приостанавливается. Элементы 0 и 1 будут сохранены в буфере, а
Channel.send(2)вызовет приостановку. - Coroutine # 2 повторяет элементы приема до тех пор, пока не приостанавливается. Он получает элементы 0 и 1 из буферного канала. Затем
Channel.send(2)иChannel.receive()встречаются вовремя, поэтому элемент 2 также передается в Coroutine # 2. - Непрерывно вызывается
channel.receive(), и Coroutine # 2 приостанавливается. Channel.send(3)соответствуетchannel.receive()на шаге 3, поэтому он будет доставлен напрямую. Что касается элемента 4, он будет сохранен в канале.- Сопрограмма №2 получает элементы 3 и 4, а затем завершает свою работу.

Предоставьте каналу неограниченную буферную емкость (равную Int.MAX_VALUE в коде). Это означает, что channel.send() никогда не будет приостанавливать работу сопрограммы, и каждый элемент может быть помещен в канал. Однако мы должны учитывать, что если память исчерпается, мы получим OutOfMemoryError.
Журнал будет:
D/demo: send 0 D/demo: send 1 D/demo: send 2 ... D/demo: send 98 D/demo: send 99 D/demo: send done D/demo: receive 0 D/demo: receive 1 D/demo: receive 2 ... D/demo: receive 98 D/demo: receive 99 D/demo: receive done

Channel.send()никогда не будет приостановлен, все элементы будут помещены в канал.- Вызовите
Channel.receive(), чтобы получить все элементы из канала. Когда в канале больше нет элементов, он приостанавливается.
(4) Объединенный канал
Элемент в объединенном канале будет заменен последним элементом. Таким образом, channel.send() никогда не будет приостановлен, а channel.receive() всегда получит последний элемент.
Журнал будет:
D/demo: send 0 D/demo: send 1 D/demo: send 2 D/demo: send done D/demo: receive 2 D/demo: receive done

- Мы отправляем сюда элементы 0, 1 и 2. Наконец, в канале остается только элемент 2. элементы 0 и 1 перезаписываются.
- Затем мы вызываем
channel.receive()и получаем элемент 2.
На сегодня все, надеюсь, этот пост поможет вам узнать, что такое канал и как его использовать. Если у вас есть какие-либо предложения или вопросы, оставляйте комментарии ниже, и я буду рад их обсудить со всеми вами. Спасибо, что прочитали еще раз. Увидимся в следующем посте. ✋ ✋
использованная литература
- Официальные документы Kotlin Coroutines:
- Kotlin Playground: есть отличное видео, объясняющее
Rendezvous channel.
- Этот пост действительно потрясающий, поскольку он очень ясно объясняет различия между потоками и сопрограммами. Более того, он также разделяет концепцию Coroutines Channel. Я настоятельно рекомендую вам это прочитать.