Flink 流数据时间戳转换为时间格式
在 Flink 中,可以使用org.apache.flink.api.common.functions.MapFunction或org.apache.flink.api.common.functions.RichMapFunction来将流数据中的时间戳转换为时间格式。
下面是一个示例代码,演示如何将流数据中的时间戳转换为时间格式:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TimestampToTime {
public static void main(String[] args) throws Exception {
// 设置流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 创建流数据源
DataStream<String> stream = env.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(1)) {
@Override
public long extractTimestamp(String element) {
// 从流数据中提取时间戳
long timestamp = Long.parseLong(element.split(",")[0]);
return timestamp;
}
});
// 将时间戳转换为时间格式
DataStream<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
long timestamp = Long.parseLong(value.split(",")[0]);
Date date = new Date(timestamp);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date) + "," + value.split(",")[1];
}
});
// 打印结果
result.print();
// 执行流任务
env.execute("Timestamp to Time");
}
}
在上述代码中,首先通过assignTimestampsAndWatermarks方法为流数据设置事件时间,并提取时间戳。然后使用map方法将时间戳转换为时间格式,并将结果打印出来。
注意,上述代码中的时间戳需要是毫秒级别的,如果是秒级别的时间戳,需要进行转换。另外,代码中使用了SimpleDateFormat来定义时间格式,可以根据实际需求进行调整。
原文地址: https://www.cveoy.top/t/topic/o2b8 著作权归作者所有。请勿转载和采集!