Flink Checkpoint 保存到 HDFS:配置详解及示例
在 Flink 中,将 Checkpoint 保存到 HDFS 可以通过以下配置实现:
- 配置 HDFS 文件系统地址和端口号
在 flink-conf.yaml 文件中配置 HDFS 文件系统的地址和端口号:
fs.defaultFS: hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>
- 启用 Checkpoint 并设置属性
在 Flink 作业中启用 Checkpoint 并设置相关属性,例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint,设置 Checkpoint 间隔时间为 30 秒
env.enableCheckpointing(30000);
// 设置 Checkpoint 存储的目录
env.getCheckpointConfig().setCheckpointStorage('hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>/flink/checkpoint');
// 设置 Checkpoint 的模式为 exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 Checkpoint 的超时时间为 10 分钟
env.getCheckpointConfig().setCheckpointTimeout(600000);
// 设置同时进行的最大 Checkpoint 数量为 3 个
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
// 设置如果 Checkpoint 失败,作业是否应该停止
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
上述代码中,env.enableCheckpointing(30000) 表示每 30 秒进行一次 Checkpoint,env.getCheckpointConfig().setCheckpointStorage('hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>/flink/checkpoint') 表示将 Checkpoint 存储在 HDFS 中的 /flink/checkpoint 目录下。
查询 HDFS 端口号
HDFS 的端口号一般为 9000,您可以在 Hadoop 配置文件 core-site.xml 中查看。
原文地址: https://www.cveoy.top/t/topic/mkdS 著作权归作者所有。请勿转载和采集!