java通过kafka读取数据源文件输出
以下是Java代码示例,演示如何使用Kafka读取数据源文件并输出:
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaFileReader {
public static void main(String[] args) throws Exception {
// 设置Kafka生产者的属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 读取数据源文件
BufferedReader reader = new BufferedReader(new FileReader("data.txt"));
String line;
while ((line = reader.readLine()) != null) {
// 发送数据到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", line);
producer.send(record);
// 输出发送的数据
System.out.println("Sent: " + line);
}
reader.close();
// 关闭Kafka生产者
producer.close();
}
}
在上面的示例中,我们使用了Kafka生产者来发送数据到Kafka集群中的主题。我们使用了一个BufferedReader来读取数据源文件,并使用ProducerRecord将每一行数据发送到Kafka主题中。在发送数据后,我们还输出了发送的数据。最后,我们关闭了Kafka生产者
原文地址: https://www.cveoy.top/t/topic/f5pD 著作权归作者所有。请勿转载和采集!