Flink 从本地文件读取数据并发送到 Kafka
在 Flink 中使用 Kafka 生产者读取本地文件中的数据,需要进行以下步骤:
- 导入必要的依赖:在 pom.xml 文件中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
- 创建 Kafka 生产者:使用 Flink 提供的 KafkaProducer 类创建一个 Kafka 生产者,设置 Kafka 的配置信息,如 Kafka 集群地址、消息序列化方式等。
Properties props = new Properties();
props.setProperty('bootstrap.servers', 'localhost:9092');
// 设置消息序列化方式,这里使用 StringSerializer
props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
// 创建 KafkaProducer
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<'topic-name', new SimpleStringSchema(), props>;
- 读取本地文件数据并发送到 Kafka:使用 Flink 提供的 DataStream API 读取本地文件数据,将数据发送到 Kafka。
// 读取本地文件数据
DataStream<String> lines = env.readTextFile('path/to/local/file');
// 将数据发送到 Kafka
lines.addSink(producer);
完整代码如下:
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class KafkaProducerFromFile {
public static void main(String[] args) throws Exception {
// 从命令行参数中获取 Kafka 集群地址和 topic 名称
final ParameterTool params = ParameterTool.fromArgs(args);
String kafkaBrokers = params.get('kafkaBrokers', 'localhost:9092');
String topic = params.get('topic', 'test');
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 KafkaProducer 的配置信息
Properties props = new Properties();
props.setProperty('bootstrap.servers', kafkaBrokers);
props.setProperty('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
props.setProperty('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
// 创建 KafkaProducer
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
// 读取本地文件数据并发送到 Kafka
DataStream<String> lines = env.readTextFile('path/to/local/file');
lines.addSink(producer);
// 执行任务
env.execute('KafkaProducerFromFile');
}
}
原文地址: https://www.cveoy.top/t/topic/onG6 著作权归作者所有。请勿转载和采集!