flinkcdc读取postgre数据库的分区表时我希望增量消费最新的数据不读取历史数据因此将debeziumsnapshotmode配置为never但是这样却没有读取到数据请问该如何解决
可能是由于FlinkCDC默认只会消费最新的数据,而不会读取历史数据。如果您使用的是PostgreSQL 10或以上版本,并且表是按时间分区的,您可以尝试使用FlinkCDC提供的时间戳提取器来按时间分区读取数据。
您可以通过在FlinkCDC的配置文件中设置以下参数来启用时间戳提取器:
flinkcdc.postgres.time.extractor.mode = auto
flinkcdc.postgres.time.extractor.timestamp.type = wallclock
flinkcdc.postgres.time.extractor.timestamp.field = <your-timestamp-field>
其中,flinkcdc.postgres.time.extractor.mode设置为auto表示开启时间戳提取器,flinkcdc.postgres.time.extractor.timestamp.type设置为wallclock表示使用系统时间作为时间戳,flinkcdc.postgres.time.extractor.timestamp.field设置为您的时间戳字段的名称。
请注意,您需要使用FlinkCDC的bin/flink-cdc命令启动FlinkCDC,才能使用时间戳提取器。如果您使用的是Flink集群,请确保安装了FlinkCDC,并使用flink-cdc run命令启动FlinkCDC。
此外,如果您的表是按时间分区的,您还需要使用FlinkCDC提供的--partitions参数指定分区范围,如下所示:
flink-cdc run --source postgres --source-properties-file postgres.properties --sink kafka --sink-properties-file kafka.properties --partitions "start_partition:end_partition"
其中,start_partition和end_partition分别表示您要读取的分区范围的起始分区和结束分区。请注意,如果您使用的是时间戳提取器,您需要使用时间戳作为分区的名称
原文地址: https://www.cveoy.top/t/topic/fIrj 著作权归作者所有。请勿转载和采集!