在 Flink 中,可以通过以下方式设置水位线:

  1. 使用 assignTimestampsAndWatermarks 方法设置水位线生成器和事件时间提取器。例如:
DataStream<MyEvent> stream = ...;

stream
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(MyEvent element) {
      return element.getTimestamp();
    }
  });
  1. 使用 WatermarkGenerator 接口自定义水位线生成器。例如:
public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {
  private long maxTimestamp;

  @Override
  public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
    maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    output.emitWatermark(new Watermark(maxTimestamp - 1000));
  }

  @Override
  public void onPeriodicEmit(WatermarkOutput output) {
    output.emitWatermark(new Watermark(maxTimestamp - 1000));
  }
}

DataStream<MyEvent> stream = ...;

stream
  .assignTimestampsAndWatermarks(WatermarkStrategy.<MyEvent>forGenerator(new MyWatermarkGenerator()));

注意,在使用自定义水位线生成器时,需要在 onEvent 方法中更新最大事件时间戳,并在 onPeriodicEmit 方法中发送水位线

flink中设置水位线

原文地址: http://www.cveoy.top/t/topic/hnRp 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录