Как убить core.async/thread?

У меня есть несколько потоков, возвращенных core.async/thread, вовлеченных в какой-то процесс, который я собираюсь закрыть. Я не закрываю всю свою программу, только эти потоки. Как я могу завершить потоки?

Метод .stop класса Java Thread устарел, но я был бы рад его использовать, за исключением того, что core.async/thread возвращает не Thread, а ManyToManyChannel:

user=> (clojure.core.async/thread)
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x780e97c0
  "clojure.core.async.impl.channels.ManyToManyChannel@780e97c0"]
user=> (type *1)
clojure.core.async.impl.channels.ManyToManyChannel

Я не нашел никакой документации по ManyToManyChannel. Звучит как странное название типа потока, так что здесь может быть что-то элементарное, чего я не понимаю. Но вот мой нынешний наивный, бессмысленно звучащий вопрос: как убить ManyToManyChannel?

clojure.repl/thread-stopper, похоже, не влияет на ManyToManyChannels.


person Ben Kovitz    schedule 02.06.2016    source источник


Ответы (1)


Вы позволяете потоку завершиться естественным образом. Если внешнее завершение необходимо, вы должны его реализовать.

(defn terminatable [input-ch terminate-ch]
   (thread
     (loop []
       (let [[v ch] (alts!! [input-ch terminate-ch])]
         (if (identical? ch input-ch)
           (if (some? v)
             (do (process-input v) (recur))
             ;; else input-ch has closed -> don't call recur,
             ;; thread terminates
             )
           ;; else we received sth. from terminate-ch,
           ;; or terminate-ch has closed -> don't call recur,
           ;; thread terminates
           )))))

Затем завершите работу извне через (close! terminate-ch)

Наконец, вы можете определить, когда поток завершится, взяв данные из канала, возвращаемого thread.

I. e.

(take! (terminatable (chan) (doto (chan) close!)) 
       (fn [_] (println "Thread is terminated")))
person Leon Grapenthin    schedule 02.06.2016
comment
Означает ли это, что я не могу писать такой код? (async/thread (doseq [line (line-seq subprocess-stdout)] (>!! stdout-chan line)) (то есть вызов line-seq один раз вместо alts!! для каждой строки.) Если нет, есть ли какой-то способ получить что-то вроде удобного вызова line-seq для чтения всего из подпроцесса, и при этом иметь простой способ закрыть это без ждать завершения подпроцесса? - person Ben Kovitz; 02.06.2016
comment
В этом суть потоковой обработки: вы моделируете потоки с помощью каналов. Таким образом, вам придется поместить строки в канал, как input-ch в моем примере, и брать из него по одной строке за раз. т. е. (onto-chan input-ch (line-seq subprocess-stdout)). Затем v становится строкой, и вы можете выполнить дальнейшую обработку, например, поместить ее на stdout-chan в рамках реализации process-input. - person Leon Grapenthin; 03.06.2016
comment
Рассмотрите возможность инвестирования в pipeline-blocking и pipeline-async, а также в преобразователи, поскольку они предоставляют простые инструменты для параллелизации обработки вашего потока. pipeline-blocking реализует альтернативную стратегию завершения (закрытие целевых каналов). - person Leon Grapenthin; 03.06.2016
comment
Если все, что вы хотите сделать, это поместить строки в stdout-chan, вы также можете использовать (onto-chan stdout-chan (line-seq subprocess-stdout)) — этот процесс можно прервать, закрыв stdout-chan. Если вы не хотите закрывать stdout-chan, вы создаете промежуточный канал, который помещает значения в stdout-chan до тех пор, пока вы не закроете его через pipe. - person Leon Grapenthin; 03.06.2016
comment
Очень признателен! Я только что удалил (try … catch)es вокруг моих вызовов line-seq и заменил их на onto-chan. Чтобы избежать сложности alts!!, я заменил каждый (loop [] …) на (while (not @terminated?) …). Каждый поток теперь составляет всего 3–4 строки журнала. - person Ben Kovitz; 03.06.2016
comment
Не могли бы вы добавить примечание о закрытии выходного канала таких функций, как onto-chan и pipeline-blocking, чтобы они останавливались даже при чтении чего-то вроде line-seq? В документации к этим функциям это не упоминается, по крайней мере, явно, и многие люди, вероятно, сочтут эту информацию полезной. - person Ben Kovitz; 03.06.2016
comment
@BenKovitz Все они останавливаются, как только операция пут возвращает false, потому что целевой канал закрыт. Это реализация, которую можно ожидать от любого такого основного процесса, как лучшие практики автоматического освобождения процессорного времени в недетерминированных сценариях закрытия. - person Leon Grapenthin; 03.06.2016
comment
Этот метод немного менее надежен в том же диапазоне, что и ваша стратегия @terminated. Они оба завершаются позже, если происходит следующий порядок событий: 1. Значение было помещено 2. Произошло завершение 3. Источник блокирует - теперь источник должен разблокировать или создать значение, пока завершение не будет реализовано. Реализация, которую я предоставил, завершается немедленно. - person Leon Grapenthin; 03.06.2016