以下是一個簡單的Java代碼示例,用於生產和消費Kafka消息:

生產者:

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample { public static void main(String[] args) throws Exception{

  String topicName = "test-topic";
  String key = "sample-key";
  String value = "sample-value";

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  Producer<String, String> producer = new KafkaProducer<>(props);

  ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
  producer.send(record);
  producer.close();

} }

消費者:

import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration; import java.util.Collections; import java.util.Properties;

public class KafkaConsumerExample { public static void main(String[] args) throws Exception {

  String topicName = "test-topic";
  String groupName = "test-group";

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", groupName);
  props.put("key.deserializer", StringDeserializer.class.getName());
  props.put("value.deserializer", StringDeserializer.class.getName());

  Consumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList(topicName));

  while (true) {
     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
     records.forEach(record -> {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     });
  }

} }

這個例子使用Kafka的Java客戶端API,生產一個包含“sample-key”和“sample-value”的消息,並將其發送到名為“test-topic”的主題中。然後,消費者訂閱此主題並從中接收消息。在消費者中,我們使用了一個無限循環,不斷從Kafka主題中拉取消息。當收到消息時,我們只是簡單地打印它們的偏移量,鍵和值

寫一個用java代碼調用kafa的例子

原文地址: http://www.cveoy.top/t/topic/eEWm 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录