Flink SQL 读取 Kafka 数据写入 Hive:实战指南及常见问题解决
Flink SQL 读取 Kafka 数据写入 Hive:实战指南及常见问题解决
本文介绍如何使用 Flink SQL 从 Kafka 读取数据并写入 Hive 表,并解决常见问题,例如数据类型不匹配和时间戳解析错误等。文章还提供了示例代码和详细的步骤,帮助你快速上手 Flink SQL 和 Hive 集成。
1. 准备工作
- 确保 Flink、Kafka 和 Hive 环境已配置并正常运行。
- 准备一个包含测试数据的 Kafka 主题,例如:
{'roadGBCode': 'G60', 'totalFlow': 28.0, 'storageTime': 1694677876000, 'occupancy': -1.0, 'roadId': '86400', 'startMilestone': 86400, 'fictitious_road_code': '1001', 'uuid': '79.9760-1186500-1-1311-1.0G6086400467.97864001694677876000169467780000028.086.786', 'speed85': 67.97, 'endMilestone': 86500, 'congestionLevel': -1, 'gap': -1, 'start_hour': '2023-09-14 15:00:00', 'id': -29561485, 'lane': 3, 'direction': 1, 'timestamp': 1694677800000, 'smallCarQuantity': 4, 'headway': -1, 'averageSpeed': 79.97, 'totalSpeed': 86.78, 'largeCarQuantity': 1, 'fictitious_milestone': '21321', 'mediumCarQuantity': 1, 'circle': 60, 'vehicleQuantity': 6}
- 创建 Hive 表,定义目标表的结构和分区信息。
CREATE TABLE hhy_dw.ods_test_kafka2hive_dt (
start_milestone int comment '起始断面位置桩号',
end_milestone int comment '路段结束桩号',
timestamp bigint comment '起始时间',
lane int comment '车道号',
direction int comment '路段方向',
vehicle_quantity int comment '车道流量',
average_speed double comment '车道速度',
total_flow double comment '路段流量'
)
comment 'kafka2hive 实时测试'
partitioned by (
ds string comment '分区'
)
2. Flink SQL 代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StreamTableEnvironment;
public class KafkaToHiveExample {
public static void main(String[] args) throws Exception {
// 创建 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// Kafka 连接参数
String kafkaBootstrapServers = 'localhost:9092';
String kafkaTopic = 'your_topic';
String groupId = 'your_group_id';
// 创建 Kafka 源表
String createKafkaSource = String.format('CREATE TABLE kafka_source (
' +
' roadGBCode STRING,
' +
' totalFlow DOUBLE,
' +
' storageTime BIGINT,
' +
' occupancy DOUBLE,
' +
' roadId STRING,
' +
' startMilestone INT,
' +
' fictitious_road_code STRING,
' +
' uuid STRING,
' +
' speed85 DOUBLE,
' +
' endMilestone INT,
' +
' congestionLevel INT,
' +
' gap INT,
' +
' start_hour STRING,
' +
' id INT,
' +
' lane INT,
' +
' direction INT,
' +
' timestamp BIGINT,
' +
' smallCarQuantity INT,
' +
' headway INT,
' +
' averageSpeed DOUBLE,
' +
' totalSpeed DOUBLE,
' +
' largeCarQuantity INT,
' +
' fictitious_milestone STRING,
' +
' mediumCarQuantity INT,
' +
' circle INT,
' +
' vehicleQuantity INT
' +
') WITH (
' +
' 'connector' = 'kafka',
' +
' 'topic' = '%s',
' +
' 'properties.bootstrap.servers' = '%s',
' +
' 'properties.group.id' = '%s',
' +
' 'scan.startup.mode' = 'earliest-offset',
' +
' 'format' = 'json',
' +
' 'json.fail-on-missing-field' = 'false'
' + ')', kafkaTopic, kafkaBootstrapServers, groupId); tEnv.executeSql(createKafkaSource);
// 创建 Hive Sink 表
String createHiveSink = 'CREATE TABLE hive_sink (
' +
' start_milestone INT COMMENT '起始断面位置桩号',
' +
' end_milestone INT COMMENT '路段结束桩号',
' +
' timestamp BIGINT COMMENT '起始时间',
' +
' lane INT COMMENT '车道号',
' +
' direction INT COMMENT '路段方向',
' +
' vehicle_quantity INT COMMENT '车道流量',
' +
' average_speed DOUBLE COMMENT '车道速度',
' +
' total_flow DOUBLE COMMENT '路段流量'
' +
')
' +
'COMMENT 'kafka2hive 实时测试'
' +
'PARTITIONED BY (ds STRING COMMENT '分区')
' +
'STORED AS PARQUET
' +
'LOCATION 'hdfs://your_hdfs_path'';
tEnv.executeSql(createHiveSink);
// 将 Kafka 数据写入 Hive 表
String insertIntoHive = 'INSERT INTO hive_sink
' +
'SELECT
' +
' startMilestone AS start_milestone,
' +
' endMilestone AS end_milestone,
' +
' timestamp,
' +
' lane,
' +
' direction,
' +
' vehicleQuantity AS vehicle_quantity,
' +
' averageSpeed AS average_speed,
' +
' totalFlow AS total_flow
' +
'FROM kafka_source';
tEnv.executeSql(insertIntoHive);
// 启动 Flink 任务
env.execute();
}
}
3. 常见问题解决
- 时间戳解析错误: 确保 Hive 表中的
timestamp字段定义为bigint类型,并确保 Kafka 数据中的时间戳字段类型为bigint或long类型,并且值是毫秒级时间戳。 - 数据类型不匹配: 检查 Flink SQL 中的字段类型定义是否与 Hive 表中定义一致。例如,如果 Hive 表中
startMilestone字段定义为int类型,则 Flink SQL 中的startMilestone字段也应该定义为INT类型。 - 分区问题: 确保 Flink SQL 中的
ds字段与 Hive 表中的ds字段保持一致,并且分区值正确。
4. 总结
本文详细介绍了如何使用 Flink SQL 从 Kafka 读取数据并写入 Hive 表,并提供了解决常见问题的建议。通过本文的学习,你可以快速上手 Flink SQL 和 Hive 集成,实现实时数据处理和数据集成。
原文地址: https://www.cveoy.top/t/topic/lJGG 著作权归作者所有。请勿转载和采集!