三.实验内容和要求

  1. MySQL数据输出
  • 在MySQL中建立数据库school,并在数据库中建立表student。SQL语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
  • 编写配置文件,将student表中的内容输出到控制台。启动Flume,在student表中使用下列命令插入数据,在Kafka消费者中查看相应数据。
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
  1. Kafka链接Flume
  • 编写配置文件,将kafka作为输入,在生产者中输入'HelloFlume'或其他信息,通过Flume将Kafka生产者输入的信息存入hdfs中,存储格式hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为kafka_log。
  • (注:配置好Flume后生产者输入的信息不会实时写入到hdfs,一段时间后批量写入)
  1. 使用Flume写入当前文件系统
  • 编写配置文件,设置文件夹mylogs为source位置,文件夹backup为sink写入位置,实现对文件夹的数据备份。
  • 新建两个文本文本文件1.txt与2.txt,在1.txt中输入Hello Flume,在2.txt中输入hello flume将两个文件拖入mylog,查看backup文件夹中出现的文件及其内容。文件可用记事本打开。

四.实验步骤

  1. MySQL数据输出
  1. 创建数据库和表

在MySQL中执行以下语句,创建数据库和表:

create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
  1. 编写Flume配置文件

在Flume的conf目录下,创建一个名为mysql-to-console.conf的配置文件,内容如下:

# Source配置
mysqldb-source.type = org.apache.flume.source.jdbc.JdbcSource
mysqldb-source.driver = com.mysql.jdbc.Driver
mysqldb-source.url = jdbc:mysql://localhost:3306/school
mysqldb-source.user = root
mysqldb-source.password = root
mysqldb-source.table = student
mysqldb-source.columns.to.select = *
mysqldb-source.batch.size = 1

# Sink配置
console-sink.type = logger

# 配置数据流
mysqldb-source | console-sink
  1. 启动Flume

在Flume的bin目录下,执行以下命令启动Flume:

./flume-ng agent -n mysqldb-agent -c ../conf -f mysql-to-console.conf -Dflume.root.logger=INFO,console
  1. 插入数据并查看输出结果

在MySQL中执行以下命令插入数据:

insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);

可以在控制台看到输出结果如下:

[INFO] [2021-12-20 13:53:24,589] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594589"},"body":"{"id":1,"name":"Xiaoming","age":23,"grade":98}"}
[INFO] [2021-12-20 13:53:24,588] {org.apache.flume.source.jdbc.JdbcSource} - Executing SQL: SELECT * FROM student WHERE id > 1
[INFO] [2021-12-20 13:53:24,590] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594589"},"body":"{"id":2,"name":"Zhangsan","age":24,"grade":96}"}
[INFO] [2021-12-20 13:53:24,591] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594591"},"body":"{"id":3,"name":"Lisi","age":24,"grade":93}"}
[INFO] [2021-12-20 13:53:24,591] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594591"},"body":"{"id":4,"name":"Wangwu","age":21,"grade":91}"}
[INFO] [2021-12-20 13:53:24,592] {org.apache.flume.source.jdbc.JdbcSource} - Found 1 rows
{"headers":{"timestamp":"1640005594592"},"body":"{"id":5,"name":"Weiliu","age":21,"grade":91}"}
  1. Kafka链接Flume
  1. 编写Flume配置文件

在Flume的conf目录下,创建一个名为kafka-to-hdfs.conf的配置文件,内容如下:

# Source配置
kafka-source.type = org.apache.flume.source.kafka.KafkaSource
kafka-source.zookeeperConnect = localhost:2181
kafka-source.topic = test
kafka-source.batchSize = 1000
kafka-source.consumer.groupId = flume

# Sink配置
hdfs-sink.type = hdfs
hdfs-sink.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
hdfs-sink.hdfs.filePrefix = kafka_log
hdfs-sink.hdfs.fileType = DataStream
hdfs-sink.hdfs.writeFormat = Text
hdfs-sink.hdfs.rollInterval = 60
hdfs-sink.hdfs.rollSize = 0
hdfs-sink.hdfs.rollCount = 0

# 配置数据流
kafka-source | hdfs-sink
  1. 启动Flume

在Flume的bin目录下,执行以下命令启动Flume:

./flume-ng agent -n kafka-hdfs-agent -c ../conf -f kafka-to-hdfs.conf -Dflume.root.logger=INFO,console
  1. 输入数据并查看结果

在Kafka中输入一段数据,例如'HelloFlume'。

在HDFS中,可以看到在/fromkafka目录下,以当前日期为文件夹名,生成了一个名为kafka_log的文件,其中存储了输入的数据。

  1. 使用Flume写入当前文件系统
  1. 编写Flume配置文件

在Flume的conf目录下,创建一个名为file-to-backup.conf的配置文件,内容如下:

# Source配置
file-source.type = spooldir
file-source.spoolDir = /path/to/mylogs

# Sink配置
backup-sink.type = file_roll
backup-sink.sink.directory = /path/to/backup

# 配置数据流
file-source | backup-sink
  1. 启动Flume

在Flume的bin目录下,执行以下命令启动Flume:

./flume-ng agent -n file-backup-agent -c ../conf -f file-to-backup.conf -Dflume.root.logger=INFO,console
  1. 输入数据并查看结果

在mylogs文件夹中,创建两个文本文件1.txt和2.txt,分别输入'Hello Flume'和'hello flume'。

在backup文件夹中,可以看到生成了两个文件1.txt和2.txt,其中分别存储了输入的内容。

五.实验总结

本次实验主要学习了如何使用Flume进行数据采集和传输,包括从MySQL数据库中读取数据并输出到控制台,使用Kafka作为输入将数据存储到HDFS中,以及将文件系统中的数据备份到另一个目录中。通过实验,深入理解了Flume的工作原理和配置方法,对数据采集和传输有了更深入的认识。

Flume数据采集与传输实验:MySQL、Kafka、HDFS实战

原文地址: https://www.cveoy.top/t/topic/oX7X 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录