java实现订阅kafka队列生产者并向队列中发送信息
以下是Java实现订阅Kafka队列生产者并向队列中发送信息的步骤:
- 首先需要引入Kafka的依赖,例如Maven中的pom.xml文件中添加以下内容:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
- 创建Kafka生产者配置信息,例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
其中,bootstrap.servers为Kafka服务器地址,key.serializer和value.serializer为序列化器。
- 创建Kafka生产者实例,例如:
Producer<String, String> producer = new KafkaProducer<>(props);
- 创建Kafka消息对象,例如:
String topic = "test-topic";
String key = "key";
String value = "value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
其中,topic为Kafka队列名称,key和value为消息的键值对。
- 发送消息到Kafka队列,例如:
producer.send(record);
- 关闭Kafka生产者实例,例如:
producer.close();
完整代码如下:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "test-topic";
String key = "key";
String value = "value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
producer.close();
}
}
原文地址: https://www.cveoy.top/t/topic/qOx 著作权归作者所有。请勿转载和采集!