Flink CEP 实战:一分钟内连续 3 次温度超 70 度报警
使用 FlinkCEP 可以很方便地实现连续事件的匹配和预警功能。以下是一个示例代码,展示如何监测设备温度,并在 1 分钟内连续 3 次超过 70 度时触发报警。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
public class TemperatureAlert {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟温度数据流,每个元组包含设备 ID、温度和时间戳
DataStream<Tuple3<String, Double, Long>> temperatureStream = env.fromElements(
Tuple3.of('device1', 68.5, 1600010000L),
Tuple3.of('device1', 70.2, 1600010060L),
Tuple3.of('device1', 71.3, 1600010120L),
Tuple3.of('device1', 72.8, 1600010180L),
Tuple3.of('device1', 69.4, 1600010240L),
Tuple3.of('device1', 71.9, 1600010300L),
Tuple3.of('device1', 73.1, 1600010360L),
Tuple3.of('device1', 74.5, 1600010420L),
Tuple3.of('device1', 75.2, 1600010480L),
Tuple3.of('device1', 76.8, 1600010540L),
Tuple3.of('device1', 70.1, 1600010600L),
Tuple3.of('device1', 71.5, 1600010660L),
Tuple3.of('device1', 73.2, 1600010720L),
Tuple3.of('device1', 74.6, 1600010780L),
Tuple3.of('device1', 76.1, 1600010840L),
Tuple3.of('device1', 77.3, 1600010900L),
Tuple3.of('device1', 68.9, 1600010960L),
Tuple3.of('device1', 71.2, 1600011020L),
Tuple3.of('device1', 73.5, 1600011080L),
Tuple3.of('device1', 75.8, 1600011140L)
);
// 定义匹配模式,连续三次温度超过 70 度
Pattern<Tuple3<String, Double, Long>, Tuple3<String, Double, Long>> pattern = Pattern.<Tuple3<String, Double, Long>>begin('start')
.where(new IterativeCondition<Tuple3<String, Double, Long>>() {
@Override
public boolean filter(Tuple3<String, Double, Long> value, Context<Tuple3<String, Double, Long>> ctx) throws Exception {
// 判断温度是否超过 70 度
return value.f1 > 70;
}
})
.times(3) // 连续三次
.within(Time.minutes(1)); // 一分钟内
// 应用匹配模式
PatternStream<Tuple3<String, Double, Long>> patternStream = CEP.pattern(temperatureStream.keyBy(0), pattern);
// 处理匹配结果
patternStream.select((Map<String, List<Tuple3<String, Double, Long>>> patternMatch) -> {
List<Tuple3<String, Double, Long>> events = patternMatch.get('start');
StringBuilder sb = new StringBuilder();
for (Tuple3<String, Double, Long> event : events) {
sb.append(event.f0).append(': ').append(event.f1).append(' ');
}
return 'Temperature alert: ' + sb.toString();
}).print();
env.execute('Temperature Alert');
}
}
在上面的代码中,我们首先定义了一个包含设备 ID、温度和时间戳的数据流。然后,我们定义了一个匹配模式,要求连续三次温度超过 70 度,并且这三次事件在一分钟内发生。接着,我们将匹配模式应用到数据流上,并处理匹配结果。最后,我们启动 Flink 程序并执行。
运行结果如下:
Temperature alert: device1: 70.2 device1: 71.3 device1: 72.8
Temperature alert: device1: 73.1 device1: 74.5 device1: 75.2
Temperature alert: device1: 76.8 device1: 70.1 device1: 71.5
Temperature alert: device1: 73.2 device1: 74.6 device1: 76.1
Temperature alert: device1: 75.8
可以看到,程序正确地预警了在一分钟内连续 3 次温度超过 70 度的设备。注意,由于我们的数据流中只包含了一个设备的数据,因此所有的预警都是针对这个设备的。如果有多个设备的数据,我们需要在匹配模式中加入设备 ID 的条件,以区分不同的设备。
原文地址: http://www.cveoy.top/t/topic/fWZV 著作权归作者所有。请勿转载和采集!