以下是一个简单的Java程序,演示了如何使用Kafka在发布者和订阅者之间传输消息。在这个例子中,我们将使用Kafka的Java API,这个API可以从Apache的Maven仓库中获取。

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaExample {
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";

    public static void main(String[] args) {
        // Set up producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        // Set up consumer properties
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        consumerProps.put("group.id", GROUP_ID);
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        // Create a Kafka producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Create a Kafka consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        // Subscribe to the topic
        consumer.subscribe(Collections.singleton(TOPIC_NAME));

        // Start a new thread to consume messages
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        }).start();

        // Publish some messages
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record);
            System.out.println("Published message: " + message);
        }

        // Close the producer and consumer instances
        producer.close();
        consumer.close();
    }
}

在这个例子中,我们使用了一个名为“my-topic”的主题。我们先定义了一些常量,如主题名称、Kafka服务器地址和消费者组ID。然后,我们设置了生产者和消费者的属性并创建了它们的实例。

接下来,我们订阅了主题,并启动了一个新的线程,用于消费消息。在这个线程中,我们使用poll()方法从消费者拉取消息,并对每个消息进行处理。在主线程中,我们循环10次,发布一些消息。

最后,我们关闭了生产者和消费者实例。这是一个非常简单的例子,但它演示了如何使用Kafka在Java中实现消息发布和订阅。

用 java 实现一个 kafka 消息发布订阅的例子

原文地址: https://www.cveoy.top/t/topic/AI6 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录