MySQL 数据输出到 Kafka:使用 Flume 实现实时数据同步
1. 创建 MySQL 数据库和数据表
首先,我们需要在 MySQL 中创建名为 school 的数据库,并在该数据库中创建名为 student 的数据表。
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
接下来,向 student 表中插入一些示例数据:
insert into student(id,name,age,grade) values
(1,'Xiaoming',23,98),
(2,'Zhangsan',24,96),
(3,'Lisi',24,93),
(4,'Wangwu',21,91),
(5,'Weiliu',21,91);
2. 配置 Flume
创建 Flume 配置文件 flume.conf,内容如下:
# 定义 agent 名称
agent.sources = mysql-source
agent.sinks = logger-sink
agent.channels = memory-channel
# 配置 source
agent.sources.mysql-source.type = org.apache.flume.source.jdbc.JdbcSource
agent.sources.mysql-source.jdbc.driver = com.mysql.jdbc.Driver
agent.sources.mysql-source.jdbc.url = jdbc:mysql://localhost:3306/school
agent.sources.mysql-source.jdbc.user = root
agent.sources.mysql-source.jdbc.password = root
agent.sources.mysql-source.jdbc.sql = select id,name,age,grade from student
agent.sources.mysql-source.batchSize = 100
agent.sources.mysql-source.incrementalColumn = id
agent.sources.mysql-source.incrementalValue = 0
# 配置 sink
agent.sinks.logger-sink.type = logger
agent.sinks.logger-sink.channel = memory-channel
# 配置 channel
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 1000
agent.channels.memory-channel.transactionCapacity = 100
# 连接 source 和 sink
agent.sources.mysql-source.channels = memory-channel
agent.sinks.logger-sink.channel = memory-channel
3. 启动 Flume
打开终端,进入 Flume 的 bin 目录,并使用以下命令启动 Flume agent:
./flume-ng agent -n agent -c conf -f /path/to/flume.conf -Dflume.root.logger=INFO,console
其中 /path/to/flume.conf 为 Flume 配置文件的路径。
4. 查看 Kafka 消费者
在另一个终端中,进入 Kafka 的 bin 目录,并使用以下命令启动 Kafka 消费者:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic student --from-beginning
您将在控制台中看到类似以下输出:
1,Xiaoming,23,98
2,Zhangsan,24,96
3,Lisi,24,93
4,Wangwu,21,91
5,Weiliu,21,91
这些数据是从 MySQL 的 student 表中读取并经过 Flume 传送到 Kafka 的 student 主题。
注意:
- 确保您的 Kafka Broker 正在运行,并且 Flume 已配置连接到正确的 Kafka Broker 地址。
- 您可以根据实际需求修改 Flume 配置文件中的参数,例如
jdbc.url、jdbc.user、jdbc.password、batchSize等。 - 使用 Flume 可以将 MySQL 中的数据实时输出到 Kafka,从而实现数据同步和实时分析。
原文地址: https://www.cveoy.top/t/topic/oYbx 著作权归作者所有。请勿转载和采集!