寫一個用java代碼調用kafa的例子
以下是一個簡單的Java代碼示例,用於生產和消費Kafka消息:
生產者:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample { public static void main(String[] args) throws Exception{
String topicName = "test-topic";
String key = "sample-key";
String value = "sample-value";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
} }
消費者:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Collections; import java.util.Properties;
public class KafkaConsumerExample { public static void main(String[] args) throws Exception {
String topicName = "test-topic";
String groupName = "test-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupName);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
});
}
} }
這個例子使用Kafka的Java客戶端API,生產一個包含“sample-key”和“sample-value”的消息,並將其發送到名為“test-topic”的主題中。然後,消費者訂閱此主題並從中接收消息。在消費者中,我們使用了一個無限循環,不斷從Kafka主題中拉取消息。當收到消息時,我們只是簡單地打印它們的偏移量,鍵和值
原文地址: http://www.cveoy.top/t/topic/eEWm 著作权归作者所有。请勿转载和采集!