Flink Checkpoint 保存到 HDFS:完整指南和示例代码
在 Flink 中将 checkpoint 保存到 HDFS,需要在 Flink 作业配置文件和代码中进行配置。
配置步骤
- 在作业配置文件中添加如下配置:
# 设置 checkpoint 保存到 HDFS 中
state.backend: filesystem
state.checkpoints.dir: hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>
其中,<hdfs-namenode> 是 HDFS 的 NameNode 的地址,<port> 是 HDFS 的端口号,<path-to-checkpoint-dir> 是 HDFS 上存储 checkpoint 的目录路径。
- 在代码中启用 checkpoint:
// 开启 checkpoint,并设置 checkpoint 的时间间隔为 10 秒
env.enableCheckpointing(10000);
// 设置 checkpoint 的保存模式为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 checkpoint 的超时时间为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置同时进行的 checkpoint 的最大数量为 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置 checkpoint 的保存路径
env.setStateBackend(new FsStateBackend("hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>"));
在代码中,我们首先启用了 checkpoint,并设置了 checkpoint 的时间间隔为 10 秒,保存模式为 EXACTLY_ONCE,超时时间为 1 分钟,同时进行的 checkpoint 的最大数量为 1。然后,我们设置了 checkpoint 的保存路径为 HDFS 上的路径。
完整示例代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
// 解析命令行参数
final ParameterTool params = ParameterTool.fromArgs(args);
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 checkpoint 保存到 HDFS 中
env.setStateBackend(new FsStateBackend("hdfs://<hdfs-namenode>:<port>/<path-to-checkpoint-dir>"));
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置 Kafka 参数
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", params.get("bootstrap.servers", "localhost:9092"));
kafkaProps.setProperty("group.id", params.get("group.id", "test-group"));
// 创建 Kafka 消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
params.get("input-topic", "test-topic"),
new SimpleStringSchema(),
kafkaProps
);
// 添加 Kafka 消费者到执行环境中
DataStream<String> stream = env.addSource(consumer);
// 对每条消息进行转换,并输出到控制台
stream.map((MapFunction<String, String>) value -> "Received: " + value)
.addSink(new PrintSinkFunction<>());
// 执行作业
env.execute("Kafka Consumer Example");
}
}
最佳实践:
- 选择合适的 checkpoint 时间间隔,以平衡性能和数据一致性。
- 确保 HDFS 上的 checkpoint 目录有足够的存储空间。
- 使用 EXACTLY_ONCE 模式来保证数据的一致性。
- 监控 checkpoint 的执行情况,及时解决可能出现的问题。
通过以上配置和代码示例,您可以轻松地将 Flink 的 checkpoint 保存到 HDFS,并实现数据的一致性。
原文地址: https://www.cveoy.top/t/topic/mkdy 著作权归作者所有。请勿转载和采集!