Python Kafka 集群生产者和消费者示例代码
以下是一个使用 Python Kafka 客户端库 'kafka-python' 编写的基本示例代码,展示如何创建 Kafka 生产者和消费者。
生产者示例代码:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
for i in range(5):
message = {'key': 'value', 'number': i}
producer.send('test_topic', message)
上面的代码创建了一个 Kafka 生产者,并将 JSON 序列化的消息发送到名为 'test_topic' 的主题中。
消费者示例代码:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('test_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
print(message.value)
上面的代码创建了一个 Kafka 消费者,并从名为 'test_topic' 的主题中接收 JSON 序列化的消息,然后将其反序列化为 Python 对象并打印到控制台上。
注意:在运行上述代码之前,请确保你已经安装了 'kafka-python' 库并且已经启动了 Kafka 集群。
原文地址: https://www.cveoy.top/t/topic/nuEI 著作权归作者所有。请勿转载和采集!