Вопросы по теме 'akka-stream'

Akka-streams: выполнить действие при запуске потока
Наличие описания потока в akka-streams val flow: Flow[Input, Output, Unit] = ??? , как мне изменить его, чтобы получить новое описание потока, которое выполняет указанный побочный эффект при запуске, т. е. когда поток материализуется?
465 просмотров
schedule 23.10.2022

Почему Akka Streams поглощает мои исключения?
Почему исключение в import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source object TestExceptionHandling { def main(args: Array[String]): Unit = { implicit val actorSystem = ActorSystem()...
7176 просмотров
schedule 23.07.2022

Нотация Akka Streams Graph DSL
Я использовал граф dsl для создания некоторых заданий потоковой обработки на основе некоторого примера кода, который я видел. Все работает отлично, мне просто трудно понять обозначения: (обновлено для 2.4) def elements: Source[Foos] = ... def...
1885 просмотров
schedule 30.06.2022

Этапы Akka Stream не выполняются одновременно
Я начинаю изучать Akka Streams и запускаю первый пример отсюда: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala import akka.stream.scaladsl._ import akka.actor.ActorSystem import...
260 просмотров
schedule 12.07.2022

akka-http: полный запрос с потоком
Предположим, я установил произвольно сложный Flow[HttpRequest, HttpResponse, Unit] . Я уже могу использовать указанный поток для обработки входящих запросов с помощью Http().bindAndHandle(flow, "0.0.0.0", 8080) Теперь я хотел бы добавить...
1260 просмотров
schedule 05.01.2024

Обработка потока Akka в транзакции Slick
Версии программного обеспечения: Акка 2.4.4 Слик 3.1.0 Я хочу обработать элементы из потока Akka в транзакции Slick. Вот некоторый упрощенный код, иллюстрирующий один из возможных подходов: def insert(d: AnimalFields): DBIO[Long] =...
723 просмотров
schedule 01.06.2024

Akka stream — обработка асинхронных повторных попыток вне потока
Я пытаюсь создать поток Akka, где некоторые этапы являются HTTP-вызовами. Поскольку http-вызовы могут (и будут) время от времени давать сбои, я бы предпочел поместить соответствующие данные в отдельную очередь (может быть, даже внешнюю по отношению к...
137 просмотров
schedule 31.03.2024

Подключение к веб-сокету Akka Streams
Я пытаюсь найти лучший способ реализовать реальное приложение websocket с использованием akka-http и akka-streams. В основном я ищу простоту, которой я сейчас просто не понимаю. Предположим, у вас есть довольно сложный конвейер, который должен...
404 просмотров
schedule 08.05.2024

Как спроектировать систему актеров реактивного потока с возможностью разветвления
Я пытаюсь реализовать систему на основе актеров с возможностью обратного давления. По требованию главный процесс получает потоковые данные в формате JSON. Однако каждое событие JSON имеет несколько полей, таких как {ip: '123.43.12.1', страна: 'США'...
324 просмотров

Reactive-Kafka: как приостановить потребитель в случае исключения и повторить попытку по запросу
Я уже задавал этот вопрос на Google Groups, но ответа пока не получил. Поэтому размещаю это здесь для другой аудитории. Мы используем Reactive-Kafka для нашего приложения. У нас есть сценарий, как показано ниже, в котором мы хотим прекратить...
1360 просмотров
schedule 14.09.2022

Используйте источник дважды с akka-stream
Я использую платформу Play для веб-приложения, которое я создал. Play 2.5 использует Akka Stream API для потоковой передачи запроса/ответа. У меня есть конечная точка, в которой входящий файл передается непосредственно на Google Диск. Я...
636 просмотров
schedule 16.11.2022

Каков наиболее рекомендуемый способ захвата вывода TCP-сервера в очередь Akka Stream?
Я экспериментирую с Akka Streams, чтобы понять, как именно предполагается использовать то, что TCP-сервер получает от клиента (серверу не нужно отвечать клиенту). Вот стандартная реализация TCP-сервера (после применения того, что я понимаю из...
217 просмотров
schedule 20.11.2023

Потоки Akka объединяют источники, кажется, что источники запускаются одновременно
Я использую Source.combine в потоках Akka. Я заметил, что все источники запускаются в одно и то же время в начале жизненного цикла графа и буферизуют некоторые входные данные перед паузой и возобновлением в порядке, указанном в списке source.combine....
2956 просмотров
schedule 04.04.2024

Akka Stream — как выполнять потоковую передачу из нескольких источников SQS
Это последующая запись Akka Stream — выбор приемника на основе элемента в потоке . Предположим, у меня есть несколько очередей SQS, из которых я хочу выполнять потоковую передачу. Я использую коннектор AWS SQS Alpakka для создания Source ....
1016 просмотров
schedule 24.09.2022

Потоки Akka.Net и удаленное взаимодействие (Sink.ActorRefWithAck)
Я сделал довольно простую реализацию с потоками Akka.net, используя Sink.ActorRefWithAck : подписчик запрашивает большую строку у издателя, который отправляет ее фрагментами. Он отлично работает локально (UT), но не удаленно . И я не могу понять,...
461 просмотров
schedule 11.09.2022

Akka.Net PersistenceQuery не возвращает все результаты
Я использую Akka.Net (v 1.3.2) и пытаюсь запросить журнал событий для всех событий с определенным тегом. Мне нужны только события, существующие на момент запроса журнала. Внутри актера у меня есть следующий код: var readJournal =...
170 просмотров
schedule 18.09.2022

Поток Akka слишком медленный по сравнению с двумя потоками с ArrayBlockingQueue
Я работаю над проблемой производительности, которая возникла при переключении текущего проекта на поток Akka. После упрощения проблем кажется, что поток Akka передавал гораздо меньше сообщений, чем я ожидал. Здесь у меня есть два очень простых...
354 просмотров
schedule 13.09.2022

Повторить поток в akka при сбое любого этапа потока
Я использую поток akka для обработки своих данных. В котором у меня есть 1 источник, который состоит из элемента UUID. Поток выглядит следующим образом: извлекает элемент из какой-либо сторонней HTTP-службы, которая возвращает полный элемент...
934 просмотров
schedule 01.08.2022

Любой способ повторного использования Source[ByteString, Any] (без сохранения всего в памяти)
Есть ли способ сделать источник многоразовым? У меня есть сервер akka-http, который получает загрузку большого файла, а затем передает (разделенные) данные на веб-сокеты подписчика и другие HTTP-серверы через HTTP POST. В обоих случаях есть API,...
200 просмотров
schedule 01.11.2023

Мультиплексирование исходных потоков akka в две копии
Я пытаюсь кэшировать источник потоковой передачи на диск, одновременно отправляя его как HttpResponse , то есть у меня есть Source[ByteString,_] , который я хочу передать HttpEntity , но я также хочу запустить те же данные в приемник FileIO.toPath...
183 просмотров
schedule 28.07.2022