以下是Java实现订阅Kafka队列生产者并向队列中发送信息的步骤:

  1. 首先需要引入Kafka的依赖,例如Maven中的pom.xml文件中添加以下内容:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
  1. 创建Kafka生产者配置信息,例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

其中,bootstrap.servers为Kafka服务器地址,key.serializer和value.serializer为序列化器。

  1. 创建Kafka生产者实例,例如:
Producer<String, String> producer = new KafkaProducer<>(props);
  1. 创建Kafka消息对象,例如:
String topic = "test-topic";
String key = "key";
String value = "value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

其中,topic为Kafka队列名称,key和value为消息的键值对。

  1. 发送消息到Kafka队列,例如:
producer.send(record);
  1. 关闭Kafka生产者实例,例如:
producer.close();

完整代码如下:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test-topic";
        String key = "key";
        String value = "value";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        producer.send(record);

        producer.close();
    }
}

原文地址: https://www.cveoy.top/t/topic/qOx 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录