(1). 设置数据迟到标签:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<...>(Time.seconds(3)) {
        @Override
        public long extractTimestamp(...) {
            return ...;
        }
    });

(2). 使用滑动窗口,计算传感器最高的温度:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .keyBy(...)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .maxBy(...)

(3). 使用滑动窗口,计算每次平均成交额打印在制台:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .keyBy(...)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .apply(new WindowFunction<..., ..., ..., TimeWindow>() {
        @Override
        public void apply(...) {
            double sum = 0;
            int count = 0;
            for (...) {
                sum += ...;
                count++;
            }
            double avg = sum / count;
            System.out.println(avg);
        }
    });

(4). 使用滚动窗口,求出这段时间内最大成交量:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .keyBy(...)
    .timeWindow(Time.seconds(15))
    .maxBy(...)

(5). 使用滚动窗口,求出这段时间内的平均成交额:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .keyBy(...)
    .timeWindow(Time.seconds(15))
    .apply(new WindowFunction<..., ..., ..., TimeWindow>() {
        @Override
        public void apply(...) {
            double sum = 0;
            int count = 0;
            for (...) {
                sum += ...;
                count++;
            }
            double avg = sum / count;
            System.out.println(avg);
        }
    });

(6). 使用滚动窗口,求出这段时间内利润的和数据:

DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
    .keyBy(...)
    .timeWindow(Time.seconds(15))
    .apply(new WindowFunction<..., ..., ..., TimeWindow>() {
        @Override
        public void apply(...) {
            double sum = 0;
            for (...) {
                sum += ... - ...;
            }
            System.out.println(sum);
        }
    });
``

原文地址: http://www.cveoy.top/t/topic/ftmM 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录