Flink 需求开发:1 设置数据迟到标签允许迟到时间为3s。2 使用滑动窗口10s步长5s滑动一次计算传感器最高的温度。3 使用滑动窗口10s步长5s滑动一次计算每次平均成交额打印在制台。4 使用滚动窗口滚动时间为15s求出这段时间内最大成交量。5 使用滚动窗口滚动时间为15s求出这段时间内的平均成交额。6 使用滚动窗口滚动时间为15s求出这段时间内利润的和数据。
(1). 设置数据迟到标签:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<...>(Time.seconds(3)) {
@Override
public long extractTimestamp(...) {
return ...;
}
});
(2). 使用滑动窗口,计算传感器最高的温度:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.keyBy(...)
.timeWindow(Time.seconds(10), Time.seconds(5))
.maxBy(...)
(3). 使用滑动窗口,计算每次平均成交额打印在制台:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.keyBy(...)
.timeWindow(Time.seconds(10), Time.seconds(5))
.apply(new WindowFunction<..., ..., ..., TimeWindow>() {
@Override
public void apply(...) {
double sum = 0;
int count = 0;
for (...) {
sum += ...;
count++;
}
double avg = sum / count;
System.out.println(avg);
}
});
(4). 使用滚动窗口,求出这段时间内最大成交量:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.keyBy(...)
.timeWindow(Time.seconds(15))
.maxBy(...)
(5). 使用滚动窗口,求出这段时间内的平均成交额:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.keyBy(...)
.timeWindow(Time.seconds(15))
.apply(new WindowFunction<..., ..., ..., TimeWindow>() {
@Override
public void apply(...) {
double sum = 0;
int count = 0;
for (...) {
sum += ...;
count++;
}
double avg = sum / count;
System.out.println(avg);
}
});
(6). 使用滚动窗口,求出这段时间内利润的和数据:
DataStream<...> input = ...;
SingleOutputStreamOperator<...> result = input
.keyBy(...)
.timeWindow(Time.seconds(15))
.apply(new WindowFunction<..., ..., ..., TimeWindow>() {
@Override
public void apply(...) {
double sum = 0;
for (...) {
sum += ... - ...;
}
System.out.println(sum);
}
});
``
原文地址: http://www.cveoy.top/t/topic/ftmM 著作权归作者所有。请勿转载和采集!