以下是一个使用 Python Kafka 客户端库 'kafka-python' 编写的基本示例代码,展示如何创建 Kafka 生产者和消费者。

生产者示例代码:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))

for i in range(5):
    message = {'key': 'value', 'number': i}
    producer.send('test_topic', message)

上面的代码创建了一个 Kafka 生产者,并将 JSON 序列化的消息发送到名为 'test_topic' 的主题中。

消费者示例代码:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('test_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    print(message.value)

上面的代码创建了一个 Kafka 消费者,并从名为 'test_topic' 的主题中接收 JSON 序列化的消息,然后将其反序列化为 Python 对象并打印到控制台上。

注意:在运行上述代码之前,请确保你已经安装了 'kafka-python' 库并且已经启动了 Kafka 集群。

Python Kafka 集群生产者和消费者示例代码

原文地址: https://www.cveoy.top/t/topic/nuEI 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录