Java Kafka Message Producer and Consumer Code Example
Below is a simple pair of Java programs for sending and receiving Kafka messages:
Producer program:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";

        // Configure the Kafka producer
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send 10 messages; send() is asynchronous and returns immediately
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record);
        }

        // Close the producer; close() waits for previously sent records to complete
        producer.close();
    }
}
Consumer program:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";

        // Configure the Kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create the Kafka consumer
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topicName));

        // Poll for messages; this loop runs until the process is stopped
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            });
        }
    }
}
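The programs above use the Apache Kafka Java client library. The producer program creates a Kafka producer and sends 10 messages to the specified topic. The consumer program creates a Kafka consumer, subscribes to the same topic, and then polls Kafka in a loop, printing the key and value of each record it receives.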
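One practical detail when running the two programs in sequence: a consumer group with no committed offsets starts at the end of the topic by default (auto.offset.reset defaults to latest), so a consumer started after the producer has finished may print nothing. The following is a minimal sketch of a consumer configuration that reads from the beginning instead. The class KafkaConfigSketch and the helper consumerProperties are hypothetical names introduced here for illustration, not part of the original programs; the string keys are the values behind the corresponding ConsumerConfig constants.

```java
import java.util.Properties;

// Hypothetical helper class, not part of the original example.
class KafkaConfigSketch {

    // Builds consumer settings using plain string keys, so this block
    // compiles without the Kafka client library on the classpath.
    public static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption for this sketch: when the group has no committed offsets,
        // start from the earliest available record rather than the default "latest".
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "my-group");
        // Iteration order of Properties is not guaranteed.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned Properties object could be passed to new KafkaConsumer<>(...) in place of the hand-built one in the consumer program. Note that this setting only applies when the group has no committed offsets; once offsets are committed, the consumer resumes from them.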