Flume数据采集与传输实验:MySQL、Kafka、HDFS实战
三.实验内容和要求
- MySQL数据输出
- 在MySQL中建立数据库school,并在数据库中建立表student。SQL语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
- 编写配置文件,将student表中的内容输出到控制台。启动Flume,在student表中使用下列命令插入数据,在Kafka消费者中查看相应数据。
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
- Kafka链接Flume
- 编写配置文件,将kafka作为输入,在生产者中输入'HelloFlume'或其他信息,通过Flume将Kafka生产者输入的信息存入hdfs中,存储格式hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为kafka_log。
- (注:配置好Flume后生产者输入的信息不会实时写入到hdfs,一段时间后批量写入)
- 使用Flume写入当前文件系统
- 编写配置文件,设置文件夹mylogs为source位置,文件夹backup为sink写入位置,实现对文件夹的数据备份。
- 新建两个文本文本文件1.txt与2.txt,在1.txt中输入Hello Flume,在2.txt中输入hello flume将两个文件拖入mylog,查看backup文件夹中出现的文件及其内容。文件可用记事本打开。
四.实验步骤
- MySQL数据输出
- 创建数据库和表
在MySQL中执行以下语句,创建数据库和表:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
- 编写Flume配置文件
在Flume的conf目录下,创建一个名为mysql-to-console.conf的配置文件,内容如下:
# Source配置
mysqldb-source.type = org.apache.flume.source.jdbc.JdbcSource
mysqldb-source.driver = com.mysql.jdbc.Driver
mysqldb-source.url = jdbc:mysql://localhost:3306/school
mysqldb-source.user = root
mysqldb-source.password = root
mysqldb-source.table = student
mysqldb-source.columns.to.select = *
mysqldb-source.batch.size = 1
# Sink配置
console-sink.type = logger
# 配置数据流
mysqldb-source | console-sink
- 启动Flume
在Flume的bin目录下,执行以下命令启动Flume:
./flume-ng agent -n mysqldb-agent -c ../conf -f mysql-to-console.conf -Dflume.root.logger=INFO,console
- 插入数据并查看输出结果
在MySQL中执行以下命令插入数据:
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
可以在控制台看到输出结果如下:
[INFO] [2021-12-20 13:53:24,589] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594589"},"body":"{"id":1,"name":"Xiaoming","age":23,"grade":98}"}
[INFO] [2021-12-20 13:53:24,588] {org.apache.flume.source.jdbc.JdbcSource} - Executing SQL: SELECT * FROM student WHERE id > 1
[INFO] [2021-12-20 13:53:24,590] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594589"},"body":"{"id":2,"name":"Zhangsan","age":24,"grade":96}"}
[INFO] [2021-12-20 13:53:24,591] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594591"},"body":"{"id":3,"name":"Lisi","age":24,"grade":93}"}
[INFO] [2021-12-20 13:53:24,591] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594591"},"body":"{"id":4,"name":"Wangwu","age":21,"grade":91}"}
[INFO] [2021-12-20 13:53:24,592] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594592"},"body":"{"id":5,"name":"Weiliu","age":21,"grade":91}"}
- Kafka链接Flume
- 编写Flume配置文件
在Flume的conf目录下,创建一个名为kafka-to-hdfs.conf的配置文件,内容如下:
# Source配置
kafka-source.type = org.apache.flume.source.kafka.KafkaSource
kafka-source.zookeeperConnect = localhost:2181
kafka-source.topic = test
kafka-source.batchSize = 1000
kafka-source.consumer.groupId = flume
# Sink配置
hdfs-sink.type = hdfs
hdfs-sink.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
hdfs-sink.hdfs.filePrefix = kafka_log
hdfs-sink.hdfs.fileType = DataStream
hdfs-sink.hdfs.writeFormat = Text
hdfs-sink.hdfs.rollInterval = 60
hdfs-sink.hdfs.rollSize = 0
hdfs-sink.hdfs.rollCount = 0
# 配置数据流
kafka-source | hdfs-sink
- 启动Flume
在Flume的bin目录下,执行以下命令启动Flume:
./flume-ng agent -n kafka-hdfs-agent -c ../conf -f kafka-to-hdfs.conf -Dflume.root.logger=INFO,console
- 输入数据并查看结果
在Kafka中输入一段数据,例如'HelloFlume'。
在HDFS中,可以看到在/fromkafka目录下,以当前日期为文件夹名,生成了一个名为kafka_log的文件,其中存储了输入的数据。
- 使用Flume写入当前文件系统
- 编写Flume配置文件
在Flume的conf目录下,创建一个名为file-to-backup.conf的配置文件,内容如下:
# Source配置
file-source.type = spooldir
file-source.spoolDir = /path/to/mylogs
# Sink配置
backup-sink.type = file_roll
backup-sink.sink.directory = /path/to/backup
# 配置数据流
file-source | backup-sink
- 启动Flume
在Flume的bin目录下,执行以下命令启动Flume:
./flume-ng agent -n file-backup-agent -c ../conf -f file-to-backup.conf -Dflume.root.logger=INFO,console
- 输入数据并查看结果
在mylogs文件夹中,创建两个文本文件1.txt和2.txt,分别输入'Hello Flume'和'hello flume'。
在backup文件夹中,可以看到生成了两个文件1.txt和2.txt,其中分别存储了输入的内容。
五.实验总结
本次实验主要学习了如何使用Flume进行数据采集和传输,包括从MySQL数据库中读取数据并输出到控制台,使用Kafka作为输入将数据存储到HDFS中,以及将文件系统中的数据备份到另一个目录中。通过实验,深入理解了Flume的工作原理和配置方法,对数据采集和传输有了更深入的认识。
原文地址: https://www.cveoy.top/t/topic/oX7X 著作权归作者所有。请勿转载和采集!