Млопс с потоковой обработкой
В мире больших данных все больше и больше компаний открывают для себя потенциал быстрой обработки данных с использованием потоковых технологий. Нам даже не нужно упоминать, насколько полезно машинное обучение, это само собой разумеется.
Обычно этими двумя областями владеют немногие специалисты. Таким образом, интеграция вашей потоковой архитектуры с достижениями вашей команды специалистов по данным может быть не такой простой.
В этой статье я хотел бы представить один из возможных способов показать, как возможна такая интеграция. Для этого я выбрал Apache Flink, потоковый движок, широко используемый во всем мире, и «MLeap, библиотеку сериализации для моделей машинного обучения для различных программ. языки. Дополнительную информацию о MLeap и о том, как создать модель на Python и запустить ее на Scala, можно найти в моей предыдущей статье здесь.
Основная идея этой статьи — объяснить, как модель машинного обучения, обученную на Python, можно использовать внутри конвейера Flink, и предоставить информацию о библиотеке Flink, над которой мы в GetInData работаем. Подробности смотрите здесь.
Мотивация для Flink с MLeap
Зачем обслуживать модели машинного обучения внутри потокового движка? Почему бы просто не использовать какой-нибудь HTTP-сервис, который может предоставить нам прогнозы или прочитать (прогнозированные и сохраненные данные) из БД?
Вот три аргумента:
- Необходимость сверхнизких задержек — обслуживание прогнозов внутри потоковых заданий происходит намного быстрее, поскольку мы можем исключить сетевые задержки, вызванные подключениями к HTTP-серверу или БД. Не говоря уже о том, что сеть иногда дает сбои.
- Разделение службы — что, если другие команды изменят полезную нагрузку REST JSON или аналитик данных переименует столбец в базе данных? Хранение моделей внутри заданий Flink — простое решение.
- Затраты — о них никто не любит говорить, тем не менее они являются одним из основных факторов успеха проектов. Наличие дополнительных HTTP-серверов и БД + конвейеров, которые вычисляют прогнозы и сохраняют их в БД, приводит к затратам. Ваша команда менеджеров не любит затраты, я в этом уверен на 100%!
Конечно, хранение моделей машинного обучения внутри Jobs создает и другие проблемы, например:
- Как загрузить модель?
- Как вы делаете прогнозы?
- Моя команда использует Flink SQL. Можно ли делать прогнозы с помощью этого API?
Чтобы ответить на эти вопросы, мы создали библиотеку Flink-MLeap, которую вы можете найти здесь.
Пример использования
Представьте, что вы Data Scientist (или, может быть, вы действительно им являетесь) и хотели бы использовать существующую инфраструктуру Flink для подключения обученной модели машинного обучения и прогнозирования потоковых данных. Вы очень занятый человек и не хотите тратить время на изучение совершенно новых технологий. Все хорошо, вы не ленитесь! Это естественно, никто не может изучить все различные технологии в мире.
Давайте сосредоточимся на том, что вы знаете, и как использовать эти знания. Вы парень/девушка данных, поэтому вы, вероятно, слышали о SQL или, возможно, даже написали тысячи запросов на этом языке. Хороший! Когда вы читали о Flink, вы наткнулись на этот интересный пост в блоге о создании заданий Flink с помощью SQL, например этот здесь.
Подключение к кластеру Flink с помощью SQL Client и написание запросов для прогнозирования потоков должно быть таким простым и естественным, верно?
Применение
С нашей библиотекой вы можете легко обслуживать модели машинного обучения в потоковой среде:
Итак, сначала мы создаем поток с функциями, используя SQL. Для этого мы используем коннектор генерации данных, который является вспомогательным коннектором в Flink, который генерирует поток со случайными значениями — очень полезно на этапе разработки.
// Create table with features CREATE TABLE Features ( feature1 DOUBLE NOT NULL, feature2 INT NOT NULL, feature3 DOUBLE NOT NULL, feature_timestamp TIMESTAMP(3)) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10', 'fields.feature1.min' = '0.0', 'fields.feature1.max' = '1.0' )
Затем мы делаем прогнозы на основе этих функций:
// Execute predictions SELECT Predict(feature1, feature2, feature3) as prediction, Predictv2(feature1) as prediction2 FROM Features
Как видите, здесь мы используем пользовательскую функцию SQL Predict и Predictv2 в нашем наборе функций. Они могут принимать разное количество аргументов и типов. Имена функций и моделей, которые они используют, можно просто определить в конфигурации.
Ниже вы можете найти больше технических аспектов библиотеки, как мы ее создали и примеры того, как ее использовать и настраивать.
FLINK SQL-API
Мы больше сосредоточились на Flink SQL API и подготовили дополнительные утилиты для этого API, поэтому любой, кто не слишком знаком с Flink или чувствует себя более комфортно, используя SQL, а не Java/Scala, может легко использовать модель машинного обучения в заданиях Flink.
Для этого мы подготовили MLeapUDFRegistry
. Основная цель этого реестра — зарегистрировать UDF (определяемые пользователем функции Flink), которые впоследствии можно использовать в запросах SQL. Чтобы добавить свои пользовательские функции, вы можете определить их внутри application.conf следующим образом:
mleap { udfRegistry = [ { udfName = "Predict" bundlePath = "/mleap-example-1" bundleSource = "file" }, { udfName = "Predictv2" bundlePath = "/mleap-example-2" bundleSource = "file" } ] }
И запустите MLeapUDFRegistry.registerFromConfig(config, tableEnv) перед выполнением ваших запросов, как мы делали в этих примерах приложений: FlinkSqlWithMLeap
.
... val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // Register UDFs basing on config val config = ConfigFactory.load() MLeapUDFRegistry.registerFromConfig(config, tableEnv) ...
Еще одна вещь, которая может быть довольно проблематичной, — это написание определенных UDF для каждой модели ML. Это, конечно, самый простой, но и самый трудоемкий подход. Вот почему мы определили очень общий MLeapUDF
, чтобы его можно было легко повторно использовать для любого пакета MLeap.
Благодаря MLeapUDFRegistry
и MLeapUDF
использование моделей машинного обучения с SQL проходит очень гладко. Просто посмотрите на FlinkSqlWithMLeap
приложений. Любой, кто знает SQL и имеет модель машинного обучения, может легко использовать их с Flink.
Приятного аппетита!
Код оживить
Давайте быстро взглянем на код. Мы написали этот проект на Scala. Он содержит два модуля:
- lib — с библиотечными классами
- пример — с примерами использования.
В модуле библиотеки мы рассмотрели два API Flink: потоковую передачу и SQL, чтобы их можно было повторно использовать в любой работе.
Загрузка моделей машинного обучения
Чтобы загрузить модель, нам нужно сначала ее создать. Я буду повторно использовать модели, представленные в предыдущей статье.
Регрессоры случайного леса принимают одно число с плавающей запятой в качестве входных данных и дают одно число с плавающей запятой в качестве вывода.
Чтобы загрузить модели в задания Flink, мы создали файл BundleLoaders
. Один из них — FileBundleLoader
, который загружает пакеты из локальных файлов. Другой — GCSBundleLoader
, который может извлекать модели из Google Cloud Storage Bucket и использовать их в заданиях Flink.
Потоковое API
В нашей библиотеке мы больше фокусируемся на примерах SQL, потому что аудитория этой функции больше. Я считаю, что больше специалистов по данным знают SQL, чем Java. Сказав это, Streaming API был хорошей отправной точкой для проверки возможности запуска заданий с моделями MLeap.
В MleapMapFunction
мы представили способ использования пакетов MLeap. Загружаем модель открытым методом.
case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends RichMapFunction[Double, Double] { private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction]) @transient var transformer: Transformer = _ override def open(parameters: Configuration): Unit = { transformer = bundleLoader.loadBundle(bundleName) match { case Failure(exception) => { LOG.error(s"Error while loading bundle: $bundleName", exception) throw BundleLoadProblem(exception) } case Success(value) => value } } override def map(value: Double): Double = { val dataset = Seq(Row(DenseTensor(Array(value), List(1)))) val frame = DefaultLeapFrame(transformer.inputSchema, dataset) val res = transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double] res } }
Затем в методе карты мы делаем прогнозы. Как видите, это было очень простое решение.
Чтобы проверить это, мы реализовали простую работу Fink FlinkDatastreamWithMleap
:
object FlinkDatastreamWithMleap { def main(args: Array[String]): Unit = { implicit val typeInfo = TypeInformation.of(classOf[StructType]) val env = StreamExecutionEnvironment.getExecutionEnvironment val rand: Random = new Random() val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble()) val bundlePath = getClass.getResource("/mleap-example-1").toString text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print() env.execute() } }
Никогда не переставай улучшаться
Нет такой системы/библиотеки, которую нельзя было бы обогатить или улучшить. То же самое и с этой библиотекой. Основные моменты, которые мы хотели бы улучшить:
- протестировать нашу общую UDF с более сложными моделями ML,
- подготовьте примеры того, как использовать его с Kubernetes,
- добавить поддержку других сериализаторов моделей машинного обучения, подобных MLeap, таких как: PMML, что обеспечит поддержку большего количества библиотек машинного обучения.
Если вы заинтересованы в какой-либо из этих функций, сообщите нам об этом. Делая это, мы будем знать, что более полезно для вас, и сможем лучше расставить приоритеты в нашей работе! Спасибо за прочтение. Не забудьте проверить другие наши записи в блоге, такие как: Онлайн-модель машинного обучения с использованием MLeap и посетить наш github.
Первоначально опубликовано на https://getindata.com.
Автор блога: Bartosz Chodnicki — Senior Software Engineer