Программирование

Kafka Python Обработка данных

Цель этой статьи - прочитать данные из таблицы Oracle DB и отправить записи в формате JSON в Kafka Broker, а затем прочитать сообщения от Kafka Broker и вставить сообщения JSON в коллекцию MongoDB.
Блог содержит фундаментальный ETL сборка системы обмена сообщениями с использованием Oracle в качестве источника, Kafka в качестве промежуточного программного обеспечения и MongoDB в качестве цели.

Это мой третий блог из серии Kafka, предыдущие две ссылки блога содержат концептуальные детали Kafka, ссылка как ниже:

Https://medium.com/towards-artificial-intelligence/getting-started-with-apache-kafka-beginners-tutorial-d38e3634706c?source=friends_link&sk=2b98454c001fb88527fff8c2217947e2

Https://medium.com/towards-artificial-intelligence/diving-deep-into-kafka-29160f32d408?source=friends_link&sk=0cf78cc0ee1ef12f0bac1634e71d1550

  1. Подключитесь к Oracle и извлеките данные:
#import required libraries
from json import dumps
import json
from kafka import KafkaProducer
import cx_Oracle
conn = cx_Oracle.connect('scott/scott@oracle')
cursor=conn.cursor()
query='select empno,ename,sal from emp'
result=cursor.execute(query)
data=[]
#convert data into JSON format
for row in result:
    data.append({'empno':row[0], 'ename':row[1], 'sal':row[2]})
conn.close()
for records in data:
    print(records)

Поскольку MongoDB принимает данные в формате JSON, мы преобразуем данные в формат «ключ-значение» JSON.

2. Создайте продюсера и отправляйте сообщения в Kafka Broker:

producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’],
 value_serializer=lambda x:json.dumps(x).encode(‘utf-8’))
#push message to kafka topic mongo_poc
try:
 for val in data:
 producer.send(‘mongo_poc’,val)
except Exception as e:
 print(e)

сообщения в формате пары ключ-значение

3. Создайте потребителя и вставьте записи в MongoDB:

#import required libraries
from kafka import KafkaConsumer
from json import loads
import json
from pymongo import MongoClient
#mongodb connection details
client = MongoClient(‘localhost:27017’)
db=client.mydb
#write Kafka consumer
consumer = KafkaConsumer(‘mongo_poc’,
 bootstrap_servers=[‘localhost:9092’],
 auto_offset_reset=’earliest’,
 enable_auto_commit=True,
 group_id=’my-group’,
 consumer_timeout_ms=1000,
 value_deserializer=lambda x: loads(x.decode(‘utf-8’)))
#read messages from Kafka and insert into mongodb collection named kafka_mongo
for message in consumer:
 msg = message.value
 print(msg)
 db.kafka_mongo.insert(msg)
 consumer.commit()

Коллекцию MongoDB можно рассматривать как таблицу СУБД. Каждая запись в MongoDB называется документом. Итак, запись Oracle - это документ MongoDB.

find () метод используется для запроса коллекции - kafka_mongo

Посетите мой блог MongoDB, чтобы понять основные операции CRUD, ссылка, как показано ниже: https://mongocrud.blogspot.com/2020/03/mongodb-crud-operations.html

Ура, мы успешно прочитали сообщения от Kafka Broker и вставили их в коллекцию MongoDB.

Резюме:

· Соединение Python Oracle с использованием cx_Oracle

· Создание данных JSON

· Push-сообщения с помощью Kafka Producer

· Потребитель Kafka вставляет данные в коллекцию MongoDB

Спасибо всем за чтение моего блога, и если вам нравится мой контент и объяснения, подпишитесь на меня в среде и поделитесь своими отзывами, которые всегда помогут всем нам расширить наши знания.