How to convert a DataSource in ExecutionEnvironment to a DataStream in StreamExecutionEnvironment in Apache Flink
To turn a DataSource created in an ExecutionEnvironment into a DataStream in a StreamExecutionEnvironment in Apache Flink, you can follow these steps:
- Create an ExecutionEnvironment object and load the data from a file or a collection. For example:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> input = env.readTextFile("path/to/file");
- Create a StreamExecutionEnvironment object.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
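- Convert the DataSet to a DataStream. The DataSet API has no toDataStream() method, so one workaround is to materialize the DataSet with collect() and pass the resulting list to fromCollection(). Note that collect() runs the batch job and pulls every record to the client, so this only suits data that fits in memory; for larger inputs, read the source directly with streamEnv.readTextFile("path/to/file") instead.
List<String> lines = input.collect();
DataStream<String> stream = streamEnv.fromCollection(lines);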
- You can now apply transformations to the DataStream, such as mapping, filtering, and aggregating.
DataStream<Integer> numbers = stream.flatMap(new FlatMapFunction<String, Integer>() {
    @Override
    public void flatMap(String value, Collector<Integer> out) throws Exception {
        String[] tokens = value.split(",");
        for (String token : tokens) {
            out.collect(Integer.parseInt(token));
        }
    }
});
DataStream<Integer> filteredNumbers = numbers.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
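// Integer is not a tuple type, so the positional keyBy(0).sum(1) does not apply here.
// Route every element to one constant key and keep a running sum instead.
DataStream<Integer> sum = filteredNumbers.keyBy(value -> 0).reduce(Integer::sum);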
- Finally, print the result and run the streaming job by calling the execute() method.
sum.print();
streamEnv.execute("My Streaming Job");
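To get a feel for what the pipeline above prints, here is a minimal plain-Java sketch (no Flink dependency) that mimics the flatMap, filter, and running-sum steps on a small in-memory input. The class name RunningSumSketch, the method runningSums, and the sample lines are hypothetical and only for illustration. It assumes the default streaming execution mode, where a keyed aggregation emits an updated total for each incoming element rather than a single final value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Flink job: same logic, plain collections.
public class RunningSumSketch {

    // Mimics flatMap + filter + a running sum under a single key.
    static List<Integer> runningSums(List<String> lines) {
        List<Integer> emitted = new ArrayList<>();
        int total = 0;
        for (String line : lines) {
            // flatMap step: one text line becomes several integers
            for (String token : line.split(",")) {
                int value = Integer.parseInt(token.trim());
                // filter step: keep only positive numbers
                if (value > 0) {
                    // reduce step: update the running total and emit it
                    total += value;
                    emitted.add(total);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1,-2,3", "4");
        System.out.println(runningSums(lines)); // prints [1, 4, 8]
    }
}
```

If only the final total is wanted, the last emitted value is the one to keep; in a real Flink job this would likely mean running in batch execution mode or adding a window, which goes beyond the steps shown here.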