在 Flink 中将 checkpoint 保存到 HDFS,需要在 Flink 作业配置文件和代码中进行配置。

配置步骤

  1. 在作业配置文件中添加如下配置:
# 设置 checkpoint 保存到 HDFS 中
state.backend: filesystem
state.checkpoints.dir: hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>

其中,<hdfs-namenode> 是 HDFS 的 NameNode 的地址,<port> 是 HDFS 的端口号,<path-to-checkpoint-dir> 是 HDFS 上存储 checkpoint 的目录路径。

  1. 在代码中启用 checkpoint:
// 开启 checkpoint,并设置 checkpoint 的时间间隔为 10 秒
env.enableCheckpointing(10000);

// 设置 checkpoint 的保存模式为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置 checkpoint 的超时时间为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 设置同时进行的 checkpoint 的最大数量为 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 设置 checkpoint 的保存路径
env.setStateBackend(new FsStateBackend("hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>"));

在代码中,我们首先启用了 checkpoint,并设置了 checkpoint 的时间间隔为 10 秒,保存模式为 EXACTLY_ONCE,超时时间为 1 分钟,同时进行的 checkpoint 的最大数量为 1。然后,我们设置了 checkpoint 的保存路径为 HDFS 上的路径。

完整示例代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) throws Exception {

        // 解析命令行参数
        final ParameterTool params = ParameterTool.fromArgs(args);

        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置 checkpoint 保存到 HDFS 中
        env.setStateBackend(new FsStateBackend("hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>"));
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 设置 Kafka 参数
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", params.get("bootstrap.servers", "localhost:9092"));
        kafkaProps.setProperty("group.id", params.get("group.id", "test-group"));

        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( 
                params.get("input-topic", "test-topic"),
                new SimpleStringSchema(),
                kafkaProps
        );

        // 添加 Kafka 消费者到执行环境中
        DataStream<String> stream = env.addSource(consumer);

        // 对每条消息进行转换,并输出到控制台
        stream.map((MapFunction<String, String>) value -> "Received: " + value)
                .addSink(new PrintSinkFunction<>());

        // 执行作业
        env.execute("Kafka Consumer Example");
    }
}

最佳实践:

  • 选择合适的 checkpoint 时间间隔,以平衡性能和数据一致性。
  • 确保 HDFS 上的 checkpoint 目录有足够的存储空间。
  • 使用 EXACTLY_ONCE 模式来保证数据的一致性。
  • 监控 checkpoint 的执行情况,及时解决可能出现的问题。

通过以上配置和代码示例,您可以轻松地将 Flink 的 checkpoint 保存到 HDFS,并实现数据的一致性。

Flink Checkpoint 保存到 HDFS:完整指南和示例代码

原文地址: https://www.cveoy.top/t/topic/mkdy 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录