可以使用 Flink 中的 interval join 来解决数据乱序或延时导致进度不一致的问题。具体实现步骤如下:

  1. 创建订单明细流和支付明细流,并对其进行时间戳和 watermark 的设置。

  2. 将订单明细流和支付明细流通过订单 id 进行关联,使用 interval join,设置时间窗口为 1 分钟(即订单数据和订单明细数据前后相差最大 1 分钟)。

  3. 在 interval join 的过程中,需要使用 CoGroupFunction 或 FlatJoinFunction 对关联后的数据进行处理,将其拉成宽表。

  4. 最后,将处理后的数据流保存至指定的数据源中。

示例代码如下:

//创建订单明细流 DataStream orderDetailStream = env.addSource(new OrderDetailSource()) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(OrderDetail orderDetail) { return orderDetail.getOrderTime(); } });

//创建支付明细流 DataStream paymentInfoStream = env.addSource(new PaymentInfoSource()) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(PaymentInfo paymentInfo) { return paymentInfo.getPaymentTime(); } });

//将订单明细流和支付明细流通过订单id进行关联,使用interval join,设置时间窗口为1分钟 DataStream orderWideStream = orderDetailStream.keyBy(OrderDetail::getOrderId) .intervalJoin(paymentInfoStream.keyBy(PaymentInfo::getOrderId)) .between(Time.seconds(-60), Time.seconds(0)) .process(new OrderWideJoin()) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(OrderWide orderWide) { return orderWide.getOrderTime(); } });

//将处理后的数据流保存至指定的数据源中 orderWideStream.addSink(new FlinkKafkaProducer<>(...));

Flink 流处理:使用 Interval Join 关联订单明细和支付明细数据

原文地址: https://www.cveoy.top/t/topic/op4P 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录