Flume 实践教程:MySQL 数据输出、Kafka 链接和文件系统写入
三.实验内容和要求
- MySQL 数据输出
在 MySQL 中建立数据库 school,在数据库中建立表 student。SQL 语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
编写配置文件,将 student 表中的内容输出到控制台。启动 Flume,在 student 表中使用下列命令插入数据,在 Kafka 消费者中查看相应数据。
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
- Kafka 链接 Flume
编写配置文件,将 kafka 作为输入,在生产者中输入'HelloFlume' 或其他信息,通过 Flume 将 Kafka 生产者输入的信息存入 hdfs 中,存储格式 hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为 kafka_log。
(注:配置好 Flume 后生产者输入的信息不会实时写入到 hdfs,一段时间后批量写入)
- 使用 Flume 写入当前文件系统
编写配置文件,设置文件夹 mylogs 为 source 位置,文件夹 backup 为 sink 写入位置,实现对文件夹的数据备份。
新建两个文本文本文件 1.txt 与 2.txt,在 1.txt 中输入 Hello Flume,在 2.txt 中输入 hello flume 将两个文件拖入 mylog,查看 backup 文件夹中出现的文件及其内容。文件可用记事本打开。
四.实验步骤
- MySQL 数据输出
1.1 准备工作
在 CentOS 7 系统上安装好 MySQL,并创建数据库 school 和表 student,插入数据。
1.2 编写 Flume 配置文件
在 Flume 的 conf 目录下,新建一个名为 mysql.conf 的文件,输入以下内容:
agent.sources = r1
agent.sinks = k1
agent.channels = c1
agent.sources.r1.type = jdbc
agent.sources.r1.driver = com.mysql.jdbc.Driver
agent.sources.r1.url = jdbc:mysql://localhost:3306/school
agent.sources.r1.user = root
agent.sources.r1.password = root
agent.sources.r1.sql = SELECT * FROM student
agent.sources.r1.batchSize = 1000
agent.sources.r1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.sinks.k1.type = logger
agent.sinks.k1.channel = c1
1.3 启动 Flume
在 Flume 的 bin 目录下,执行以下命令启动 Flume:
./flume-ng agent --conf conf --conf-file /path/to/mysql.conf --name agent -Dflume.root.logger=INFO,console
1.4 插入数据并查看结果
执行以下命令插入数据:
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
在 Flume 控制台中可以看到相应的输出结果。
- Kafka 链接 Flume
2.1 准备工作
在 CentOS 7 系统上安装好 Kafka 和 Hadoop,并启动 Zookeeper 和 Kafka。
2.2 编写 Flume 配置文件
在 Flume 的 conf 目录下,新建一个名为 kafka.conf 的文件,输入以下内容:
agent.sources = r1
agent.sinks = k1
agent.channels = c1
agent.sources.r1.type = avro
agent.sources.r1.bind = localhost
agent.sources.r1.port = 44444
agent.sources.r1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
agent.sinks.k1.hdfs.filePrefix = kafka_log
agent.sinks.k1.hdfs.fileSuffix = .log
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.channel = c1
2.3 启动 Flume
在 Flume 的 bin 目录下,执行以下命令启动 Flume:
./flume-ng agent --conf conf --conf-file /path/to/kafka.conf --name agent -Dflume.root.logger=INFO,console
2.4 发送消息到 Kafka
执行以下命令发送消息到 Kafka:
echo 'Hello Flume' | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
2.5 查看结果
在 Hadoop 的 namenode 节点上执行以下命令查看结果:
hdfs dfs -ls /fromkafka
可以看到类似以下的输出结果:
drwxr-xr-x - root supergroup 0 2021-10-21 20:06 /fromkafka/20211021
-rw-r--r-- 1 root supergroup 12 2021-10-21 20:06 /fromkafka/20211021/kafka_log.log
其中,kafka_log.log 文件中存储了之前发送的消息。
- 使用 Flume 写入当前文件系统
3.1 准备工作
在 CentOS 7 系统上安装好 Flume。
3.2 编写 Flume 配置文件
在 Flume 的 conf 目录下,新建一个名为 file.conf 的文件,输入以下内容:
agent.sources = r1
agent.sinks = k1
agent.channels = c1
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /path/to/mylogs
agent.sources.r1.fileHeader = false
agent.sources.r1.deserializer.maxLineLength = 500000
agent.sources.r1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /path/to/backup
agent.sinks.k1.sink.rollInterval = 0
agent.sinks.k1.sink.rollSize = 0
agent.sinks.k1.sink.rollCount = 0
agent.sinks.k1.channel = c1
3.3 启动 Flume
在 Flume 的 bin 目录下,执行以下命令启动 Flume:
./flume-ng agent --conf conf --conf-file /path/to/file.conf --name agent -Dflume.root.logger=INFO,console
3.4 创建测试文件并拖入 mylogs 文件夹
新建两个文本文本文件 1.txt 和 2.txt,分别输入 Hello Flume 和 hello flume,将两个文件拖入 mylogs 文件夹。
3.5 查看结果
在 backup 文件夹中,可以看到两个文件 1.txt 和 2.txt,其中分别存储了之前输入的内容。可以使用记事本打开文件查看。
原文地址: https://www.cveoy.top/t/topic/oYbo 著作权归作者所有。请勿转载和采集!