flink 110版本streamExecutionEnv设置并行度为1java实现
下面是一个使用 Flink 1.10 版本的 Java 代码示例,设置并行度为 1 的 streamExecutionEnv:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class ParallelismExample {
public static void main(String[] args) throws Exception {
// setup execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// set parallelism to 1
env.setParallelism(1);
streamEnv.setParallelism(1);
// read input file path from command-line arguments
final ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input");
// read data from input file
final DataStream<String> stream = streamEnv.readTextFile(input);
// perform flat map operation with parallelism 1
stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split("\\s+");
for (String word : words) {
out.collect(word);
}
}
}).setParallelism(1).print();
// execute program
env.execute("Parallelism Example");
}
}
在上面的代码中,我们首先创建了一个 ExecutionEnvironment 和一个 StreamExecutionEnvironment。然后,我们使用 setParallelism() 方法来将它们的并行度设置为 1。
接下来,我们从命令行参数中读取输入文件路径,并使用 readTextFile() 方法从文件中读取数据创建一个 DataStream。然后,我们使用 flatMap() 方法对数据进行处理,并将其并行度设置为 1。
最后,我们使用 execute() 方法来执行 Flink 程序。在程序执行期间,我们可以查看输出结果,以确保程序按照预期方式工作
原文地址: https://www.cveoy.top/t/topic/eHdl 著作权归作者所有。请勿转载和采集!