在 Flink 中,将 Checkpoint 保存到 HDFS 可以通过以下配置实现:

  1. 配置 HDFS 文件系统地址和端口号

flink-conf.yaml 文件中配置 HDFS 文件系统的地址和端口号:

fs.defaultFS: hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>
  1. 启用 Checkpoint 并设置属性

在 Flink 作业中启用 Checkpoint 并设置相关属性,例如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用 Checkpoint,设置 Checkpoint 间隔时间为 30 秒
env.enableCheckpointing(30000);

// 设置 Checkpoint 存储的目录
env.getCheckpointConfig().setCheckpointStorage('hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>/flink/checkpoint');

// 设置 Checkpoint 的模式为 exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置 Checkpoint 的超时时间为 10 分钟
env.getCheckpointConfig().setCheckpointTimeout(600000);

// 设置同时进行的最大 Checkpoint 数量为 3 个
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);

// 设置如果 Checkpoint 失败,作业是否应该停止
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

上述代码中,env.enableCheckpointing(30000) 表示每 30 秒进行一次 Checkpoint,env.getCheckpointConfig().setCheckpointStorage('hdfs://<hdfs-namenode-hostname>:<hdfs-namenode-port>/flink/checkpoint') 表示将 Checkpoint 存储在 HDFS 中的 /flink/checkpoint 目录下。

查询 HDFS 端口号

HDFS 的端口号一般为 9000,您可以在 Hadoop 配置文件 core-site.xml 中查看。

Flink Checkpoint 保存到 HDFS:配置详解及示例

原文地址: https://www.cveoy.top/t/topic/mkdS 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录