帮我写一个python 使用kakfa集群的生产者和消费者的示例代码
以下是一个使用Python Kafka客户端库kafka-python编写的基本示例代码,展示如何创建Kafka生产者和消费者。
生产者示例代码:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
for i in range(5):
message = {'key': 'value', 'number': i}
producer.send('test_topic', message)
上面的代码创建了一个Kafka生产者,并将JSON序列化的消息发送到名为test_topic的主题中。
消费者示例代码:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('test_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
print(message.value)
上面的代码创建了一个Kafka消费者,并从名为test_topic的主题中接收JSON序列化的消息,然后将其反序列化为Python对象并打印到控制台上。
注意:在运行上述代码之前,请确保你已经安装了kafka-python库并且已经启动了Kafka集群
原文地址: https://www.cveoy.top/t/topic/cnUo 著作权归作者所有。请勿转载和采集!