Sure, here's an example of how to produce and consume Kafka messages using Java:

import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";

    public static void main(String[] args) throws InterruptedException {

        // Set up Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Send a message to Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
        producer.send(record);

        // Set up Kafka consumer properties
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        // Poll for messages from Kafka
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s\n", record.key(), record.value());
            }
        }
    }
}

In this example, we first set up the properties for the Kafka producer and consumer, including the bootstrap servers and serialization/deserialization classes for the key and value. We then create a Kafka producer and use it to send a message to the 'my-topic' topic.

Next, we create a Kafka consumer and subscribe to the same topic. We then continuously poll for messages from Kafka and print out the key and value of each message we receive.

Note that in a real-world scenario, you would likely want to handle errors and exceptions more gracefully, and you may have multiple producers and consumers sending and receiving messages from different topics.

Java Kafka Producer and Consumer Example

原文地址: https://www.cveoy.top/t/topic/nEPM 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录