用 java 实现一个 kafka 消息发布订阅的例子
以下是一个简单的Java程序,演示了如何使用Kafka在发布者和订阅者之间传输消息。在这个例子中,我们将使用Kafka的Java API,这个API可以从Apache的Maven仓库中获取。
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaExample {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
// Set up producer properties
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());
// Set up consumer properties
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
consumerProps.put("group.id", GROUP_ID);
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
// Create a Kafka producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Create a Kafka consumer instance
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// Subscribe to the topic
consumer.subscribe(Collections.singleton(TOPIC_NAME));
// Start a new thread to consume messages
new Thread(() -> {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}).start();
// Publish some messages
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record);
System.out.println("Published message: " + message);
}
// Close the producer and consumer instances
producer.close();
consumer.close();
}
}
在这个例子中,我们使用了一个名为“my-topic”的主题。我们先定义了一些常量,如主题名称、Kafka服务器地址和消费者组ID。然后,我们设置了生产者和消费者的属性并创建了它们的实例。
接下来,我们订阅了主题,并启动了一个新的线程,用于消费消息。在这个线程中,我们使用poll()方法从消费者拉取消息,并对每个消息进行处理。在主线程中,我们循环10次,发布一些消息。
最后,我们关闭了生产者和消费者实例。这是一个非常简单的例子,但它演示了如何使用Kafka在Java中实现消息发布和订阅。
原文地址: https://www.cveoy.top/t/topic/AI6 著作权归作者所有。请勿转载和采集!