Программирование
Kafka Python Обработка данных
Цель этой статьи - прочитать данные из таблицы Oracle DB и отправить записи в формате JSON в Kafka Broker, а затем прочитать сообщения от Kafka Broker и вставить сообщения JSON в коллекцию MongoDB.
Блог содержит фундаментальный ETL сборка системы обмена сообщениями с использованием Oracle в качестве источника, Kafka в качестве промежуточного программного обеспечения и MongoDB в качестве цели.
Это мой третий блог из серии Kafka, предыдущие две ссылки блога содержат концептуальные детали Kafka, ссылка как ниже:
- Подключитесь к Oracle и извлеките данные:
#import required libraries from json import dumps import json from kafka import KafkaProducer import cx_Oracle conn = cx_Oracle.connect('scott/scott@oracle') cursor=conn.cursor() query='select empno,ename,sal from emp' result=cursor.execute(query) data=[] #convert data into JSON format for row in result: data.append({'empno':row[0], 'ename':row[1], 'sal':row[2]}) conn.close() for records in data: print(records)
Поскольку MongoDB принимает данные в формате JSON, мы преобразуем данные в формат «ключ-значение» JSON.
2. Создайте продюсера и отправляйте сообщения в Kafka Broker:
producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’], value_serializer=lambda x:json.dumps(x).encode(‘utf-8’)) #push message to kafka topic mongo_poc try: for val in data: producer.send(‘mongo_poc’,val) except Exception as e: print(e)
сообщения в формате пары ключ-значение
3. Создайте потребителя и вставьте записи в MongoDB:
#import required libraries from kafka import KafkaConsumer from json import loads import json from pymongo import MongoClient #mongodb connection details client = MongoClient(‘localhost:27017’) db=client.mydb #write Kafka consumer consumer = KafkaConsumer(‘mongo_poc’, bootstrap_servers=[‘localhost:9092’], auto_offset_reset=’earliest’, enable_auto_commit=True, group_id=’my-group’, consumer_timeout_ms=1000, value_deserializer=lambda x: loads(x.decode(‘utf-8’))) #read messages from Kafka and insert into mongodb collection named kafka_mongo for message in consumer: msg = message.value print(msg) db.kafka_mongo.insert(msg) consumer.commit()
Коллекцию MongoDB можно рассматривать как таблицу СУБД. Каждая запись в MongoDB называется документом. Итак, запись Oracle - это документ MongoDB.
find () метод используется для запроса коллекции - kafka_mongo
Посетите мой блог MongoDB, чтобы понять основные операции CRUD, ссылка, как показано ниже: https://mongocrud.blogspot.com/2020/03/mongodb-crud-operations.html
Ура, мы успешно прочитали сообщения от Kafka Broker и вставили их в коллекцию MongoDB.
Резюме:
· Соединение Python Oracle с использованием cx_Oracle
· Создание данных JSON
· Push-сообщения с помощью Kafka Producer
· Потребитель Kafka вставляет данные в коллекцию MongoDB
Спасибо всем за чтение моего блога, и если вам нравится мой контент и объяснения, подпишитесь на меня в среде и поделитесь своими отзывами, которые всегда помогут всем нам расширить наши знания.