在 Flink 中保存 Checkpoint 到 HDFS,需要在 Flink 配置文件中设置相关参数。以下是一个示例代码:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

public class CheckpointExample {

  public static void main(String[] args) throws Exception {

    // 创建流处理环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 启用检查点
    env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

    // 设置检查点保存到 HDFS
    StateBackend backend = new FsStateBackend('hdfs://<namenode>:<port>/flink/checkpoints');
    env.setStateBackend(backend);

    // ... 其他代码

    env.execute('Checkpoint Example');
  }
}

在上面的示例中,<namenode><port> 分别表示 HDFS 的 NameNode 的地址和端口号。在实际使用中,需要将它们替换成实际的值。

HDFS 的默认端口号是 8020,可以在 HDFS 配置文件 hdfs-site.xml 中查看或修改。

Flink Checkpoint 保存到 HDFS 实践指南:示例代码和端口配置

原文地址: https://www.cveoy.top/t/topic/mkd7 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录