以下是Python代码示例:

from kafka import KafkaConsumer, KafkaProducer
import random

consumer = KafkaConsumer('test_topic', bootstrap_servers=['localhost:9092'])
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for msg in consumer:
    data = msg.value.decode('utf-8')
    for i in range(20):
        # 随机生成数据
        new_data = data + str(random.randint(1, 100))
        producer.send('test_topic', new_data.encode('utf-8'))
        print(f"Sent message: {new_data}")

该代码从名为“test_topic”的Kafka主题中消费消息,并将随机生成的数据发送回同一主题。每个原始消息将产生20个随机数据。请注意,这将导致Kafka主题中的消息数量增加20倍

通过kafka消费者将上面的数据随机发送20条数据

原文地址: https://www.cveoy.top/t/topic/f5nJ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录