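Using Flink CEP to correctly alert on devices whose temperature exceeds 70 degrees 3 times in a row within one minute
The following code implements the alert with Flink CEP: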
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class TemperatureAlert {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source of Tuple2<String, Double> records: device ID and temperature reading
        DataStream<Tuple2<String, Double>> temperatureStream = env.fromElements(
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 75.0),
                new Tuple2<>("device1", 76.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 78.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 81.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 74.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 83.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 74.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 83.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0)
        );
        // Shared condition: the reading is strictly above 70 degrees
        IterativeCondition<Tuple2<String, Double>> overheated =
                new IterativeCondition<Tuple2<String, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(Tuple2<String, Double> value, Context<Tuple2<String, Double>> ctx) throws Exception {
                        return value.f1 > 70;
                    }
                };
        // Define the CEP pattern. next() enforces strict contiguity, so the three
        // readings must be consecutive; within() bounds the whole match to one minute.
        Pattern<Tuple2<String, Double>, ?> pattern = Pattern.<Tuple2<String, Double>>begin("first")
                .where(overheated)
                .next("second")
                .where(overheated)
                .next("third")
                .where(overheated)
                .within(Time.minutes(1));
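        // Apply the pattern per device: key the stream by device ID (field f0).
        // The lambda key selector replaces the deprecated positional keyBy(0).
        PatternStream<Tuple2<String, Double>> patternStream =
                CEP.pattern(temperatureStream.keyBy(t -> t.f0), pattern);
        // Note: this sample source assigns no timestamps or watermarks. On Flink 1.12 and later,
        // where CEP defaults to event time, calling patternStream.inProcessingTime() may be needed.
        // Extract the matching events. The output type is passed explicitly because the
        // Collector<String> type parameter of a lambda is erased at compile time.
        DataStream<String> alertStream = patternStream.flatSelect(
                (Map<String, List<Tuple2<String, Double>>> patternMap, Collector<String> out) -> {
                    String deviceId = patternMap.get("third").get(0).f0;
                    out.collect("Alert: device " + deviceId + " has exceeded the temperature limit for 3 times in 1 minute!");
                },
                org.apache.flink.api.common.typeinfo.Types.STRING);
        // Print the alerts
        alertStream.print();
        // Run the job
        env.execute("Temperature Alert");
    }
}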
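The code first creates a data source of type Tuple2<String, Double>, where each record holds a device ID and a temperature reading. It then defines a CEP pattern that matches three consecutive readings above 70 degrees within one minute. Finally, it applies the pattern to the stream keyed by device, extracts the matching events, and prints one alert per match.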
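To make the rule itself concrete, here is a minimal plain-Java sketch of the same check without Flink: three consecutive readings above 70 for one device, with less than one minute between the first and the third. The class name `ConsecutiveOverheatDetector`, the method `onReading`, and the millisecond timestamps are assumptions made for illustration; they are not part of the Flink job above, which leaves timing to the CEP operator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Flink API: illustrates the rule the pattern is meant to express.
public class ConsecutiveOverheatDetector {
    static final double THRESHOLD = 70.0;   // readings must be strictly above this value
    static final int REQUIRED = 3;          // number of consecutive hot readings
    static final long WINDOW_MS = 60_000L;  // one minute, in milliseconds

    // Per device: timestamps of the current unbroken run of hot readings (at most REQUIRED kept).
    private final Map<String, Deque<Long>> runs = new HashMap<>();

    // Returns true when this reading completes a run of REQUIRED hot readings inside the window.
    public boolean onReading(String deviceId, long timestampMs, double temperature) {
        Deque<Long> run = runs.computeIfAbsent(deviceId, k -> new ArrayDeque<>());
        if (temperature <= THRESHOLD) {
            run.clear();                    // a cool reading breaks the consecutive run
            return false;
        }
        run.addLast(timestampMs);
        if (run.size() > REQUIRED) {
            run.removeFirst();              // keep only the latest REQUIRED timestamps
        }
        return run.size() == REQUIRED && timestampMs - run.peekFirst() < WINDOW_MS;
    }

    public static void main(String[] args) {
        ConsecutiveOverheatDetector detector = new ConsecutiveOverheatDetector();
        double[] temps = {68.0, 72.0, 75.0, 76.0, 69.0, 71.0};
        for (int i = 0; i < temps.length; i++) {
            long ts = i * 10_000L;          // assumption: one reading every 10 seconds
            if (detector.onReading("device1", ts, temps[i])) {
                System.out.println("Alert: device1 at t=" + ts + " ms");
            }
        }
    }
}
```

In this sketch a reading of exactly 70 resets the run, which mirrors the "exceeds 70" wording of the requirement. Every further hot reading after the third also reports a match, so alerts overlap in the same way consecutive CEP matches can.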
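Running the program prints the following line once for every match. Because the sample data contains long runs of high readings and matches can overlap, the line appears many times rather than once:
Alert: device device1 has exceeded the temperature limit for 3 times in 1 minute!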
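To get a feel for how often the alert can repeat, the following plain-Java sketch counts runs of three consecutive readings above 70 in the same sample values, once with overlapping matches and once without. It is only an approximation under stated assumptions: it ignores the one-minute bound, it does not run Flink, and the class name `SampleMatchCount` and its methods are hypothetical. Overlapping counting is loosely comparable to CEP's default of not skipping after a match, and non-overlapping counting to `AfterMatchSkipStrategy.skipPastLastEvent()`; the number of alerts a real job emits may differ.

```java
// Hypothetical helper, not a Flink API: counts runs of three hot readings in the sample data.
public class SampleMatchCount {
    // The 43 temperature values from the sample source above, in order.
    public static final double[] SAMPLE = {
        68.0, 72.0, 75.0, 76.0, 71.0, 73.0, 78.0, 80.0, 81.0, 82.0, 77.0,
        73.0, 72.0, 71.0, 70.0, 69.0, 68.0, 72.0, 74.0, 80.0, 82.0, 83.0,
        82.0, 77.0, 73.0, 72.0, 71.0, 70.0, 69.0, 68.0, 72.0, 74.0, 80.0,
        82.0, 83.0, 82.0, 77.0, 73.0, 72.0, 71.0, 70.0, 69.0, 68.0
    };

    // Every window of three consecutive hot readings counts, even if windows overlap.
    public static int countOverlapping(double[] temps, double threshold) {
        int run = 0;
        int matches = 0;
        for (double t : temps) {
            run = t > threshold ? run + 1 : 0;
            if (run >= 3) {
                matches++;
            }
        }
        return matches;
    }

    // After a match, counting restarts, so no reading belongs to two matches.
    public static int countNonOverlapping(double[] temps, double threshold) {
        int run = 0;
        int matches = 0;
        for (double t : temps) {
            run = t > threshold ? run + 1 : 0;
            if (run == 3) {
                matches++;
                run = 0;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println("overlapping matches: " + countOverlapping(SAMPLE, 70.0));        // prints 27
        System.out.println("non-overlapping matches: " + countNonOverlapping(SAMPLE, 70.0)); // prints 10
    }
}
```

If one alert per overheating episode is preferred, suppressing repeats after the first match (for example through a skip strategy or downstream deduplication) is the usual direction to explore.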
Original source: https://www.cveoy.top/t/topic/ggnn. Copyright belongs to the author. Do not repost or scrape.