下面是一个简单的Java程序,用于发送和接收Kafka消息:

发送程序:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";

        // 设置Kafka生产者的配置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
        }

        // 关闭Kafka生产者
        producer.close();
    }
}

接收程序:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";

        // 设置Kafka消费者的配置属性
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建Kafka消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅主题
        consumer.subscribe(Collections.singletonList(topicName));

        // 接收消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            });
        }
    }
}

以上程序使用了Apache Kafka的Java客户端库。在发送程序中,我们创建了一个Kafka生产者,并发送了10条消息到指定的主题。在接收程序中,我们创建了一个Kafka消费者,并订阅了指定的主题,然后不断地从Kafka中拉取消息并进行处理

用java写一套kafka的推送和接受程序

原文地址: http://www.cveoy.top/t/topic/iMqK 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录