可以使用 Flink 中的 interval join 来解决数据乱序或延时导致进度不一致的问题。具体实现步骤如下:
-
创建订单明细流和支付明细流,并对其进行时间戳和 watermark 的设置。
-
将订单明细流和支付明细流通过订单 id 进行关联,使用 interval join,设置时间窗口为 1 分钟(即订单数据和订单明细数据前后相差最大 1 分钟)。
-
在 interval join 的过程中,需要使用 CoGroupFunction 或 FlatJoinFunction 对关联后的数据进行处理,将其拉成宽表。
-
最后,将处理后的数据流保存至指定的数据源中。
示例代码如下:
//创建订单明细流
DataStream orderDetailStream = env.addSource(new OrderDetailSource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(OrderDetail orderDetail) {
return orderDetail.getOrderTime();
}
});
//创建支付明细流
DataStream paymentInfoStream = env.addSource(new PaymentInfoSource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(PaymentInfo paymentInfo) {
return paymentInfo.getPaymentTime();
}
});
//将订单明细流和支付明细流通过订单id进行关联,使用interval join,设置时间窗口为1分钟
DataStream orderWideStream = orderDetailStream.keyBy(OrderDetail::getOrderId)
.intervalJoin(paymentInfoStream.keyBy(PaymentInfo::getOrderId))
.between(Time.seconds(-60), Time.seconds(0))
.process(new OrderWideJoin())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(OrderWide orderWide) {
return orderWide.getOrderTime();
}
});
//将处理后的数据流保存至指定的数据源中
orderWideStream.addSink(new FlinkKafkaProducer<>(...));