Java Kafka Producer and Consumer Example
Here's an example of how to produce and consume Kafka messages using Java:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaExample {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";

    public static void main(String[] args) {
        // Set up Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the Kafka producer; try-with-resources closes it,
        // and close() waits for buffered records to be sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // Send a message to Kafka (send() is asynchronous and only queues the record)
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "key", "value");
            producer.send(producerRecord);
        }

        // Set up Kafka consumer properties
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Read from the start of the topic when the group has no committed offset;
        // with the default ("latest") the message sent above would be skipped
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer; try-with-resources closes it if the loop ever exits
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // Poll for messages from Kafka until the program is stopped
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // Named consumerRecord so it does not clash with the producer record above
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    System.out.printf("Received message: key=%s, value=%s%n",
                            consumerRecord.key(), consumerRecord.value());
                }
            }
        }
    }
}
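In this example, we first set up the properties for the Kafka producer: the bootstrap servers and the serializer classes for the key and value. We then create a Kafka producer and use it to send a message to the 'my-topic' topic. Keep in mind that send() is asynchronous: it returns as soon as the record is queued, so the producer has to be flushed or closed before the record is guaranteed to have left the client.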
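To make the configuration step more concrete, here is a minimal sketch of the same settings written with plain string keys, which is what constants such as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG resolve to. The class name KafkaProps and its two helper methods are hypothetical names chosen for illustration, not part of the Kafka client library, and the sketch only needs java.util.Properties, so it runs without a broker.

```java
import java.util.Properties;

public class KafkaProps {
    // Fully qualified names of Kafka's built-in String serializer and deserializer.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Hypothetical helper: the producer settings used in the example above.
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // Hypothetical helper: the consumer settings, which add a consumer group ID.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        Properties consumer = consumerProps("localhost:9092", "my-group");
        System.out.println("group.id=" + consumer.getProperty("group.id"));
    }
}
```

Either Properties object could be passed to the KafkaProducer or KafkaConsumer constructor in place of the ones built inline. Keeping the settings in one place like this is only one possible design; the constants in ProducerConfig and ConsumerConfig are the safer choice when the Kafka client is on the classpath, because the compiler catches a mistyped name.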
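Next, we set up the consumer properties (bootstrap servers, consumer group ID, and deserializer classes), create a Kafka consumer, and subscribe to the same topic. We then continuously poll for messages from Kafka and print out the key and value of each message we receive. A consumer group with no committed offsets starts reading from the end of the topic by default (auto.offset.reset=latest), so a message that was sent before the consumer joined is only read if that setting is "earliest".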
Note that in a real-world scenario, you would want to handle errors and exceptions more gracefully, and you may have multiple producers and consumers sending messages to and receiving messages from different topics.
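As a rough illustration of what more graceful handling could look like, the sketch below shows a poll loop that can be stopped from outside and that survives a failure on a single message. To stay runnable without a broker it does not use the Kafka client at all: the pollSource supplier is a hypothetical stand-in for consumer.poll(...), the messages are plain strings, and the class name PollLoopSketch and its methods are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class PollLoopSketch {
    // Flipped to false (for example from a shutdown hook) to end the loop.
    private final AtomicBoolean running = new AtomicBoolean(true);
    // Messages whose handler threw; a real application might log or re-queue them.
    private final List<String> failed = new ArrayList<>();

    public void stop() {
        running.set(false);
    }

    public List<String> failed() {
        return failed;
    }

    // pollSource stands in for consumer.poll(...); handler is the per-message logic.
    public void run(Supplier<List<String>> pollSource, Consumer<String> handler) {
        while (running.get()) {
            for (String value : pollSource.get()) {
                try {
                    handler.accept(value);
                } catch (RuntimeException e) {
                    // One bad message should not terminate the whole loop.
                    failed.add(value);
                }
            }
        }
    }

    public static void main(String[] args) {
        PollLoopSketch loop = new PollLoopSketch();
        List<String> processed = new ArrayList<>();
        // Fake source: hands out one batch and asks the loop to stop afterwards.
        Supplier<List<String>> source = () -> {
            loop.stop();
            return List.of("ok-1", "bad", "ok-2");
        };
        loop.run(source, value -> {
            if (value.equals("bad")) {
                throw new IllegalArgumentException("cannot process " + value);
            }
            processed.add(value);
        });
        System.out.println("processed=" + processed + " failed=" + loop.failed());
        // prints: processed=[ok-1, ok-2] failed=[bad]
    }
}
```

With a real KafkaConsumer the same shape applies, but the details differ: the loop would call consumer.poll(...), and a blocked poll is usually interrupted from another thread with consumer.wakeup() rather than by a flag alone.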