Using Flink, filter dirty data (non-JSON strings) out of a log stream, extract the "ts" field as the event timestamp, and generate watermarks with a maximum out-of-orderness of 5 seconds.

Suppose each log record is a JSON string containing a field "ts" that holds the timestamp. We can parse each record as JSON and filter out invalid data. A code example follows:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.Properties;
public class LogFilter {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", params.get("bootstrap.servers", "localhost:9092"));
        kafkaProps.setProperty("group.id", params.get("group.id", "flink-log-filter"));

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                params.get("input.topic", "log-topic"),
                new SimpleStringSchema(),
                kafkaProps);

        // Drop records that are not valid JSON or that lack a "ts" field
        DataStream<String> logStream = env
                .addSource(kafkaConsumer)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        try {
                            // Try to parse the record as a JSON object
                            JSONObject json = new JSONObject(value);
                            // Keep it only if it contains a "ts" field
                            return json.has("ts");
                        } catch (JSONException e) {
                            // Parsing failed: the record is not a valid JSON string
                            return false;
                        }
                    }
                });

        // Extract the timestamp and generate watermarks
        DataStream<String> timestampedStream = logStream
                .map(new MapFunction<String, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> map(String value) throws Exception {
                        JSONObject json = new JSONObject(value);
                        long ts = json.getLong("ts");
                        return Tuple2.of(ts, value);
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {
                            @Override
                            public long extractTimestamp(Tuple2<Long, String> element) {
                                return element.f0;
                            }
                        })
                .map(new MapFunction<Tuple2<Long, String>, String>() {
                    @Override
                    public String map(Tuple2<Long, String> value) throws Exception {
                        return value.f1;
                    }
                });

        // Write the result back to Kafka
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                params.get("output.topic", "filtered-log-topic"),
                new SimpleStringSchema(),
                kafkaProps);
        timestampedStream.addSink(kafkaProducer);

        env.execute("LogFilter");
    }
}
In the code, we use org.json's JSONObject to parse each string and check whether it contains a "ts" field. If parsing throws a JSONException, the string is not valid JSON and is filtered out. (Flink also bundles a shaded Jackson parser, org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode, which could be used instead of org.json.)
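The try/parse/catch pattern of the filter can be illustrated outside Flink with a plain-Java sketch. The hasTsField helper below is hypothetical and uses a deliberately simplified check (a regex on the raw string) instead of a real JSON parser, so it only approximates what the JSONObject-based filter does:

```java
public class TsFieldCheck {
    // Simplified stand-in for the JSONObject-based filter in the main example:
    // accept a record only if it looks like a JSON object and contains a
    // numeric "ts" field. A real implementation should use a JSON parser.
    static boolean hasTsField(String value) {
        if (value == null) return false;
        String s = value.trim();
        if (!s.startsWith("{") || !s.endsWith("}")) return false;
        return s.matches(".*\"ts\"\\s*:\\s*\\d+.*");
    }

    public static void main(String[] args) {
        System.out.println(hasTsField("{\"ts\": 1680000000000, \"msg\": \"ok\"}")); // true
        System.out.println(hasTsField("not-json"));                                 // false
    }
}
```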
Next, a map operation extracts the timestamp, and assignTimestampsAndWatermarks generates watermarks. We use BoundedOutOfOrdernessTimestampExtractor as the timestamp assigner, configured with a maximum out-of-orderness of 5 seconds. (Note that this class, like setStreamTimeCharacteristic, is deprecated since Flink 1.11 in favor of WatermarkStrategy.forBoundedOutOfOrderness.)
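The watermark arithmetic behind the bounded-out-of-orderness extractor can be sketched in plain Java: the extractor tracks the maximum event timestamp seen so far and emits a watermark that trails it by the configured out-of-orderness bound. This is a hedged, self-contained sketch of the formula, not Flink's actual implementation (which, among other details, emits watermarks periodically rather than per event):

```java
public class WatermarkSketch {
    // Maximum allowed out-of-orderness, matching Time.seconds(5) in the job.
    static final long MAX_OUT_OF_ORDERNESS_MS = 5000;
    static long maxTs = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // For each event, update the maximum seen timestamp and return the
    // watermark that would currently be emitted: maxTs - out-of-orderness.
    static long onEvent(long ts) {
        maxTs = Math.max(maxTs, ts);
        return maxTs - MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        System.out.println(onEvent(10_000)); // 5000
        System.out.println(onEvent(8_000));  // late event: watermark stays at 5000
        System.out.println(onEvent(20_000)); // 15000
    }
}
```

Because the watermark only ever moves forward with the maximum timestamp, an event arriving up to 5 seconds late still lands ahead of the watermark and is not treated as late data.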
Finally, the filtered log data is written back to Kafka.