flink中设置水位线
在 Flink 中,可以通过以下方式设置水位线:
- 使用
assignTimestampsAndWatermarks方法设置水位线生成器和事件时间提取器。例如:
DataStream<MyEvent> stream = ...;
stream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getTimestamp();
}
});
- 使用
WatermarkGenerator接口自定义水位线生成器。例如:
public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private long maxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
output.emitWatermark(new Watermark(maxTimestamp - 1000));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - 1000));
}
}
DataStream<MyEvent> stream = ...;
stream
.assignTimestampsAndWatermarks(WatermarkStrategy.<MyEvent>forGenerator(new MyWatermarkGenerator()));
注意,在使用自定义水位线生成器时,需要在 onEvent 方法中更新最大事件时间戳,并在 onPeriodicEmit 方法中发送水位线
原文地址: http://www.cveoy.top/t/topic/hnRp 著作权归作者所有。请勿转载和采集!