Узнайте, как интегрировать Elasticsearch с Redpanda с помощью Kafka Connect и создать приложение для поиска текста в реальном времени.

В 2022 году ежедневно создается более 2,5 квинтиллионов байт данных. По состоянию на 2025 год оценки увеличиваются до 463 экзабайт данных каждый день.

Создавать данные — это хорошо, и иметь возможность хранить их где-то, где они всегда доступны, например, в облаке, — это здорово. Возможность запрашивать эти данные еще лучше. Но что, если вам нужно найти определенную часть данных в таком огромном пуле данных?

Что такое эластичный поиск?

Введите Elasticsearch, распределенную поисковую и аналитическую систему с открытым исходным кодом, построенную на Apache Lucene®. Elasticsearch поддерживает все типы данных, включая текстовые, числовые, геопространственные, структурированные и неструктурированные данные.

Такие компании, как Википедия, GitHub и Facebook, используют Elasticsearch для таких случаев использования, как реализация классической полнотекстовой поисковой системы, хранилища аналитики, автозаполнение, проверка орфографии, системы оповещения и хранилище документов общего назначения.

Elasticsearch становится еще более мощным, когда он интегрирован с потоковой платформой в реальном времени, такой как Redpanda. Эту интеграцию можно легко выполнить с помощью Kafka Connect® и совместимых соединителей, таких как Confluent Elasticsearch Sink Connector® или Camel Elasticsearch Sink Connector®.

Как передавать данные в Elasticsearch

Когда дело доходит до создания конвейеров потоковой передачи в реальном времени, которые питают такие платформы, как Elasticsearch, важно сделать это более производительным способом. Redpanda, совместимая с Apache Kafka® API, представляет собой современную и быструю потоковую платформу, которую разработчики используют для замены Kafka, когда производительность и безопасность являются обязательными требованиями.

Вы можете передавать любые структурированные данные через Redpanda, индексировать их в режиме реального времени в Elasticsearch и позволить Elasticsearch сохранять данные доступными для таких целей, как аналитика, полнотекстовый поиск или интеллектуальное автозаполнение, использующее данные машинного обучения.

В этом уроке вы узнаете, как сделать следующее:

  • Запустите узел Redpanda с помощью Docker
  • Создайте тему в Redpanda, используя ее rpk интерфейс командной строки.
  • Запустите узел Elasticsearch с помощью Docker.
  • Настройте и запустите кластер Kafka Connect для интеграции Redpanda и Elasticsearch.
  • Создавать данные из файла JSON в тему Redpanda с помощью rpk CLI и передавать данные в индекс Elasticsearch.
  • Запускайте запросы полнотекстового поиска для поиска необходимых данных в Elasticsearch.

Все ресурсы кода для этого руководства можно найти в этом репозитории.

Предпосылки

Для начала вам понадобится следующее:

  • Среда macOS или операционная система по вашему выбору. (Это руководство было разработано для среды macOS. Если вы выберете другую ОС, обратите внимание, что команды могут отличаться.)
  • На вашем компьютере установлена ​​последняя версия Docker (в этой статье используется Docker Desktop 4.6.1).
  • jq CLI, обеспечивающий форматирование вывода JSON (jq можно скачать здесь).

Сценарий: текстовый поиск в режиме реального времени в новостной индустрии.

Рассмотрим этот сценарий, чтобы подготовить почву для остальной части учебника.

PandaPost — вымышленная новостная компания, которая сотрудничает со многими журналистами-подрядчиками, которые публикуют новостные репортажи. Из-за того, что еженедельно поступает более тысячи новостных сообщений, у PandaPost возникают проблемы с эффективным поиском контента.

Они нанимают вас в качестве технического консультанта. Им нужна система, которая делает возможным полнотекстовый поиск содержания новостей. Им также необходимо, чтобы эта функция была доступна в режиме реального времени, чтобы, когда журналисты отправляют новый отчет, он был доступен для поиска в течение нескольких секунд.

Вы решаете использовать Redpanda для потоковой передачи данных, Elasticsearch для предоставления возможностей поиска и Kafka Connect для их интеграции.

Следующая диаграмма объясняет высокоуровневую архитектуру:

Настройка Редпанды

В этом руководстве вы запустите Redpanda в контейнере через Docker. Для получения дополнительной информации о вариантах установки Redpanda на другие платформы обратитесь к этой документации.

Убедитесь, что вы установили Docker и запустили демон Docker в своей среде.

Перед запуском Redpanda в Docker создайте папку с именем pandapost_integration в своем домашнем каталоге. Вы будете использовать этот каталог в качестве общего тома для контейнера Redpanda для будущих шагов.

Замените _YOUR_HOME_DIRECTORY_ своим домашним каталогом в следующей команде и запустите ее:

docker run -d --pull=always --name=redpanda-1 --rm \
    -v _YOUR_HOME_DIRECTORY_/pandapost_integration:/tmp/pandapost_integration \
    -p 9092:9092 \
    -p 9644:9644 \
    docker.vectorized.io/vectorized/redpanda:latest \
    redpanda start \
    --overprovisioned \
    --smp 1  \
    --memory 2G \
    --reserve-memory 1G \
    --node-id 0 \
    --check=false

Вы должны увидеть такой вывод:

Trying to pull docker.vectorized.io/vectorized/redpanda:latest...
Getting image source signatures
Copying blob sha256:245fe2b3f0d6b107b818db25affc44bb96daf57d56d7d90568a1f2f10839ec46
...output omitted...
Copying blob sha256:245fe2b3f0d6b107b818db25affc44bb96daf57d56d7d90568a1f2f10839ec46
Copying config sha256:fdaf68707351dda9ede29650f4df501b78b77a501ef4cfb5e2e9a03208f29068
Writing manifest to image destination
Storing signatures
105c7802c5a46fa691687d9f20c8b42cd461ce38d625f285cec7d6678af90a59

Предыдущая команда извлекает последний образ Redpanda из репозитория docker.vectorized.io и запускает контейнер с открытыми портами 9092 и 9644. В этом руководстве вы будете использовать порт 9092 для доступа к Redpanda.

Для получения дополнительной информации о том, как запустить Redpanda с помощью Docker, обратитесь к этой документации.

Затем проверьте кластер с помощью интерфейса командной строки Redpanda rpk в контейнере:

docker exec -it redpanda-1 \
    rpk cluster info

Это возвращает следующий вывод:

BROKERS
=======
ID    HOST        PORT
0*    0.0.0.0     9092

Теперь ваш кластер Redpanda готов к использованию.

Создание темы Redpanda

Чтобы производитель Kafka и кластер Kafka Connect работали должным образом, необходимо определить тему Kafka. Вы можете использовать rpk для создания тем в кластере Redpanda.

Запустите команду rpk, чтобы создать тему с именем news-reports в контейнере Docker:

docker exec -it redpanda-1 \
rpk topic create news-reports

Убедитесь, что у вас есть созданные темы:

docker exec -it redpanda-1 \
rpk cluster info

Это вернет следующий вывод:

BROKERS
=======
ID    HOST         PORT
0*    0.0.0.0      9092
TOPICS
======
NAME                   PARTITIONS  REPLICAS
news-reports               1           1

С этой конфигурацией Redpanda будет доступна через localhost:9092 на вашем компьютере до конца этой статьи.

Настройка эластичного поиска

В этом руководстве вы также запустите Elasticsearch как контейнер Docker. В другом окне терминала выполните следующую команду, чтобы выполнить Elasticsearch с портами 9200 и 9300:

docker run --name elastic-1 \
    -p 9200:9200 -p 9300:9300 -it \
    -e "discovery.type=single-node" \
    -e "xpack.security.enabled=false" \
    docker.elastic.co/elasticsearch/elasticsearch:7.17.2

Вывод должен выглядеть так:

...output omitted...
{"@timestamp":"2022-04-15T15:13:17.067Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-Country.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#7]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
{"@timestamp":"2022-04-15T15:13:17.118Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-City.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#13]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}

Протестируйте экземпляр Elasticsearch с помощью следующей команды:

curl 'http://localhost:9200'

Вывод должен быть следующим:

{
  "name" : "de867178cae3",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "kSrE0ipYS8-v1Znjc6jgCQ",
  "version" : {
    "number" : "8.1.2",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "31df9689e80bad366ac20176aa7f2371ea5eb4c1",
    "build_date" : "2022-03-29T21:18:59.991429448Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

Вы успешно настроили Elasticsearch в контейнере Docker.

Настройка Кафка Коннект

Kafka Connect — это инструмент интеграции, выпущенный вместе с проектом Apache Kafka. Он обеспечивает надежную потоковую передачу данных между Apache Kafka и внешними системами, а также является масштабируемым и гибким. Вы можете использовать Kafka Connect для интеграции с другими системами, такими как базы данных, поисковые индексы и поставщики облачных хранилищ. Kafka Connect также работает с Redpanda, который совместим с API Kafka.

Kafka Connect использует для интеграции коннекторы source и sink. Соединители-источники передают данные из внешней системы в Kafka, а соединители-приемники передают данные из Kafka во внешнюю систему.

Kafka Connect входит в состав пакета Apache Kafka. Перейдите на страницу загрузки Apache для Kafka и щелкните предложенную ссылку для загрузки двоичного пакета Kafka 3.1.0.

Извлеките папку двоичных файлов Kafka в созданный ранее каталог _YOUR_HOME_DIRECTORY_/pandapost_integration.

Настройка кластера Connect

Чтобы запустить кластер Kafka Connect, сначала создайте файл конфигурации в формате properties.

В папке pandapost_integration создайте папку с именем configuration. Затем создайте в этом каталоге файл с именем connect.properties и добавьте в него следующее содержимое:

#Kafka broker addresses
bootstrap.servers=
#Cluster level converters
#These applies when the connectors don't define any converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#JSON schemas enabled to false in cluster level
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#Where to keep the Connect topic offset configurations
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#Plugin path to put the connector binaries
plugin.path=

Установите для параметра bootstrap.servers значение localhost:9092, чтобы настроить кластер Connect для использования кластера Redpanda.

Вы также должны настроить plugin.path, который вы будете использовать для размещения двоичных файлов коннектора. Создайте папку с именем plugins в каталоге pandapost_integration. Затем перейдите на эту веб-страницу и нажмите Загрузить, чтобы загрузить заархивированные двоичные файлы. Разархивируйте файл и скопируйте файлы из папки lib в папку с именем kafka-connect-elasticsearch, placed in the plugins`.

Окончательная структура папок для pandapost_integration должна выглядеть так:

pandapost_integration
├── configuration
│   ├── connect.properties
├── plugins
│   ├── kafka-connect-elasticsearch
│   │   ├── aggs-matrix-stats-client-7.9.3.jar
│   │   ├── ...
│   │   └── snakeyaml-1.27.jar
└── kafka_2.13-3.1.0

Измените значение plugin.path на _YOUR_HOME_DIRECTORY_/pandapost_integration/plugins. Это настраивает кластер Kafka Connect для использования кластера Redpanda.

Окончательный файл connect.properties должен выглядеть так:

#Kafka broker addresses
bootstrap.servers=localhost:9092
#Cluster level converters
#These applies when the connectors don't define any converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#JSON schemas enabled to false in cluster level
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#Where to keep the Connect topic offset configurations
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#Plugin path to put the connector binaries
plugin.path=_YOUR_HOME_DIRECTORY_/pandapost_integration/plugins

Настройка коннектора Elasticsearch

Настроить подключаемые модули коннекторов в кластере Kafka Connect для интеграции с внешними системами недостаточно — кластер Kafka Connect требует настройки коннекторов для выполнения интеграции.

В частности, вам нужно настроить коннектор Sink для Elasticsearch. Для этого создайте файл с именем elasticsearch-sink-connector.properties в каталоге _YOUR_HOME_DIRECTORY_/pandapost_integration/configuration со следующим содержимым:

name=elasticsearch-sink-connector
# Connector class
connector.class=
# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter
# Identify, if value contains a schema.
# Required value converter is `org.apache.kafka.connect.json.JsonConverter`.
value.converter.schemas.enable=false
tasks.max=1
# Topic name to get data from
topics=
key.ignore=true
schema.ignore=true
# Elasticsearch cluster address
connection.url=
type.name=_doc

Некоторые значения оставлены пустыми в учебных целях. Установите следующие значения для ключей в файле elasticsearch-sink-connector.properties:

KeyValueconnector.classio.confluent.connect.elasticsearch.ElasticsearchSinkConnectortopicsnews-reportsconnection.urlhttp://localhost:9200

Запуск кластера Kafka Connect

Чтобы запустить кластер с примененными конфигурациями, откройте новое окно терминала и перейдите в каталог _YOUR_HOME_DIRECTORY_/pandapost_integration/configuration. Выполните следующую команду в каталоге:

../kafka_2.13-3.1.0/bin/connect-standalone.sh connect.properties elasticsearch-sink-connector.properties

Вывод не должен содержать ошибок:

...output omitted...
 groupId=connect-elasticsearch-sink-connector] Successfully joined group with generation Generation{generationId=25, memberId='connector-consumer-elasticsearch-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595)
[2022-04-17 03:37:06,872] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Finished assignment for group at generation 25: {connector-consumer-elasticsearch-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27=Assignment(partitions=[news-reports-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652)
[2022-04-17 03:37:06,891] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Successfully synced group in generation Generation{generationId=25, memberId='connector-consumer-elasticsearch-sink-connector-0-eb21795e-f3b3-4312-8ce9-46164a2cdb27', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:761)
[2022-04-17 03:37:06,891] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Notifying assignor about the new Assignment(partitions=[news-reports-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:279)
[2022-04-17 03:37:06,893] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Adding newly assigned partitions: news-reports-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291)
[2022-04-17 03:37:06,903] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Setting offset for partition news-reports-0 to the committed offset FetchPosition{offset=3250, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=absent}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)

Индексация данных в Elasticsearch

Затем загрузите файл JSON с некоторыми примерами данных, которые помогут вам имитировать новостные репортажи, которые присылают репортеры подрядчика. Перейдите к этому репозиторию и нажмите Загрузить. Имя файла должно быть news-reports-data.json.

Содержимое загруженного файла будет выглядеть так:

...output omitted...
{"reporterId": 8824, "reportId": 10000, "content": "Was argued independent 2002 film, The Slaughter Rule.", "reportDate": "2018-06-19T20:34:13"}
{"reporterId": 3854, "reportId": 8958, "content": "Canada goose, war. Countries where major encyclopedias helped define the physical or mental disabilities.", "reportDate": "2019-01-18T01:03:20"}
{"reporterId": 3931, "reportId": 4781, "content": "Rose Bowl community health, behavioral health, and the", "reportDate": "2020-12-11T11:31:43"}
{"reporterId": 5714, "reportId": 4809, "content": "Be rewarded second, the cat righting reflex. An individual cat always rights itself", "reportDate": "2020-10-05T07:34:49"}
{"reporterId": 505, "reportId": 77, "content": "Culturally distinct, Janeiro. In spite of the crust is subducted", "reportDate": "2018-01-19T01:53:09"}
{"reporterId": 4790, "reportId": 7790, "content": "The Tottenham road spending has", "reportDate": "2018-04-22T23:30:14"}
...output omitted...

Переместите файл в каталог _YOUR_HOME_DIRECTORY_/pandapost_integration и выполните следующую команду, чтобы легко создавать сообщения для Redpanda с помощью интерфейса командной строки rpk, который вы запускаете внутри контейнера Redpanda:

docker exec -it redpanda-1 /bin/sh -c \
'rpk topic produce news-reports < /tmp/pandapost_integration/news-reports-data.json'

Вы должны увидеть следующий вывод, который указывает на то, что вы успешно отправили 3250 записей в Redpanda за несколько секунд:

...output omitted...
Produced to partition 0 at offset 3244 with timestamp 1650112321454.
Produced to partition 0 at offset 3245 with timestamp 1650112321454.
Produced to partition 0 at offset 3246 with timestamp 1650112321454.
Produced to partition 0 at offset 3247 with timestamp 1650112321454.
Produced to partition 0 at offset 3248 with timestamp 1650112321454.
Produced to partition 0 at offset 3249 with timestamp 1650112321454.

Такое же количество записей было отправлено в Elasticsearch в индекс под названием news-reports. Обратите внимание, что это то же имя, что и тема Redpanda, которую вы создали. Коннектор Elasticsearch Sink по умолчанию создает индекс с тем же именем.

Чтобы убедиться, что Elasticsearch проиндексировал данные, выполните следующую команду в окне терминала:

curl 'http://localhost:9200/news-reports/_search' | jq

Это должно вернуть десять записей, так как Elasticsearch по умолчанию возвращает первые десять результатов:

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3250,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+5",
        "_score": 1,
        "_source": {
          "reportId": 9849,
          "reportDate": "2018-04-02T00:34:24",
          "reporterId": 8847,
          "content": "Street, and lawyer has"
        }
      },
...output omitted...
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+31",
        "_score": 1,
        "_source": {
          "reportId": 963,
          "reportDate": "2020-08-15T11:05:08",
          "reporterId": 5124,
          "content": "A.; Donald On Earth's surface, where it is a small newspaper's print run might"
        }
      }
    ]
  }
}

Чтобы проверить количество записей в индексе news-reports, выполните следующую команду:

curl 'http://localhost:9200/news-reports/_count'

Вывод должен выглядеть так:

{"count":3250,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}

Обратите внимание, что он возвращает то же количество записей, что и Redpanda из файла news-reports-data.json.

Теперь давайте рассмотрим функциональность, которую требует PandaPost, выполнив поиск по ключевому слову в поисковом индексе news-reports.

Чтобы найти ключевое слово в индексе Elasticsearch, вы должны использовать путь /_search, за которым следует параметр q=, который обозначает query string parameter для full text query. Для получения дополнительной информации о полнотекстовых запросах и других типах поисковых запросов, предоставляемых Elasticsearch, посетите эту веб-страницу.

Следующая команда ищет ключевое слово film в новостях:

curl 'http://localhost:9200/news-reports/_search?q=content:film' | jq

Вывод должен быть следующим:

{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10,
      "relation": "eq"
    },
    "max_score": 7.454871,
    "hits": [
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+606",
        "_score": 7.454871,
        "_source": {
          "reportId": 1517,
          "reportDate": "2020-12-04T20:51:16",
          "reporterId": 6341,
          "content": "Latino Film the \"Bielefeld"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+1754",
        "_score": 7.454871,
        "_source": {
          "reportId": 4742,
          "reportDate": "2020-05-03T20:59:19",
          "reporterId": 7664,
          "content": "Novels, film, occupation ended"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+422",
        "_score": 6.051381,
        "_source": {
          "reportId": 2833,
          "reportDate": "2019-08-05T04:38:33",
          "reporterId": 5716,
          "content": "These other eyes having a particularly strong film"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+383",
        "_score": 6.051381,
        "_source": {
          "reportId": 3311,
          "reportDate": "2018-01-01T15:27:48",
          "reporterId": 8434,
          "content": "Societies was by writers, film-makers, philosophers, artists"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+13",
        "_score": 6.051381,
        "_source": {
          "reportId": 10000,
          "reportDate": "2018-06-19T20:34:13",
          "reporterId": 8824,
          "content": "Was argued independent 2002 film, The Slaughter Rule."
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+3108",
        "_score": 5.5307574,
        "_source": {
          "reportId": 8417,
          "reportDate": "2019-08-01T19:07:14",
          "reporterId": 2653,
          "content": "Legislative body, strong film industry, including skills shortages, improving productivity"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+1320",
        "_score": 5.302654,
        "_source": {
          "reportId": 2558,
          "reportDate": "2020-06-19T07:01:54",
          "reporterId": 7503,
          "content": "Still retain the waters of eastern Montana. Robert Redford's 1992 film"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+203",
        "_score": 4.8985906,
        "_source": {
          "reportId": 7500,
          "reportDate": "2019-01-05T23:27:49",
          "reporterId": 14,
          "content": "Ridership of Film (\"Oscar\") went to the U.S., Britain and in that they"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+1664",
        "_score": 4.8985906,
        "_source": {
          "reportId": 454,
          "reportDate": "2019-12-04T11:05:30",
          "reporterId": 2511,
          "content": "Film with its causes), and dynamics of the Hebrew מִצְרַיִם (Mitzráyim). The oldest"
        }
      },
      {
        "_index": "news-reports",
        "_type": "_doc",
        "_id": "news-reports+0+249",
        "_score": 4.718804,
        "_source": {
          "reportId": 2647,
          "reportDate": "2018-08-22T00:47:19",
          "reporterId": 8067,
          "content": "On experiments ship, Jose Gasparilla, and subsequent higher borrowing costs for film study in"
        }
      }
    ]
  }
}

Изучите результаты и обратите внимание, что во всех них в поле content указано слово film. Поздравляем! Это означает, что вы успешно передали данные из файла с помощью Redpanda и проиндексировали данные с помощью Kafka Connect!

Заключение

Как вы видели в этой статье, интеграция Elasticsearch с Redpanda открывает множество возможностей для конвейерной обработки данных и мира поиска. Знание того, как использовать эти инструменты вместе, позволяет вам передавать любые структурированные данные через Redpanda, индексировать данные в Elasticsearch в режиме реального времени, а затем позволить Elasticsearch сохранять ваши данные доступными для дальнейшего использования. Используя эти инструменты вместе, вы можете создать любое количество приложений.

Помните, что вы можете найти ресурсы кода для этого руководства в этом репозитории.

Взаимодействуйте с разработчиками Redpanda напрямую в Сообществе Redpanda в Slack или вносите свой вклад в репозиторий Redpanda на GitHub, доступный здесь. Чтобы узнать больше обо всем, что вы можете сделать с Redpanda, ознакомьтесь с нашей документацией здесь.

Эта статья первоначально появилась в блоге Redpanda.