Create two Flink classes: one that reads a local file and one that reads data from Kafka.

Reading a local file with Flink:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LocalFileReader {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the file line by line as a DataSet of strings
        DataSet<String> text = env.readTextFile("file:///path/to/file");

        // Classic word count: tokenize each line, group by word, sum the counts
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);

        counts.print();
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Lowercase the line and split on non-word characters
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
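The tokenize-and-count logic inside `Tokenizer` can be exercised on its own, outside the Flink runtime. Below is a minimal plain-Java sketch of the same rule (lowercase, split on `\W+`, skip empty tokens); the class and method names here are illustrative, not part of the Flink API.

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountDemo {
    // Same splitting rule as the Flink Tokenizer above, but accumulating
    // counts into a Map instead of emitting (word, 1) tuples.
    public static Map<String, Integer> countWords(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints the per-word counts for a sample line
        System.out.println(countWords("To be, or not to be"));
    }
}
```

In the Flink job the grouping and summing are done by `groupBy(0).sum(1)` across the cluster; this sketch just shows what the tokenization step produces for a single line.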
Reading data from Kafka with Flink:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaReader {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // FlinkKafkaConsumer takes a topic, a DeserializationSchema, and the
        // Kafka client properties; it is itself the source function, so there
        // is no need to implement SourceFunction by hand. SimpleStringSchema
        // deserializes each record's value as a UTF-8 string.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);

        env.addSource(consumer).print();
        env.execute("Kafka Reader");
    }
}
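The Kafka connector is not part of the core Flink distribution, so the project needs the connector on its classpath. A sketch of the Maven dependency is below; the exact artifact name and version depend on your Flink and Scala versions (the coordinates shown here assume Flink 1.13 with Scala 2.12 and are an assumption, not taken from the original post).

```xml
<!-- Kafka connector providing FlinkKafkaConsumer; adjust the suffix
     and version to match your Flink/Scala setup -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
```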
Original source: https://www.cveoy.top/t/topic/ggxH. Copyright belongs to the author.