Раковина внутри Актера, вызываемого во время производства, но не во время теста

У меня есть актер, который вызывает поток. Во время выполнения это работает по назначению, но при тестировании поток не вызывается.

Актер (сокращенно)

class PaymentProcessorActor(repo: PaymentRepo, accountCache: AccountCache, config: AppConfig) extends Actor {

  implicit private val materializer: ActorMaterializer = ActorMaterializer()
  implicit private val network: Network = config.network
  private implicit val ec: ExecutionContextExecutor = context.dispatcher

  val paymentSink: Sink[(Seq[Payment], Account), NotUsed] =
    Flow[(Seq[Payment], Account)].map { case (ps, account) =>
      println("inside flow")
      // ... block of type Future[(TransactionResponse, Seq[Payment], Account)] here
    }
    .mapAsync(parallelism = config.accounts.size)(_.map {
      case ((_: TransactionApproved, ps), account) =>
        // handle approval

      case ((x: TransactionRejected, ps), account) =>
        // handle rejection
    })
    .to(Sink.ignore)

  override def receive: Receive = state(nextKnownPaymentDate = None)

  private def state(nextKnownPaymentDate: Option[ZonedDateTime]): Receive =
    processPayments(nextKnownPaymentDate) orElse
      updateNextPaymentTime orElse
      confirmPayments orElse
      rejectPayments orElse
      rejectTransaction orElse
      retryPayments orElse
      updateAccount orElse
      registerAccount


  // If there are payments due, find and pay them
  def processPayments(nextKnownPaymentDate: Option[ZonedDateTime]): PartialFunction[Any, Unit] = {
    case ProcessPayments if nextKnownPaymentDate.exists(_.isBefore(ZonedDateTime.now())) =>
      val readyAccounts = accountCache.readyCount
      if (readyAccounts > 0) {
        val payments = repo.due(readyAccounts * 100)
        if (payments.isEmpty) {
          logger.debug("No more payments due.")
          context.become(state(repo.earliestTimeDue))
        } else {
          val submittingPaymentsWithAccounts: Seq[(Seq[Payment], Account)] =
            payments.grouped(100).flatMap(ps => accountCache.borrowAccount.map(ps -> _)).toSeq
          val submittingPayments: Seq[Payment] = submittingPaymentsWithAccounts.flatMap(_._1)
          repo.submit(submittingPayments.flatMap(_.id), ZonedDateTime.now)

          Source.fromIterator(() => submittingPaymentsWithAccounts.iterator).to(paymentSink).run()
          println("post source run")
        }
      }
  }

Спец. (sampleOf просто создает случайный экземпляр и не имеет отношения к проблеме).

  "the payment sink" should {
    "submit to the network" in {
      val (network, conf, repo, cache) = setup
      val account = sampleOf(genAccount)
      val payments = sampleOf(Gen.listOfN(3, genPayment))
      when(repo.earliestTimeDue).thenReturn(Some(ZonedDateTime.now()))
      when(repo.due(100)).thenReturn(payments)

      val actor = system.actorOf(Props(new PaymentProcessorActor(repo, cache, conf)))

      // these two calls set up the actor state so that payments will be processed
      actor ! UpdateNextPaymentTime
      actor ! UpdateAccount(account)

      // this invokes the stream under test
      actor ! ProcessPayments

      eventually(timeout(5 seconds)) {
        assert(network.posted.size == 1)
      }
    }
  }

  private def setup: (StubNetwork, AppConfig, PaymentRepo, AccountCache) = {
    val n = StubNetwork()
    val conf = new AppConfig {
      val network: Network = n
      val accounts: Map[String, KeyPair] = Map.empty
    }
    val repo = mock[PaymentRepo]
    (n, conf, repo, new AccountCache)
  }

Во время выполнения я вижу сообщения stdout:

post source run
inside flow

Но во время теста я вижу только

post source run

При отладке вижу, что все значения правильные и источник .run называется. Но как-то не запускается.


person Synesso    schedule 08.03.2019    source источник
comment
Не имеет прямого отношения к вашему вопросу, но во внутреннем потоке зачем вам создавать Future в map, а не mapAsync здесь? Я как бы подозреваю, что где-то есть блокировка в диспетчере, но не вижу ее здесь напрямую.   -  person Arnout Engelen    schedule 09.03.2019
comment
Спасибо за ваш комментарий @ArnoutEngelen. Я вызываю библиотечную функцию, которая возвращает Future. Полный блок: github.com/0rora/ 0rora/blob/ba373e/app/models/ Это выглядит проблематично?   -  person Synesso    schedule 11.03.2019


Ответы (1)


В строке .mapAsync(parallelism = config.accounts.size) значение равно нулю, что является ошибкой. Flow никогда не инициализировался. Этот сбой не распространяется на основной поток.

Кроме того, я отключил ведение журнала Akka для тестов в конфигурации, поэтому этот сбой не был зарегистрирован.

person Synesso    schedule 11.03.2019
comment
Просто добавьте стратегию наблюдения при создании материализатора для регистрации ошибок, например: ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider), и решающим будет: decider: Supervision.Decider = { case ex: Exception => `logger.error(sUnhandled exception in stream: $ex)` Supervision.Stop ` }` - person Explorer; 12.03.2019