Это первый из двух постов, в которых мы проиллюстрируем, как можно использовать ряд инструментов (в основном Kafka и MLFlow), чтобы помочь в разработке ML. С этой целью мы создадим простой сценарий, который, как мы надеемся, будет похож на некоторые реальные варианты использования, а затем опишем потенциальное решение. Репо-компаньон со всем кодом можно найти здесь.

Сценарий

Компания собирает данные с помощью ряда сервисов, которые генерируют события, когда пользователи / клиенты взаимодействуют с веб-сайтом или приложением компании. По мере того, как происходит такое взаимодействие, алгоритм должен работать в реальном времени, и на основе результатов (или прогнозов) алгоритма должны быть предприняты некоторые немедленные действия. Кроме того, после N взаимодействий (или наблюдений) алгоритм необходимо переобучить, без остановки службы прогноз , , поскольку пользователи будут продолжать взаимодействовать.

В данном упражнении мы использовали набор данных Взрослый, цель которого - предсказать, получают ли люди доход выше / ниже 50 тыс. В зависимости от их возраста, родной страны и т. Д. Чтобы адаптировать этот набор данных к сценарию, описанному ранее, один Можно предположить, что данные о возрасте, родной стране и т. д. собираются с помощью онлайн-анкеты / формы, и нам нужно предсказать, имеют ли пользователи высокий / низкий доход в режиме реального времени. Если высокий доход, то мы сразу звоним / пишем им по электронной почте, например, с каким-то предложением. Затем, после N новых наблюдений, мы переобучаем алгоритм, продолжая прогнозировать новых пользователей.

Решение

На рисунке 1 показано возможное решение. Для реализации этого решения мы использовали Kafka-Python (хороший учебник можно найти здесь) вместе с LightGBM и Hyperopt или HyperparameterHunter.

Единственный посторонний Python, который мы будем использовать в этом упражнении, - это Apache-Kafka (мы будем использовать python API Kafka-Python, но, тем не менее, Kafka должен быть установлен в вашей системе). Если у вас Mac, просто используйте Homebrew:

brew install kafka

который также установит зависимость zookeeper.

Как упоминалось ранее, мы использовали набор данных для взрослых. Это потому, что мы намерены проиллюстрировать потенциальный конвейер машинного обучения и предоставить полезный код, сохраняя его относительно простым. Однако обратите внимание, что описанный здесь конвейер, в принципе, не зависит от данных. Конечно, предварительная обработка будет меняться в зависимости от характера данных, но компоненты конвейера останутся похожими, если не идентичными.

Инициализация эксперимента

Код, использованный для этого поста (и не только), можно найти в нашем репо. Там есть скрипт с именем initialize.py. Этот скрипт загрузит набор данных, установит структуру каталога, предварительно обработает данные, обучит исходную модель на обучающем наборе данных и оптимизирует гиперпараметры этой модели. В реальном мире это будет соответствовать обычной фазе экспериментов и процессу обучения исходного алгоритма в автономном режиме.

В этом посте мы хотим больше сосредоточиться на конвейере и соответствующих компонентах, чем на машинном обучении. Тем не менее, позвольте нам кратко упомянуть инструменты машинного обучения, которые мы используем в этом упражнении.

Предварительная обработка данных довольно проста, учитывая набор данных, который мы используем. Мы закодировали настраиваемый класс под названием FeatureTools, который можно найти в модуле utils в репозитории. Этот класс имеет методы .fit и .transform, которые будут нормализовать / масштабировать числовые признаки, кодировать категориальные признаки и генерировать то, что мы называем «перекрещенными столбцами», которые являются результатом декартова произведения между двумя (или более) категориальными признаками.

После обработки данных мы используем LightGBM для соответствия модели вместе с Hyperopt или HyperparameterHunter для выполнения оптимизации гиперпараметров. Код, связанный с этой задачей, можно найти в модуле train, где находятся два скрипта train_hyperop.py и train_hyperparameterhunter.py.

Мы могли бы написать отдельный пост, сравнивающий пакеты оптимизации гиперпараметров в python (Skopt, Hyperopt и HyperparameterHunder), но пока знайте: если вам нужна скорость, используйте Hyperopt. Если вас не беспокоит скорость и вы хотите подробно отслеживать свою процедуру оптимизации, используйте HyperparameterHunter. По словам создателя пакета Hunter МакГушиона:

«Так долго оптимизация гиперпараметров была настолько трудоемким процессом, что просто указывала вам направление для дальнейшей оптимизации, а затем вам, по сути, пришлось начинать заново».

HyperparameterHunter здесь, чтобы решить эту проблему, и он делает это очень хорошо. В настоящее время пакет построен на основе Skopt, поэтому он заметно медленнее, чем Hyperopt. Тем не менее, я знаю, что предпринимаются попытки включить Hyperopt в качестве еще одного бэкэнда для HyperparameterHunter. Когда это произойдет, споров не будет, HyperparameterHunter должен быть вашим предпочтительным инструментом.

Тем не менее, если кому-то интересно, я включил в репо блокнот, сравнивающий характеристики Skopt и Hyperopt.

Теперь перейдем к самим конвейерным процессам.

Производитель сообщений приложений

Это должно быть относительно простой иллюстрацией того, как может выглядеть часть производственного конвейера. Поэтому мы напрямую используем набор данных для взрослых для генерации сообщений (объектов JSON).

В реальном мире можно было бы иметь несколько сервисов, которые будут генерировать события. Оттуда у вас есть несколько вариантов. Информация об этих событиях может храниться в базе данных, а затем агрегироваться с помощью ваших обычных запросов. Оттуда служба Kafka будет публиковать сообщения в конвейере. В качестве альтернативы, вся информация в этих событиях может быть напрямую опубликована в разных темах, и «служба агрегирования» может хранить всю информацию в одном сообщении, которое затем будет опубликовано в конвейере (конечно, можно также иметь комбинацию два).

Например, пользователям может быть разрешено регистрироваться через Facebook или Google, собирая свои имена и адреса электронной почты. Затем их могут попросить заполнить анкету, и мы продолжаем собирать события по мере их развития. В какой-то момент в процессе все эти события будут объединены в одно сообщение, а затем опубликованы с помощью производителя Kafka. Конвейер в этом посте начнется с того места, где собрана вся необходимая информация. Наши сообщения здесь - это отдельные наблюдения в наборе данных для взрослых. Ниже мы приводим пример содержания наших сообщений:

’{“age”:25,”workclass”:”Private”,”fnlwgt”:226802,”education”:”11th”,”marital_status”:”Never-married”,”occupation”:”Machine-op-inspct”,”relationship”:”Own-child”,”race”:”Black”,”gender”:”Male”,”capital_gain”:0,”capital_loss”:0,”hours_per_week”:40,”native_country”:”United-States”,”income_bracket”:”<=50K.”}’

Ядром Приложения / Сервиса (серый крайний левый прямоугольник на Рисунке 1) является приведенный ниже фрагмент:

Обратите внимание, что мы используем набор данных тестирования для создания сообщений. Это потому, что мы разработали сценарий, который пытается максимально походить на реальный мир (в определенных пределах). Имея это в виду, мы использовали набор обучающих данных для создания начальных model и dataprocessor объектов. Затем мы используем тестовый набор данных для создания сообщений с целью моделирования процесса получения новой информации во времени.

Что касается приведенного выше фрагмента, просто производитель опубликует сообщения в конвейере (start_producing()) и будет потреблять сообщения с окончательным прогнозом (start_consuming()). Так же, как описываемый здесь конвейер не включает в себя самое начало процесса (сбор и агрегацию событий), мы также пропускаем самый конец, то есть что делать с окончательным прогнозом. Тем не менее, мы кратко обсудим некоторые варианты использования, в которых этот конвейер может быть полезен, ближе к концу поста, который проиллюстрирует этот заключительный этап.

На самом деле, помимо игнорирования самого начала и конца процесса, мы считаем, что этот конвейер достаточно хорошо похож на тот, который можно было бы использовать в реальном мире. Как следствие, мы надеемся, что код, включенный в наше репо, будет полезен для некоторых ваших проектов.

Предиктор и инструктор

Основная цель этой реализации - запускать алгоритм в реальном времени и переобучать его каждые N наблюдений, без остановки службы прогнозирования. С этой целью мы реализовали два компонента: Predictor (predictor.py в репозитории) и Trainer (trainer.py).

Теперь давайте по порядку опишем числа, показанные на рисунке 1, используя фрагменты кода в качестве руководства. Обратите внимание, что в приведенном ниже процессе предполагается, что был запущен сценарий initialize.py, поэтому в соответствующем каталоге существуют начальные файлы model.p и dataprocessor.p. Также обратите внимание, что приведенный ниже код составляет ядро ​​Predictor и Trainer. Полный код смотрите в репо.

Предиктор

Ядро кода Predictor показано ниже.

(1a) Строка 12 во фрагменте predictor.py. Предиктор получит сообщение от приложения / службы, будет выполнять обработку данных и запускать модель в реальном времени по мере получения сообщений. Все это происходит с использованием существующих объектов dataprocessor и model в функции predict.

(1b) Строка 13 в predictor.pysnippet. Как только мы выполним прогноз, Predictor опубликует результат (publish_prediction()), который в конечном итоге будет получен приложением / службой.

(2) Строки 17–20 в predictor.pysnippet. Каждые RETRAIN_EVERY messages, Predictor будет публиковать сообщение «retrain» (send_retrain_message()) для чтения обучающим.

Тренер

(3) Строка 12 во фрагменте trainer.py. Инструктор прочитает сообщение и запустит процесс переобучения с новым накопленным набором данных (train()). Это исходный набор данных плюс RETRAIN_EVERYновые наблюдения. Функция train выполнит весь процесс, описанный в разделе «Инициализация эксперимента» , независимо от процессов, описанных в 1a и 1b. . Другими словами, Trainer переобучит модель, в то время как Predictor будет обслуживать прогнозы по мере поступления сообщений.

На этом этапе стоит упомянуть, что здесь мы обнаруживаем еще одно различие между нашей реализацией и той, которая будет использоваться в реальном мире. В нашей реализации можно переобучить алгоритм, как только будет обработано RETRAIN_EVERY число наблюдений. Это связано с тем, что мы используем набор данных тестирования для взрослых для создания сообщений, который включает целевой столбец («доход_бракет»). На самом деле, реальный результат действий, предпринятых на основе результатов работы алгоритма, обычно не будет доступен сразу после запуска алгоритма, а будет доступен через некоторое время. В этом сценарии другой процесс должен собирать истинный результат, и, как только количество собранных истинных результатов станет равным RETRAIN_EVERY, алгоритм будет переобучен.

Например, предположим, что этот конвейер реализует систему рекомендаций в режиме реального времени для электронной коммерции. Мы обучили алгоритм рекомендаций в автономном режиме, и целевой столбец является категориальным представлением того, насколько нашим пользователям нравятся наши рекомендации: 0, 1, 2 и 3 для пользователей, которым не понравился элемент или которые взаимодействовали с ним, понравился элемент (например, нажали нравится кнопка), добавили товар в свою корзину и купили товар соответственно. К тому времени, когда система выполнит рекомендации, мы все еще не знаем, что в конечном итоге сделает пользователь.

Следовательно, наряду с процессом сбора и хранения информации о пользователях, когда они перемещаются по веб-сайту (или приложению), второй процесс должен собирать окончательный результат нашей рекомендации. Только когда оба процесса соберут RETRAIN_EVERY сообщения и результаты, алгоритм будет переобучен.

(4) Строка 13 во фрагменте trainer.py. После завершения переобучения будет опубликовано сообщение с соответствующей информацией (published_training_completed()).

(5) Строки 5–8 во фрагменте predictor.py. Потребитель Predictor подписан на две темы: [‘app_messages’, ‘retrain_topic’]. Как только он получит информацию о том, что переподготовка завершена через «retrain_topic», он загрузит новую модель и продолжит процесс как обычно, без остановки в любой момент во время процесса.

Как запустить трубопровод

В сопутствующий репозиторий мы включили инструкции по запуску конвейера (локально). На самом деле довольно просто.

  1. Запускаем zookeper и kafka:
$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.mxcl.zookeeper)
$ brew services start kafka
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)

2. Запустите initialize.py:

python initialize.py

3. В Терминале №1 запустите Predictor (или Trainer):

python predictor.py

4. В Терминале №2 запустите трейнер (или предсказатель):

python trainer.py

5. В Терминале №3 запустите пример приложения.

python samplea_app.py

Затем, когда будет обработано N сообщений, вы должны увидеть что-то вроде этого:

Верхний правый терминал: мы переобучили модель, и Hyperopt выполнила 10 оценок (на практике их должно быть несколько сотен). Верхний левый терминал: после переобучения и оптимизации модели мы видим, как предсказатель загрузил новую модель (после раздражающего предупреждающего сообщения от новой версии LightGBM). Нижний терминал: обслуживание продолжается в обычном режиме.

Некоторые возможные варианты использования

Вот несколько возможных вариантов использования среди (многих) других.

  1. Адаптация онлайн-поездок в режиме реального времени

Давайте рассмотрим электронную торговлю, где продаются некоторые товары. По мере того, как пользователи перемещаются по веб-сайту, мы собираем события с информацией об их действиях. Мы уже обучили алгоритм и знаем, что после, скажем, 10 взаимодействий, мы можем узнать, купит ли клиент нашу продукцию. Более того, мы также знаем, что продукты, которые они потенциально купят, могут быть дорогими. Поэтому мы хотели бы настроить их путешествие «на ходу», чтобы облегчить им совершение покупок. Настройка здесь может означать что угодно, от сокращения пути до изменения макета страницы.

2. Электронная почта / звонок вашим клиентам

Как и в предыдущем случае, давайте предположим, что теперь клиент решает прекратить путешествие (скучно, не хватает времени, может быть, слишком сложно и т. Д.). Мы могли бы использовать конвейер, подобный описанному в этом посте, чтобы немедленно или с контролируемой задержкой отправить электронное письмо или позвонить им, если алгоритм предсказывает, что этот клиент имеет большой потенциал.

Дальнейшие действия

  1. Журналы и мониторинг: в следующем посте мы добавим в конвейер функции журналирования и мониторинга через MLFlow. Вместе с HyperparameterHunter это решение будет автоматически полностью отслеживать производительность модели и оптимизацию гиперпараметров, предлагая при этом визуальный мониторинг.
  2. Управление потоком: описанное здесь решение и соответствующий код разработаны таким образом, чтобы его можно было легко запускать на портативном компьютере локально. Однако можно было бы предположить, что в реальной жизни это должно будет запускать облако в нужном масштабе. Эта конкретная часть не будет рассмотрена в двух публикациях. Тем не менее, я могу заверить вас, что перемещение описанной здесь структуры в AWS (например) и ее автоматический запуск по вашему усмотрению (с использованием EC2, S3 и любого внешнего интерфейса, который должен быть для вашего конкретного случая) не представляет особой сложности.

Любые вопросы / предложения, пожалуйста, пишите: [email protected]