Flink reading a local file:

The example below uses the batch DataSet API to count words in a local text file; in the DataSet API, counts.print() triggers job execution, so no separate env.execute() call is needed.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LocalFileReader {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the file line by line; the path below is a placeholder.
        DataSet<String> text = env.readTextFile("file:///path/to/file");
        // Split each line into (word, 1) pairs, group by the word (field 0),
        // and sum the counts (field 1).
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);
        // In the DataSet API, print() triggers execution of the job.
        counts.print();
    }

    /** Splits each line into lowercase words and emits a (word, 1) pair per word. */
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
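
In newer Flink releases the DataSet API is deprecated in favor of the DataStream API running in batch execution mode (available since Flink 1.12). Below is a minimal sketch of the same word count under that assumption, reusing the Tokenizer defined above; the class name LocalFileReaderStream and the file path are placeholders:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalFileReaderStream {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The input is bounded, so run the pipeline in batch execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<String> text = env.readTextFile("file:///path/to/file");
        text.flatMap(new LocalFileReader.Tokenizer()) // reuse the Tokenizer from above (same package assumed)
            .keyBy(t -> t.f0)                         // key by the word
            .sum(1)                                   // sum the per-word counts
            .print();
        // Unlike the DataSet API, DataStream jobs need an explicit execute() call.
        env.execute("Local File Reader (DataStream)");
    }
}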

Flink reading data from Kafka:

The example below consumes a Kafka topic as a stream of strings using the FlinkKafkaConsumer connector, which requires the flink-connector-kafka dependency on the classpath.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaReader {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka client configuration; adjust the broker address and group id as needed.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // FlinkKafkaConsumer takes a topic, a DeserializationSchema, and the Kafka properties.
        // SimpleStringSchema deserializes each record's value as a UTF-8 string.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "my-topic", new SimpleStringSchema(), properties);
        env.addSource(consumer).print();
        // Streaming jobs run only once env.execute() is called.
        env.execute("Kafka Reader");
    }
}
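
Since Flink 1.14, FlinkKafkaConsumer is deprecated in favor of the KafkaSource builder API. Below is a minimal sketch of the equivalent job under that assumption, using the same broker, topic, and group id as above; the class name KafkaSourceReader is a placeholder:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceReader {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setGroupId("test")
                .setStartingOffsets(OffsetsInitializer.earliest()) // read from the beginning of the topic
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // KafkaSource does no watermarking on its own; a strategy must be supplied explicitly.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("Kafka Source Reader");
    }
}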
In summary, Flink uses two separate classes here: one for reading a local file and one for reading Kafka data.
