Flink SQL 读取 Kafka 数据写入 Hive:实战指南及常见问题解决

本文介绍如何使用 Flink SQL 从 Kafka 读取数据并写入 Hive 表,并解决常见问题,例如数据类型不匹配和时间戳解析错误等。文章还提供了示例代码和详细的步骤,帮助你快速上手 Flink SQL 和 Hive 集成。

1. 准备工作

  • 确保 Flink、Kafka 和 Hive 环境已配置并正常运行。
  • 准备一个包含测试数据的 Kafka 主题,例如:
{'roadGBCode': 'G60', 'totalFlow': 28.0, 'storageTime': 1694677876000, 'occupancy': -1.0, 'roadId': '86400', 'startMilestone': 86400, 'fictitious_road_code': '1001', 'uuid': '79.9760-1186500-1-1311-1.0G6086400467.97864001694677876000169467780000028.086.786', 'speed85': 67.97, 'endMilestone': 86500, 'congestionLevel': -1, 'gap': -1, 'start_hour': '2023-09-14 15:00:00', 'id': -29561485, 'lane': 3, 'direction': 1, 'timestamp': 1694677800000, 'smallCarQuantity': 4, 'headway': -1, 'averageSpeed': 79.97, 'totalSpeed': 86.78, 'largeCarQuantity': 1, 'fictitious_milestone': '21321', 'mediumCarQuantity': 1, 'circle': 60, 'vehicleQuantity': 6}
  • 创建 Hive 表,定义目标表的结构和分区信息。
CREATE TABLE hhy_dw.ods_test_kafka2hive_dt (
	start_milestone	int	comment	'起始断面位置桩号',
	end_milestone	int	comment	'路段结束桩号',
	timestamp	bigint	comment	'起始时间',
	lane	int	comment	'车道号',
	direction	int	comment	'路段方向',
	vehicle_quantity	int	comment	'车道流量',
	average_speed	double	comment	'车道速度',
	total_flow	double	comment	'路段流量'
)
comment	 'kafka2hive 实时测试'
partitioned by (
	ds	string	comment	'分区'
)

2. Flink SQL 代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StreamTableEnvironment;

public class KafkaToHiveExample {

    public static void main(String[] args) throws Exception {

        // 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Kafka 连接参数
        String kafkaBootstrapServers = 'localhost:9092';
        String kafkaTopic = 'your_topic';
        String groupId = 'your_group_id';

        // 创建 Kafka 源表
        String createKafkaSource = String.format('CREATE TABLE kafka_source (
' +
                '  roadGBCode STRING,
' +
                '  totalFlow DOUBLE,
' +
                '  storageTime BIGINT,
' +
                '  occupancy DOUBLE,
' +
                '  roadId STRING,
' +
                '  startMilestone INT,
' +
                '  fictitious_road_code STRING,
' +
                '  uuid STRING,
' +
                '  speed85 DOUBLE,
' +
                '  endMilestone INT,
' +
                '  congestionLevel INT,
' +
                '  gap INT,
' +
                '  start_hour STRING,
' +
                '  id INT,
' +
                '  lane INT,
' +
                '  direction INT,
' +
                '  timestamp BIGINT,
' +
                '  smallCarQuantity INT,
' +
                '  headway INT,
' +
                '  averageSpeed DOUBLE,
' +
                '  totalSpeed DOUBLE,
' +
                '  largeCarQuantity INT,
' +
                '  fictitious_milestone STRING,
' +
                '  mediumCarQuantity INT,
' +
                '  circle INT,
' +
                '  vehicleQuantity INT
' +
                ') WITH (
' +
                '  'connector' = 'kafka',
' +
                '  'topic' = '%s',
' +
                '  'properties.bootstrap.servers' = '%s',
' +
                '  'properties.group.id' = '%s',
' +
                '  'scan.startup.mode' = 'earliest-offset',
' +
                '  'format' = 'json',
' +
                '  'json.fail-on-missing-field' = 'false'
' +                ')', kafkaTopic, kafkaBootstrapServers, groupId);        tEnv.executeSql(createKafkaSource);

        // 创建 Hive Sink 表
        String createHiveSink = 'CREATE TABLE hive_sink (
' +
                '  start_milestone INT COMMENT '起始断面位置桩号',
' +
                '  end_milestone INT COMMENT '路段结束桩号',
' +
                '  timestamp BIGINT COMMENT '起始时间',
' +
                '  lane INT COMMENT '车道号',
' +
                '  direction INT COMMENT '路段方向',
' +
                '  vehicle_quantity INT COMMENT '车道流量',
' +
                '  average_speed DOUBLE COMMENT '车道速度',
' +
                '  total_flow DOUBLE COMMENT '路段流量'
' +
                ')
' +
                'COMMENT 'kafka2hive 实时测试'
' +
                'PARTITIONED BY (ds STRING COMMENT '分区')
' +
                'STORED AS PARQUET
' +
                'LOCATION 'hdfs://your_hdfs_path'';
        tEnv.executeSql(createHiveSink);

        // 将 Kafka 数据写入 Hive 表
        String insertIntoHive = 'INSERT INTO hive_sink
' +
                'SELECT
' +
                '  startMilestone AS start_milestone,
' +
                '  endMilestone AS end_milestone,
' +
                '  timestamp,
' +
                '  lane,
' +
                '  direction,
' +
                '  vehicleQuantity AS vehicle_quantity,
' +
                '  averageSpeed AS average_speed,
' +
                '  totalFlow AS total_flow
' +
                'FROM kafka_source';
        tEnv.executeSql(insertIntoHive);

        // 启动 Flink 任务
        env.execute();
    }
}

3. 常见问题解决

  • 时间戳解析错误: 确保 Hive 表中的 timestamp 字段定义为 bigint 类型,并确保 Kafka 数据中的时间戳字段类型为 bigintlong 类型,并且值是毫秒级时间戳。
  • 数据类型不匹配: 检查 Flink SQL 中的字段类型定义是否与 Hive 表中定义一致。例如,如果 Hive 表中 startMilestone 字段定义为 int 类型,则 Flink SQL 中的 startMilestone 字段也应该定义为 INT 类型。
  • 分区问题: 确保 Flink SQL 中的 ds 字段与 Hive 表中的 ds 字段保持一致,并且分区值正确。

4. 总结

本文详细介绍了如何使用 Flink SQL 从 Kafka 读取数据并写入 Hive 表,并提供了解决常见问题的建议。通过本文的学习,你可以快速上手 Flink SQL 和 Hive 集成,实现实时数据处理和数据集成。

Flink SQL 读取 Kafka 数据写入 Hive:实战指南及常见问题解决

原文地址: https://www.cveoy.top/t/topic/lJGG 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录