Flume 入门指南:基本使用方法详解
Flume 入门指南:基本使用方法详解
一. 实验目的
- 了解并熟悉 Flume 的功能
- 掌握 Flume 的使用方法,学会按要求编写相关配置文件
二. 实验平台
- Flume 版本:1.9.0
- Kafka 版本:2.12
- MySQL 版本:8.0.23
- Hadoop 版本:3.1.3
三.实验内容和要求
1. MySQL 数据输出
- 在 MySQL 中建立数据库
school,并在数据库中建立表student。SQL 语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
- 编写配置文件,将
student表中的内容输出到控制台。启动 Flume,在student表中使用下列命令插入数据,在 Kafka 消费者中查看相应数据。
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
2. Kafka 链接 Flume
- 编写配置文件,将 Kafka 作为输入,在生产者中输入 'HelloFlume' 或其他信息,通过 Flume 将 Kafka 生产者输入的信息存入 HDFS 中,存储格式
hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为kafka_log。(注:配置好 Flume 后生产者输入的信息不会实时写入到 HDFS,一段时间后批量写入)
3. 使用 Flume 写入当前文件系统
-
编写配置文件,设置文件夹
mylogs为 source 位置,文件夹backup为 sink 写入位置,实现对文件夹的数据备份。 -
新建两个文本文本文件
1.txt与2.txt,在1.txt中输入 'Hello Flume',在2.txt中输入 'hello flume' 将两个文件拖入mylog,查看backup文件夹中出现的文件及其内容。文件可用记事本打开。(注:配置文件中 source 的 type 为spooldir,sink 的 type 为file_roll,请自行学习使用方法,附官网文档:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)
四.实验步骤
1. MySQL 数据输出
- 在 MySQL 中创建数据库和表
首先在 MySQL 中创建数据库和表,SQL 语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
- 编写 Flume 配置文件
在 Flume 的 conf 目录下创建一个名为 mysql.conf 的配置文件,内容如下:
# Name the components on this agent
mysql.sources = mysql-source
mysql.sinks = mysql-sink
mysql.channels = memory-channel
# Describe/configure the source
mysql.sources.mysql-source.type = org.apache.flume.source.jdbc.JdbcSource
mysql.sources.mysql-source.jdbc.driver = com.mysql.jdbc.Driver
mysql.sources.mysql-source.jdbc.url = jdbc:mysql://localhost:3306/school
mysql.sources.mysql-source.jdbc.user = root
mysql.sources.mysql-source.jdbc.password = root
mysql.sources.mysql-source.jdbc.table = student
mysql.sources.mysql-source.jdbc.columns.to.select = id,name,age,grade
mysql.sources.mysql-source.batch.size = 1000
mysql.sources.mysql-source.incremental.column.name = id
mysql.sources.mysql-source.incremental.value = 0
# Describe the sink
mysql.sinks.mysql-sink.type = logger
# Use a channel which buffers events in memory
mysql.channels.memory-channel.type = memory
mysql.channels.memory-channel.capacity = 1000
mysql.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
mysql.sources.mysql-source.channels = memory-channel
mysql.sinks.mysql-sink.channel = memory-channel
该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 JdbcSource,从 MySQL 中读取数据,sink 的类型是 logger,将数据输出到控制台,channel 的类型是 memory,缓存事件。
- 启动 Flume
启动 Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/mysql.conf --name mysql -Dflume.root.logger=INFO,console
- 插入数据
在 MySQL 中插入数据,使用以下命令:
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
- 查看控制台输出
在控制台中可以看到输出的数据:
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 31 20 7B 20 22 61 67 65 22 20 3A 20 32 35 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 31 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 32 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 36 2C 20 22 6E 61 6D 65 22 20 3A 20 22 5A 68 61 6E 67 73 61 6E 22 2C 20 22 69 64 22 20 3A 20 32 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 33 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 33 2C 20 22 6E 61 6D 65 22 20 3A 20 22 4C 69 73 69 22 2C 20 22 69 64 22 20 3A 20 33 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 34 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 61 6E 67 77 75 22 2C 20 22 69 64 22 20 3A 20 34 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 35 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 35 20 7D 0D 0A }
其中,body 部分是数据的二进制表示,需要将其转换为字符串才能看到实际的数据。
2. Kafka 链接 Flume
- 配置 Kafka
首先需要配置 Kafka,具体步骤可以参考 Kafka 官方文档。
- 编写 Flume 配置文件
在 Flume 的 conf 目录下创建一个名为 kafka.conf 的配置文件,内容如下:
# Name the components on this agent
kafka.sources = kafka-source
kafka.sinks = hdfs-sink
kafka.channels = memory-channel
# Describe/configure the source
kafka.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
kafka.sources.kafka-source.kafka.topics = test
kafka.sources.kafka-source.batchSize = 1000
kafka.sources.kafka-source.batchDurationMillis = 2000
# Describe the sink
kafka.sinks.hdfs-sink.type = hdfs
kafka.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
kafka.sinks.hdfs-sink.hdfs.filePrefix = kafka_log
kafka.sinks.hdfs-sink.hdfs.rollInterval = 0
kafka.sinks.hdfs-sink.hdfs.rollSize = 0
kafka.sinks.hdfs-sink.hdfs.rollCount = 10000
kafka.sinks.hdfs-sink.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
kafka.channels.memory-channel.type = memory
kafka.channels.memory-channel.capacity = 1000
kafka.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
kafka.sources.kafka-source.channels = memory-channel
kafka.sinks.hdfs-sink.channel = memory-channel
该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 KafkaSource,从 Kafka 中读取数据,sink 的类型是 hdfs,将数据存储到 HDFS 中,channel 的类型是 memory,缓存事件。
- 启动 Flume
启动 Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,console
- 输入数据
在 Kafka 的生产者中输入 'Hello Flume' 或其他信息。
- 查看 HDFS 中的数据
在 HDFS 中查看数据,可以使用以下命令:
hadoop fs -ls /fromkafka/
可以看到 kafka_log 文件,使用以下命令查看文件内容:
hadoop fs -cat /fromkafka/kafka_log
可以看到输入的数据。
3. 使用 Flume 写入当前文件系统
- 创建文件夹和文件
首先在当前用户的文件夹中创建 mylogs 和 backup 文件夹,并在 mylogs 文件夹中创建两个文本文件 1.txt 和 2.txt。
- 编写 Flume 配置文件
在 Flume 的 conf 目录下创建一个名为 file.conf 的配置文件,内容如下:
# Name the components on this agent
file.sources = file-source
file.sinks = file-sink
file.channels = memory-channel
# Describe/configure the source
file.sources.file-source.type = spooldir
file.sources.file-source.spoolDir = /home/hadoop/mylogs
file.sources.file-source.fileHeader = true
file.sources.file-source.basenameHeader = true
# Describe the sink
file.sinks.file-sink.type = file_roll
file.sinks.file-sink.sink.directory = /home/hadoop/backup/
file.sinks.file-sink.sink.rollInterval = 0
file.sinks.file-sink.sink.rollSize = 1048576
file.sinks.file-sink.sink.rollCount = 0
# Use a channel which buffers events in memory
file.channels.memory-channel.type = memory
file.channels.memory-channel.capacity = 1000
file.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
file.sources.file-source.channels = memory-channel
file.sinks.file-sink.channel = memory-channel
该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 spooldir,监控 mylogs 文件夹下的文件,sink 的类型是 file_roll,将数据写入到 backup 文件夹中,channel 的类型是 memory,缓存事件。
- 启动 Flume
启动 Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/file.conf --name file -Dflume.root.logger=INFO,console
- 输入数据
在 1.txt 中输入 'Hello Flume',在 2.txt 中输入 'hello flume',将两个文件拖入 mylog 文件夹中。
- 查看
backup文件夹中的文件内容
在 backup 文件夹中可以看到两个文件 1.txt 和 2.txt,使用记事本打开可以看到文件中的内容。
原文地址: https://www.cveoy.top/t/topic/oYbl 著作权归作者所有。请勿转载和采集!