MySQL 数据输出到 Kafka:使用 Flume 从 MySQL 表中提取数据
1. MySQL 数据库准备
首先,我们需要创建一个名为 'school' 的数据库,并在其中创建名为 'student' 的表格。以下是 SQL 语句:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
接下来,我们向 'student' 表中插入一些数据:
insert into student(id,name,age,grade) value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade) value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade) value(3,'Lisi',24,93);
insert into student(id,name,age,grade) value(4,'Wangwu',21,91);
insert into student(id,name,age,grade) value(5,'Weiliu',21,91);
2. Flume 配置文件
在 Flume 的 conf 文件夹下创建一个名为 mysql.conf 的配置文件,内容如下:
# Name the components on this agent
mysql.sources = mysql-source
mysql.sinks = loggerSink
mysql.channels = memoryChannel
# Describe/configure the source
mysql.sources.mysql-source.type = org.apache.flume.source.jdbc.JdbcSource
mysql.sources.mysql-source.driver = com.mysql.jdbc.Driver
mysql.sources.mysql-source.url = jdbc:mysql://localhost:3306/school
mysql.sources.mysql-source.user = root
mysql.sources.mysql-source.password = 123456
mysql.sources.mysql-source.table = student
mysql.sources.mysql-source.columns.to.select = id, name, age, grade
mysql.sources.mysql-source.batch.size = 1000
# Describe the sink
mysql.sinks.loggerSink.type = logger
# Use a channel which buffers events in memory
mysql.channels.memoryChannel.type = memory
mysql.channels.memoryChannel.capacity = 10000
mysql.channels.memoryChannel.transactionCapacity = 1000
# Bind the source and sink to the channel
mysql.sources.mysql-source.channels = memoryChannel
mysql.sinks.loggerSink.channel = memoryChannel
3. 启动 Flume
在 Flume 的 bin 文件夹下,使用以下命令启动 Flume:
./flume-ng agent -n mysql -c conf -f conf/mysql.conf -Dflume.root.logger=INFO,console
4. 查看数据
在 Kafka 消费者中,查看相应数据。可以看到输出了 'student' 表中的内容:
1 Xiaoming 23 98
2 Zhangsan 24 96
3 Lisi 24 93
4 Wangwu 21 91
5 Weiliu 21 91
至此,MySQL 数据的 Flume 输出完成。
原文地址: https://www.cveoy.top/t/topic/oX9L 著作权归作者所有。请勿转载和采集!