Учебное пособие по созданию приложения для отслеживания активности в стиле Fitbit с помощью Redpanda и Materialize.

Данные, генерируемые устройствами Интернета вещей (IoT), считаются большими данными. Сбор и обработка данных IoT были сложными из-за классических «3V» больших данных: объем, скорость и достоверность, что требовало использования аналитических систем IoT для получения сложной инфраструктуры хранения и обработки данных.

При рассмотрении инструментов для обработки данных IoT в режиме реального времени хорошей комбинацией для совместного использования являются Redpanda и Materialize. Redpanda — это платформа потоковой передачи данных, API-совместимая с экосистемой Apache Kafka®. Он специально создан для обеспечения скорости, точности и безопасности потоковой передачи данных. Он позволяет принимать большие объемы данных IoT с высокой пропускной способностью и обеспечивает масштабируемое и отказоустойчивое хранилище. Materialise — это потоковая база данных, которая может принимать потоки данных в реальном времени из Redpanda и делать их доступными для запросов в течение нескольких секунд, позволяя разработчикам выполнять запросы онлайн-аналитической обработки (OLAP) к ним.

Комбинируя Redpanda и Materialise, разработчики могут создавать приложения реального времени, которые принимают и обрабатывают данные IoT в масштабе. Сюда входят варианты использования, такие как информационные панели в реальном времени, обнаружение аномалий и микросервисы, управляемые событиями.

Хотя мы будем обсуждать данные Materialise, Redpanda и Fitbit в этом блоге, стоит также отметить, что аналогичные потоковые приложения IoT могут быть созданы с использованием этих инструментов с Apache Pinot®, FastAPI и Raspberry Pi.

Пример использования: интеграция Redpanda и Materialise для потоковой передачи данных IoT.

Представьте, что организация хочет проводить ежедневные соревнования, основанные на количестве калорий, сожженных пользователями, и раздавать ежедневные награды трем лучшим энтузиастам фитнеса в таблице лидеров. Поскольку пользователи могут тренироваться в любое время, данные, поступающие через трекер Fitbit любого пользователя, должны обрабатываться немедленно, чтобы таблица лидеров обновлялась в режиме реального времени. Используя вместе Redpanda и Materialize в нашем стеке приложений, мы можем получать представления данных в реальном времени, чтобы делиться ими с пользователями.

В этом руководстве мы используем пример приложения-трекера Fitbit для потоковой передачи исходных данных с помощью Redpanda и передачи их в представления в памяти с помощью Materialize. (Этот сценарий предназначен только для демонстрационных целей и может не обязательно отражать типичный вариант использования.)

Давайте теперь посмотрим на предварительные условия, чтобы начать.

Предпосылки

Образец кода

Вы можете найти код для создания демонстрационного приложения для отслеживания активности Fitbit в этом репозитории GitHub. Пожалуйста, следуйте инструкциям README, чтобы запустить код в вашей среде.

Красная панда

Вы можете скачать бинарный файл Redpanda с GitHub здесь. В этом документе мы будем работать в среде Linux (Ubuntu20). Чтобы увидеть другие варианты установки, просмотрите документацию здесь.

Примечание. Если вы работаете в среде Mac/Docker, запишите IP-адреса брокеров, которые вы сможете найти после установки. Это будет похоже на изображение ниже.

PostgreSQL

Поскольку Materialize использует psql в качестве CLI, вам необходимо установить PostgreSQL.

Материализовать

Скачайте и установите Материализировать.

Версия Python ›= 3.7

Для генерации данных и моделирования наших потоков данных мы будем использовать Python. Для этого нам нужна библиотека Kafka для передачи данных в наши темы. Библиотека Kafka поддерживает только версию Python ›=3.7.

Построение схемы темы трекера

Мы используем схему трекера активности Fitbit. В Redpanda данные считываются из нескольких источников, а затем распределяются по темам. Мы будем использовать две темы для этого примера.

Первый из них называется fitbit_activity_tracker и показывает различные действия, созданные в определенный день в 2022 году. Он записывает шаги, количество съеденного, сожженные калории, минуты активности и минуты сидячего образа жизни. Помимо этого, он также включает в себя пару полей идентификатора, называемых owner_id и activity_id.

Вторая тема, названная fitbit_owner, содержит имя владельца, а также owner_id и date. Значения owner_id и date между двумя темами одинаковы, что формирует отношение псевдовнешнего ключа между парой. Вот простая схема-схема тем:

Разработка кода производителя приложения

Логику производителя для нашего приложения можно найти в файле redpanda_producer.py. Запуск кода производителя выполнит следующее:

  • Создайте темы fitbit_activity_tracker и fitbit_owner
  • Вставляйте одну запись в fitbit_owner и от одной до пяти записей в fitbit_activity_tracker каждые 0,5 секунды. Вставка продолжается до тех пор, пока в fitbit_owner не будет 100000 записей.
  • Записи, отправляемые производителем, имеют тип JSON.

После успешного запуска сценария вы должны увидеть сообщения, созданные в темах. Вы также можете проверить вывод отдельных производителей с помощью инструмента командной строки Redpanda, rpk. Выполните следующую команду:

rpk topic consume fitbit_activity_tracker

Примечание. Если вы используете Redpanda в среде Mac/Docker, вам нужно будет настроить команду, добавив флаг --brokers. Таким образом, новая команда будет:

rpk topic consume fitbit_activity_tracker --brokers <IP:PORT>

Вы увидите вывод ниже. Данные, вставленные сценарием, будут находиться под полем value каждого сообщения.

Мы можем просмотреть данные в другой теме, запустив:

rpk topic consume fitbit_owner

Создание панели мониторинга в реальном времени с помощью Materialize

Теперь, когда мы настроили наши потоки данных, давайте добавим Materialize в картину.

Чтобы начать использовать Materialize, нам нужно создать пару источников Materialize из тем. Источники — это подключения к данным, которые вы хотите использовать в Materialise. В этом сценарии мы бы хотели, чтобы Materialize указывал на темы, которые постоянно генерируют ввод/вывод. Прежде чем запускать какие-либо команды, связанные с Materialise, мы должны сначала войти в экземпляр, на котором работает Materialize. Для этого запустите эту команду:

psql -U materialize -h localhost -p 6875 materialize

После входа в экземпляр мы можем создать источники.

Настройка источников данных:

Команда для создания источника по теме fitbit_activity_tracker:

CREATE source fitbit_activity_src 
FROM kafka broker '<YOUR BROKER HOST:PORT>'
topic 'fitbit_activity_tracker'
format text;

Команда для создания источника по теме fitbit_owner:

CREATE source fitbit_owner_src 
FROM kafka broker '<YOUR BROKER HOST:PORT>' 
topic 'fitbit_owner' 
format text;

После успешного выполнения вы должны увидеть следующее:

Настройка определений представлений:

Далее мы создаем два представления для каждой темы. Представление представляет собой запрос, который вы хотите выполнить. Это также сокращает длинные запросы, которые может быть неудобно вводить несколько раз.

Здесь мы создадим два представления, каждое из которых будет представлять все столбцы в темах. Поскольку мы отправляем записи в формате JSON, мы будем декодировать каждый ключ и хранить его в виде столбца. Чтобы создать представления, выполните следующие команды:

CREATE VIEW fitbit_activity_view AS
SELECT ((text::jsonb)->>'date')::          date    AS date,
       ((text::jsonb)->>'total_steps')::   int     AS total_steps,
       ((text::jsonb)->>'exercise')::      varchar AS exercise,
       ((text::jsonb)->>'calories_burnt')::int     AS calories_burnt,
       ((text::jsonb)->>'active_mins')::   int     AS active_mins,
       ((text::jsonb)->>'sedentary_mins')::int     AS sedentary_mins,
       ((text::jsonb)->>'owner_id')::int           AS owner_id,
       ((text::jsonb)->>'activity_id')::int        AS activity_id
FROM   fitbit_activity_src;

Затем запустите:

CREATE VIEW fitbit_owner_view AS
SELECT ((text::jsonb)->>'owner_name')::   varchar AS owner_name,
       ((text::jsonb)->>'owner_id')::   int AS owner_id,
       ((text::jsonb)->>'date')::   date AS date
FROM   fitbit_owner_src;

Вы увидите это после успешного выполнения:

Построение материализованных представлений:

Теперь мы можем создать два материализованных представления на основе двух обычных представлений. Материализованные представления действуют как традиционные представления SQL, но разница в том, что, в отличие от представления SQL, они постоянно обновляют результаты внутреннего SQL-запроса. С помощью материализованных представлений мы можем получать обновления новых данных из памяти в режиме реального времени, что обеспечивает вывод с невероятно низкими задержками. Вы даже можете выполнять более сложные операции, такие как объединение, агрегирование и т. д., на этих скоростях.

Теперь мы создадим пару материализованных представлений.

Представление для расчета среднего количества сожженных калорий за упражнение get_avg_calories_burnt_materialized_view:

CREATE materialized VIEW get_avg_calories_burnt_materialized_view AS 
SELECT exercise,
         Avg(calories_burnt):: numeric(10, 0) AS average_calories
FROM     fitbit_activity_view
GROUP BY exercise;

Чтобы проверить результат, выполните:

SELECT *
FROM   get_avg_calories_burnt_materialized_view;

Поскольку мы загружаем данные каждые 0,5 секунды, мы получим разные значения, если подождем некоторое время и перезапустим запрос.

Альтернативой этому может быть использование команды, с помощью которой вы можете получить последовательность обновлений, влияющих на результаты с течением времени. Для этого мы можем использовать следующую команду:

COPY (TAIL get_avg_calories_burnt_materialized_view) TO stdout;

Эта команда дает непрерывный поток обновлений в представлении. Чтобы выйти из потока, нажмите CTRL + C.

Как видно из приведенного выше вывода, запись столбца -1 указывает на изменение значения.

Бегуны Fitbit, преодолевающие более 9500 шагов:

Представление для расчета бегунов, превышающих 9500 шагов: get_total_steps_of_each_owner

Materialize поддерживает все типы соединений SQL, ожидаемые от традиционной реляционной базы данных. Для этого представления мы будем использовать это. Следующая команда представляет собой представление:

CREATE materialized VIEW get_total_steps_of_each_owner AS
SELECT s.owner_name       AS owner,
       d.date        AS DATE,
       d.total_steps AS steps
FROM   fitbit_activity_view d
JOIN   fitbit_owner_view s
ON     s.owner_id = d.owner_id
AND    d.total_steps > 9500
AND    d.exercise='jogging';

В этом представлении отображается список fitbit_owners людей, пробежавших трусцой и набравших более 9500 шагов.

Чтобы просмотреть вывод этого представления, запустите:

SELECT *
FROM   get_total_steps_of_each_owner;

Опять же, запустив тот же запрос через некоторое время, мы увидим изменение значений.

Интеграция Redpanda с Materialise позволяет в режиме реального времени анализировать ваши потоки данных. Обычно панель инструментов используется для визуализации данных, и вы можете использовать инструмент бизнес-аналитики, такой как Метабаза, или любой другой инструмент, совместимый с проводной сетью Postgres. Вы можете найти более подробную информацию о поддерживаемых инструментах и ​​интеграциях в Документации по поддерживаемым инструментам Materialise.

Заключение

Хотя сегодняшнее руководство посвящено отслеживанию работоспособности, базовые технологии, обсуждаемые в этой статье, можно использовать для создания информационных панелей в реальном времени в различных отраслях, чтобы обеспечить более быструю, мощную и экономичную аналитику в реальном времени.

Напоминаем, что вы можете получить доступ к образцу кода для этого руководства в любое время в этом репозитории GitHub. Если у вас есть вопросы о совместном использовании Redpanda и Materialise, присоединитесь к Slack сообщества Redpanda, чтобы задать их. Прочтите эти другие руководства по интеграции, чтобы узнать больше о создании собственных приложений для потоковой передачи данных.