下面是使用 flinkcep 实现预警的代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

/**
 * Flink CEP demo job that raises an alert when a device reports three consecutive
 * temperature readings at or above {@link #TEMPERATURE_THRESHOLD} within one minute.
 *
 * <p>The input is a fixed in-memory list of {@code (deviceId, temperature)} tuples. The
 * stream is keyed by device id so that the pattern is evaluated per device.
 */
public class TemperatureAlert {

    /** Inclusive lower bound a temperature reading must reach to count as "hot". */
    private static final double TEMPERATURE_THRESHOLD = 70.0;

    /**
     * Builds the sample stream, applies the CEP pattern and prints one alert per match.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample source: Tuple2<String, Double> holding the device id and its temperature.
        DataStream<Tuple2<String, Double>> temperatureStream = env.fromElements(
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 75.0),
                new Tuple2<>("device1", 76.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 78.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 81.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 74.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 83.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 74.0),
                new Tuple2<>("device1", 80.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 83.0),
                new Tuple2<>("device1", 82.0),
                new Tuple2<>("device1", 77.0),
                new Tuple2<>("device1", 73.0),
                new Tuple2<>("device1", 72.0),
                new Tuple2<>("device1", 71.0),
                new Tuple2<>("device1", 70.0),
                new Tuple2<>("device1", 69.0),
                new Tuple2<>("device1", 68.0)
        );

        // Define the CEP pattern: three hot readings in a row within one minute.
        // next() enforces strict contiguity, i.e. no other event of the same key may sit
        // between two stages. followedBy() would silently skip cooler readings in between,
        // which does not match the "consecutive" requirement.
        Pattern<Tuple2<String, Double>, ?> pattern = Pattern.<Tuple2<String, Double>>begin("first")
                .where(new HotReadingCondition())
                .next("second")
                .where(new HotReadingCondition())
                .next("third")
                .where(new HotReadingCondition())
                .within(Time.minutes(1));

        // Apply the pattern per device. A key-selector lambda replaces the deprecated
        // positional keyBy(0).
        // NOTE(review): the sample elements carry no event timestamps or watermarks. On Flink
        // versions where CEP defaults to event time, such records may be treated as late and
        // dropped; if no alerts appear, switch the pattern stream to processing time (where the
        // API offers it) or assign timestamps and watermarks to the source.
        PatternStream<Tuple2<String, Double>> patternStream =
                CEP.pattern(temperatureStream.keyBy(reading -> reading.f0), pattern);

        // Turn each match into exactly one alert message. select() with a String-returning
        // lambda is used instead of flatSelect(): the Collector<String> parameter of a
        // flatSelect lambda loses its type argument to erasure, so Flink cannot infer the
        // output type.
        DataStream<String> alertStream = patternStream.select(
                (Map<String, List<Tuple2<String, Double>>> match) -> {
                    String deviceId = match.get("third").get(0).f0;
                    return "Alert: device " + deviceId + " has exceeded the temperature limit for 3 times in 1 minute!";
                });

        // Print the alerts.
        alertStream.print();

        // Run the job.
        env.execute("Temperature Alert");
    }

    /**
     * Accepts readings whose temperature is at or above {@link #TEMPERATURE_THRESHOLD}.
     *
     * <p>Shared by all three pattern stages so the threshold logic lives in one place.
     */
    private static final class HotReadingCondition extends IterativeCondition<Tuple2<String, Double>> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Tuple2<String, Double> value, Context<Tuple2<String, Double>> ctx) throws Exception {
            // Guard against a missing temperature so unboxing cannot throw a NullPointerException.
            return value.f1 != null && value.f1 >= TEMPERATURE_THRESHOLD;
        }
    }
}

代码中,我们先创建了一个 Tuple2<String, Double> 类型的数据源,两个字段分别表示设备 ID 和温度值。然后,定义了一个 CEP 模式,用于匹配同一设备(按设备 ID 分组)在一分钟内先后出现 3 次温度达到或超过 70 度(代码中的条件为 `>= 70`)的事件。最后,将该模式应用到数据流上,提取符合模式的事件,并打印预警信息。

运行程序后,每成功匹配一次模式就会输出一行如下的预警信息(示例数据中存在多组满足条件的事件,因此同样的内容可能会输出多次):

```
Alert: device device1 has exceeded the temperature limit for 3 times in 1 minute!
```
使用 flinkcep 正确预警在一分钟内连续 3 次温度超过 70 度的设备

原文地址: https://www.cveoy.top/t/topic/ggnn 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录