Млопс с потоковой обработкой

В мире больших данных все больше и больше компаний открывают для себя потенциал быстрой обработки данных с использованием потоковых технологий. Нам даже не нужно упоминать, насколько полезно машинное обучение, это само собой разумеется.

Обычно этими двумя областями владеют немногие специалисты. Таким образом, интеграция вашей потоковой архитектуры с достижениями вашей команды специалистов по данным может быть не такой простой.

В этой статье я хотел бы представить один из возможных способов показать, как возможна такая интеграция. Для этого я выбрал Apache Flink, потоковый движок, широко используемый во всем мире, и «MLeap, библиотеку сериализации для моделей машинного обучения для различных программ. языки. Дополнительную информацию о MLeap и о том, как создать модель на Python и запустить ее на Scala, можно найти в моей предыдущей статье здесь.

Основная идея этой статьи — объяснить, как модель машинного обучения, обученную на Python, можно использовать внутри конвейера Flink, и предоставить информацию о библиотеке Flink, над которой мы в GetInData работаем. Подробности смотрите здесь.

Мотивация для Flink с MLeap

Зачем обслуживать модели машинного обучения внутри потокового движка? Почему бы просто не использовать какой-нибудь HTTP-сервис, который может предоставить нам прогнозы или прочитать (прогнозированные и сохраненные данные) из БД?

Вот три аргумента:

  • Необходимость сверхнизких задержек — обслуживание прогнозов внутри потоковых заданий происходит намного быстрее, поскольку мы можем исключить сетевые задержки, вызванные подключениями к HTTP-серверу или БД. Не говоря уже о том, что сеть иногда дает сбои.
  • Разделение службы — что, если другие команды изменят полезную нагрузку REST JSON или аналитик данных переименует столбец в базе данных? Хранение моделей внутри заданий Flink — простое решение.
  • Затраты — о них никто не любит говорить, тем не менее они являются одним из основных факторов успеха проектов. Наличие дополнительных HTTP-серверов и БД + конвейеров, которые вычисляют прогнозы и сохраняют их в БД, приводит к затратам. Ваша команда менеджеров не любит затраты, я в этом уверен на 100%!

Конечно, хранение моделей машинного обучения внутри Jobs создает и другие проблемы, например:

  • Как загрузить модель?
  • Как вы делаете прогнозы?
  • Моя команда использует Flink SQL. Можно ли делать прогнозы с помощью этого API?

Чтобы ответить на эти вопросы, мы создали библиотеку Flink-MLeap, которую вы можете найти здесь.

Пример использования

Представьте, что вы Data Scientist (или, может быть, вы действительно им являетесь) и хотели бы использовать существующую инфраструктуру Flink для подключения обученной модели машинного обучения и прогнозирования потоковых данных. Вы очень занятый человек и не хотите тратить время на изучение совершенно новых технологий. Все хорошо, вы не ленитесь! Это естественно, никто не может изучить все различные технологии в мире.

Давайте сосредоточимся на том, что вы знаете, и как использовать эти знания. Вы парень/девушка данных, поэтому вы, вероятно, слышали о SQL или, возможно, даже написали тысячи запросов на этом языке. Хороший! Когда вы читали о Flink, вы наткнулись на этот интересный пост в блоге о создании заданий Flink с помощью SQL, например этот здесь.

Подключение к кластеру Flink с помощью SQL Client и написание запросов для прогнозирования потоков должно быть таким простым и естественным, верно?

Применение

С нашей библиотекой вы можете легко обслуживать модели машинного обучения в потоковой среде:

Итак, сначала мы создаем поток с функциями, используя SQL. Для этого мы используем коннектор генерации данных, который является вспомогательным коннектором в Flink, который генерирует поток со случайными значениями — очень полезно на этапе разработки.

// Create table with features
CREATE TABLE Features (
  feature1 DOUBLE NOT NULL,
  feature2 INT NOT NULL,
  feature3 DOUBLE NOT NULL,
  feature_timestamp TIMESTAMP(3))
WITH ( 
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.feature1.min' = '0.0',
'fields.feature1.max' = '1.0'
)

Затем мы делаем прогнозы на основе этих функций:

// Execute predictions
SELECT
  Predict(feature1, feature2, feature3) as prediction,
  Predictv2(feature1) as prediction2
FROM Features

Как видите, здесь мы используем пользовательскую функцию SQL Predict и Predictv2 в нашем наборе функций. Они могут принимать разное количество аргументов и типов. Имена функций и моделей, которые они используют, можно просто определить в конфигурации.

Ниже вы можете найти больше технических аспектов библиотеки, как мы ее создали и примеры того, как ее использовать и настраивать.

FLINK SQL-API

Мы больше сосредоточились на Flink SQL API и подготовили дополнительные утилиты для этого API, поэтому любой, кто не слишком знаком с Flink или чувствует себя более комфортно, используя SQL, а не Java/Scala, может легко использовать модель машинного обучения в заданиях Flink.

Для этого мы подготовили MLeapUDFRegistry. Основная цель этого реестра — зарегистрировать UDF (определяемые пользователем функции Flink), которые впоследствии можно использовать в запросах SQL. Чтобы добавить свои пользовательские функции, вы можете определить их внутри application.conf следующим образом:

mleap {
 udfRegistry = [
     {
         udfName = "Predict"
         bundlePath = "/mleap-example-1"
         bundleSource = "file"
     },
     {
           udfName = "Predictv2"
         bundlePath = "/mleap-example-2"
         bundleSource = "file"
     }
 ]
}

И запустите MLeapUDFRegistry.registerFromConfig(config, tableEnv) перед выполнением ваших запросов, как мы делали в этих примерах приложений: FlinkSqlWithMLeap.

...
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val tableEnv = StreamTableEnvironment.create(env)
 
 // Register UDFs basing on config
 val config = ConfigFactory.load()
 MLeapUDFRegistry.registerFromConfig(config, tableEnv)
 
...

Еще одна вещь, которая может быть довольно проблематичной, — это написание определенных UDF для каждой модели ML. Это, конечно, самый простой, но и самый трудоемкий подход. Вот почему мы определили очень общий MLeapUDF, чтобы его можно было легко повторно использовать для любого пакета MLeap.

Благодаря MLeapUDFRegistry и MLeapUDF использование моделей машинного обучения с SQL проходит очень гладко. Просто посмотрите на FlinkSqlWithMLeap приложений. Любой, кто знает SQL и имеет модель машинного обучения, может легко использовать их с Flink.

Приятного аппетита!

Код оживить

Давайте быстро взглянем на код. Мы написали этот проект на Scala. Он содержит два модуля:

  • lib — с библиотечными классами
  • пример — с примерами использования.

В модуле библиотеки мы рассмотрели два API Flink: потоковую передачу и SQL, чтобы их можно было повторно использовать в любой работе.

Загрузка моделей машинного обучения

Чтобы загрузить модель, нам нужно сначала ее создать. Я буду повторно использовать модели, представленные в предыдущей статье.

Регрессоры случайного леса принимают одно число с плавающей запятой в качестве входных данных и дают одно число с плавающей запятой в качестве вывода.

Чтобы загрузить модели в задания Flink, мы создали файл BundleLoaders. Один из них — FileBundleLoader, который загружает пакеты из локальных файлов. Другой — GCSBundleLoader, который может извлекать модели из Google Cloud Storage Bucket и использовать их в заданиях Flink.

Потоковое API

В нашей библиотеке мы больше фокусируемся на примерах SQL, потому что аудитория этой функции больше. Я считаю, что больше специалистов по данным знают SQL, чем Java. Сказав это, Streaming API был хорошей отправной точкой для проверки возможности запуска заданий с моделями MLeap.

В MleapMapFunction мы представили способ использования пакетов MLeap. Загружаем модель открытым методом.

case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends
  RichMapFunction[Double, Double] {
 
  private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction])
  @transient var transformer: Transformer = _
 
  override def open(parameters: Configuration): Unit = {
 transformer = bundleLoader.loadBundle(bundleName) match {
   case Failure(exception) => {
     LOG.error(s"Error while loading bundle: $bundleName", exception)
     throw BundleLoadProblem(exception)
   }
   case Success(value) => value
 }
  }
 
  override def map(value: Double): Double = {
 val dataset = Seq(Row(DenseTensor(Array(value), List(1))))
 val frame = DefaultLeapFrame(transformer.inputSchema, dataset)
 val res = transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double]
 res
  }
}

Затем в методе карты мы делаем прогнозы. Как видите, это было очень простое решение.

Чтобы проверить это, мы реализовали простую работу Fink FlinkDatastreamWithMleap:

object FlinkDatastreamWithMleap {
  def main(args: Array[String]): Unit = {
 
 implicit val typeInfo = TypeInformation.of(classOf[StructType])
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 
 val rand: Random = new Random()
 
 val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble())
 val bundlePath = getClass.getResource("/mleap-example-1").toString
 
 text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print()
 
 env.execute()
  }
}

Никогда не переставай улучшаться

Нет такой системы/библиотеки, которую нельзя было бы обогатить или улучшить. То же самое и с этой библиотекой. Основные моменты, которые мы хотели бы улучшить:

  • протестировать нашу общую UDF с более сложными моделями ML,
  • подготовьте примеры того, как использовать его с Kubernetes,
  • добавить поддержку других сериализаторов моделей машинного обучения, подобных MLeap, таких как: PMML, что обеспечит поддержку большего количества библиотек машинного обучения.

Если вы заинтересованы в какой-либо из этих функций, сообщите нам об этом. Делая это, мы будем знать, что более полезно для вас, и сможем лучше расставить приоритеты в нашей работе! Спасибо за прочтение. Не забудьте проверить другие наши записи в блоге, такие как: Онлайн-модель машинного обучения с использованием MLeap и посетить наш github.

Первоначально опубликовано на https://getindata.com.

Автор блога: Bartosz Chodnicki — Senior Software Engineer