使用 Flink SQL 从 Kafka 读取数据写入 Hive 时,可能会遇到 timestamp 类型不匹配的问题。在 Flink SQL 中,Kafka 的 timestamp 默认以 bigint 类型解析,而在 Hive 中,timestamp 类型是以字符串的形式存储的,需要进行转换。

问题原因:

由于 Flink SQL 中的 Kafka 数据的 timestamp 字段为 bigint 类型,而 Hive 表中定义的 timestamp 字段为 string 类型,导致数据写入时无法匹配。

解决方案:

  1. **修改 Hive 建表语句:**将 Hive 表中 timestamp 字段的类型改为 string 类型。
create table hhy_dw.ods_test_kafka2hive_dt (
    start_milestone int comment '起始断面位置桩号',
    end_milestone int comment '路段结束桩号',
    timestamp string comment '起始时间',
    lane int comment '车道号',
    direction int comment '路段方向',
    vehicle_quantity int comment '车道流量',
    average_speed double comment '车道速度',
    total_flow double comment '路段流量'
)
comment 'kafka2hive 实时测试'
partitioned by (
    ds string comment '分区'
)
  1. **使用 Flink SQL 的 TO_TIMESTAMP() 函数:**在 Flink SQL 中,可以使用 TO_TIMESTAMP() 函数将 bigint 类型的 timestamp 转换为 Hive 中的 timestamp 类型。
create table hhy_dw.ods_test_kafka2hive_dt (
    start_milestone int comment '起始断面位置桩号',
    end_milestone int comment '路段结束桩号',
    timestamp timestamp comment '起始时间',
    lane int comment '车道号',
    direction int comment '路段方向',
    vehicle_quantity int comment '车道流量',
    average_speed double comment '车道速度',
    total_flow double comment '路段流量'
)
comment 'kafka2hive 实时测试'
partitioned by (
    ds string comment '分区'
)

在 Flink SQL 查询数据时,可以使用 TO_TIMESTAMP() 函数将 timestamp 转换为 bigint 类型,以便进行计算和比较。

SELECT start_milestone, end_milestone, TO_TIMESTAMP(timestamp) as timestamp, lane, direction, vehicle_quantity, average_speed, total_flow
FROM ods_test_kafka2hive_dt;

示例 Kafka 数据:

{'roadGBCode': 'G60', 'totalFlow': 28.0, 'storageTime': 1694677876000, 'occupancy': -1.0, 'roadId': '86400', 'startMilestone': 86400, 'fictitious_road_code': '1001', 'uuid': '79.9760-1186500-1-1311-1.0G6086400467.97864001694677876000169467780000028.086.786', 'speed85': 67.97, 'endMilestone': 86500, 'congestionLevel': -1, 'gap': -1, 'start_hour': '2023-09-14 15:00:00', 'id': -29561485, 'lane': 3, 'direction': 1, 'timestamp': 1694677800000, 'smallCarQuantity': 4, 'headway': -1, 'averageSpeed': 79.97, 'totalSpeed': 86.78, 'largeCarQuantity': 1, 'fictitious_milestone': '21321', 'mediumCarQuantity': 1, 'circle': 60, 'vehicleQuantity': 6}

Hive 建表语句:

create table hhy_dw.ods_test_kafka2hive_dt (
    start_milestone int comment '起始断面位置桩号',
    end_milestone int comment '路段结束桩号',
    timestamp bigint comment '起始时间',
    lane int comment '车道号',
    direction int comment '路段方向',
    vehicle_quantity int comment '车道流量',
    average_speed double comment '车道速度',
    total_flow double comment '路段流量'
)
comment 'kafka2hive 实时测试'
partitioned by (
    ds string comment '分区'
)

通过以上方法,可以有效解决 Flink SQL 读取 Kafka 数据写入 Hive 时遇到的 timestamp 类型不匹配问题,确保数据能够正确地写入 Hive 表。

Flink SQL 读取 Kafka 数据写入 Hive - 解决 timestamp 类型不匹配问题

原文地址: https://www.cveoy.top/t/topic/lJE3 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录