Одной из ключевых метрик в Spark является lastCompletedBatch_processingDelay, которая указывает время, необходимое для обработки пакета событий. Если продолжительность пакетной обработки превышает временной интервал между последовательными пакетами (как настроено параметром длительность пакета), приложение больше не может успевать за скоростью ввода, и формируется резерв ожидающих пакетов. Если отставание продолжает расти, в какой-то момент у приложения может закончиться нехватка памяти и произойдет сбой.

Дайвинг

Поскольку приложение Beam-over-Spark состоит из нескольких уровней, мы сначала стремились сузить область поиска. Поэтому мы устранили любые нетривиальные операторы луча и операции ввода-вывода, такие как чтение и запись Kafka, и остались с конвейером, подобным карте идентичности, над источником в памяти, который генерировал поток целых чисел, созданный конструкцией Beam GenerateSequence. .

Если бы проблема с производительностью сохранялась даже в упрощенном, урезанном приложении, это, скорее всего, указывало бы на проблему в одном из фундаментальных слоев, а не в коде нашего приложения (который на тот момент был сведен к тривиальной карте идентификации). трансформация).

Интересно, что проблема действительно сохранялась даже в урезанной версии приложения.

Интерфейс искры

Затем мы обратились к пользовательскому интерфейсу Spark, чтобы собрать основную информацию.

Среди метрик задачи время десериализации задачи занимало больше всего времени — от 13 до 19 мс. Однако на данный момент было неясно, имеет ли это какое-либо отношение к проблемам, которые мы наблюдали.

Вторая выборка, сделанная через несколько часов, показала, что время десериализации задачи продолжало расти и достигло 41–79 мс, в то время как другие метрики задачи оставались стабильными и не показывали признаков деградации — главным подозреваемым стала десериализация задачи.

Две основные причины могут объяснить увеличение времени десериализации задач:

  1. Размер задачи растет, и системе необходимо десериализовать больше данных.
  2. Способ, которым выполняется десериализация, отрицательно влияет на время десериализации и со временем занимает больше времени.

Просматривая журналы драйвера, размер задач оставался в пределах стабильного диапазона в несколько килобайт, что указывает на то, что дополнительное время не тратится на десериализацию более крупных объектов, но, вероятно, больше связано с тем, как выполнялась десериализация, и меньше с тем, что было десериализовано.

Профилирование ЦП

Пламенные графики — это известный метод профилирования производительности приложений. В недавнем посте нашего собственного Авиема Зура подробно рассказывается и объясняется, как можно использовать пламенные графы для профилирования приложений Spark.

Мы использовали пламенные графы для профилирования JVM драйвера и исполнителя нашего приложения (не одновременно, чтобы избежать коллизий на уровне базы данных).
Профилирование драйвера выполнялось с использованием:

"spark.driver.extraJavaOptions=-javaagent:statsd-jvm-profiler-…"

Профилирование исполнителя выполняется с помощью:

"spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-…"

Хотя явного узкого места, которое могло бы объяснить снижение производительности, которое мы наблюдали, не было, метод TorrentBroadcast.readBroadcastBlock() привлек наше внимание, поскольку мы не могли легко объяснить, почему чтение блока занимает то, что мы считали, относительно большим временем.

Мы также попытались сравнить два графика пламени одного и того же компонента, как это предложено в Блоге Брендана Грегга. Первый график пламени был сгенерирован близко ко времени запуска приложения, а второй вскоре после того, как задержка обработки значительно увеличилась. Тем не менее, диаграмма разностного пламени не выявила новых идей. В этот момент казалось, что более глубокое изучение TorrentBroadcast.readBroadcastBlock() может быть полезным.

Журналы искр

Затем мы обратились к журналам для поиска потенциальных проблем в методе TorrentBroadcast.readBroadcastBlock(). Чтобы упростить чтение журнала и избежать записи журналов несколькими процессорами на одном и том же исполнителе, мы изменили количество потоков искрового исполнителя (и драйвера) на 1.

В журналах были обнаружены свидетельства того, что время чтения широковещательной переменной увеличивалось:

DEBUG BlockManager: Getting local block broadcast_691
INFO TorrentBroadcast: Reading broadcast variable 691 took 12 ms

Перенесемся в будущее:

DEBUG BlockManager: Getting local block broadcast_211582
INFO TorrentBroadcast: Reading broadcast variable 211582 took 31 ms

Поскольку широковещательные переменные используются драйвером spark для передачи задач узлам-исполнителям, это кажется весьма актуальным для рассматриваемой проблемы. Мы решили подробнее изучить поток десериализации широковещательной рассылки (задачи) и поискать возможные задержки.

23:55:04,152 DEBUG BlockManager: Getting local block broadcast_211582

23:55:04,161 TRACE BlockManager: Put for block broadcast_211582_piece0 took 0 ms to get into synchronized block
23:55:04,181 INFO MemoryStore: Block broadcast_211582_piece0 stored as bytes in memory (estimated size 6.7 KB, free 3.3 MB)

23:55:04,183 DEBUG BlockManager: Putting block broadcast_211582_piece0 without replication took 22 ms
23:55:04,183 INFO TorrentBroadcast: Reading broadcast variable 211582 took 31 ms

Мы заметили временной разрыв более 181–161 = 20 мс, ~64% от общего времени (!), между следующими двумя последовательными распечатками:

23:55:04,161 TRACE BlockManager: Put for block broadcast_211582_piece0 took 0 ms to get into synchronized block
23:55:04,181 INFO MemoryStore: Block broadcast_211582_piece0 stored as bytes in memory (estimated size 6.7 KB, free 3.3 MB)

С точки зрения кода, казалось бы, между этими двумя распечатками ничего особенного не произошло, что привело к тому, что ~ 64% времени десериализации не было учтено.

Более пристальный взгляд на вторую распечатку (MemoryStore.scala) выявил несколько интересных моментов:

logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId,
Utils.bytesToString(size),
Utils.bytesToString(blocksMemoryUsed)))

Где blocksMemoryUsed определяется следующим образом:

private def blocksMemoryUsed: Long = memoryManager.synchronized {
memoryUsed - currentUnrollMemory
}

а currentUnrollMemory определяется следующим образом:

def currentUnrollMemory: Long = memoryManager.synchronized {
unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}

В основном это означает, что unrollMemoryMap и pendingUnrollMemoryMap сканируются каждый раз, когда распечатывается эта конкретная строка журнала. Если бы эти карты выросли до экстремальных размеров, это могло бы определенно объяснить задержки, которые мы наблюдали, более того, это могло бы объяснить аспект деградации нашей проблемы со временем, поскольку задержки будут расти вместе с размером карт.

Мы вкратце рассмотрели возможность повышения уровня журнала, чтобы избежать этой конкретной распечатки, но быстрый поиск по использованию выявил больше случаев использования currentUnrollMemory в других местах, кроме распечаток журнала, что сделало такой обходной путь неэффективным.

Чтобы еще больше подтвердить нашу гипотезу и посмотреть, действительно ли одна из вышеупомянутых структур данных карты (или обе) стала большой, введите дампы памяти.

Дампы памяти

Дамп памяти одного из исполнителей незадолго до запуска приложения выглядел так:

Дамп памяти того же исполнителя через несколько часов выглядел так:

unrollMemoryMap увеличилось с 3477 до 145 555 записей, в результате чего общее количество экземпляров scala.collection.mutable.DefaultEntry достигло 145 596.

Имея в своем распоряжении эту информацию, мы начали просеивать JIRA Spark.

СПАРК-17465 описал явление, которое мы наблюдали, и предположил, что оно было исправлено в версиях 1.6.3, 2.0.1, 2.1.0, где была исправлена ​​очистка карт unrollMemoryMap и pendingUnrollMemoryMap.

Запуск нашего приложения со Spark 1.6.3 в течение нескольких дней подтвердил, что проблема действительно решена.

Эпилог

Решение нашей проблемы с производительностью сводилось к обновлению Spark с 1.6.2 до 1.6.3 (искровой бегун Beam для Spark 2.x — WIP).

Мы надеемся, что методы, описанные в этом посте, могут помочь другим, имеющим дело с проблемами производительности аналогичного характера, и вдохновить сообщество поделиться своими историями из окопов.

Кроме того, в рамках нашего сеанса профилирования мы подали и рассмотрели следующие заявки Apache Beam: