Flume 入门指南:基本使用方法详解

一. 实验目的

  1. 了解并熟悉 Flume 的功能
  2. 掌握 Flume 的使用方法,学会按要求编写相关配置文件

二. 实验平台

  • Flume 版本:1.9.0
  • Kafka 版本:2.12
  • MySQL 版本:8.0.23
  • Hadoop 版本:3.1.3

三.实验内容和要求

1. MySQL 数据输出

  1. 在 MySQL 中建立数据库 school,并在数据库中建立表 student。SQL 语句如下:
create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
  1. 编写配置文件,将 student 表中的内容输出到控制台。启动 Flume,在 student 表中使用下列命令插入数据,在 Kafka 消费者中查看相应数据。
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);

2. Kafka 链接 Flume

  1. 编写配置文件,将 Kafka 作为输入,在生产者中输入 'HelloFlume' 或其他信息,通过 Flume 将 Kafka 生产者输入的信息存入 HDFS 中,存储格式 hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为 kafka_log。(注:配置好 Flume 后生产者输入的信息不会实时写入到 HDFS,一段时间后批量写入)

3. 使用 Flume 写入当前文件系统

  1. 编写配置文件,设置文件夹 mylogs 为 source 位置,文件夹 backup 为 sink 写入位置,实现对文件夹的数据备份。

  2. 新建两个文本文本文件 1.txt2.txt,在 1.txt 中输入 'Hello Flume',在 2.txt 中输入 'hello flume' 将两个文件拖入 mylog,查看 backup 文件夹中出现的文件及其内容。文件可用记事本打开。(注:配置文件中 source 的 type 为 spooldir,sink 的 type 为 file_roll,请自行学习使用方法,附官网文档:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html)

四.实验步骤

1. MySQL 数据输出

  1. 在 MySQL 中创建数据库和表

首先在 MySQL 中创建数据库和表,SQL 语句如下:

create database school;
use school;
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
  1. 编写 Flume 配置文件

在 Flume 的 conf 目录下创建一个名为 mysql.conf 的配置文件,内容如下:

# Name the components on this agent
mysql.sources = mysql-source
mysql.sinks = mysql-sink
mysql.channels = memory-channel

# Describe/configure the source
mysql.sources.mysql-source.type = org.apache.flume.source.jdbc.JdbcSource
mysql.sources.mysql-source.jdbc.driver = com.mysql.jdbc.Driver
mysql.sources.mysql-source.jdbc.url = jdbc:mysql://localhost:3306/school
mysql.sources.mysql-source.jdbc.user = root
mysql.sources.mysql-source.jdbc.password = root
mysql.sources.mysql-source.jdbc.table = student
mysql.sources.mysql-source.jdbc.columns.to.select = id,name,age,grade
mysql.sources.mysql-source.batch.size = 1000
mysql.sources.mysql-source.incremental.column.name = id
mysql.sources.mysql-source.incremental.value = 0

# Describe the sink
mysql.sinks.mysql-sink.type = logger

# Use a channel which buffers events in memory
mysql.channels.memory-channel.type = memory
mysql.channels.memory-channel.capacity = 1000
mysql.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
mysql.sources.mysql-source.channels = memory-channel
mysql.sinks.mysql-sink.channel = memory-channel

该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 JdbcSource,从 MySQL 中读取数据,sink 的类型是 logger,将数据输出到控制台,channel 的类型是 memory,缓存事件。

  1. 启动 Flume

启动 Flume,使用以下命令:

bin/flume-ng agent --conf conf --conf-file conf/mysql.conf --name mysql -Dflume.root.logger=INFO,console
  1. 插入数据

在 MySQL 中插入数据,使用以下命令:

insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
  1. 查看控制台输出

在控制台中可以看到输出的数据:

INFO  org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 31 20 7B 20 22 61 67 65 22 20 3A 20 32 35 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 31 20 7D 0D 0A }

INFO  org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 32 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 36 2C 20 22 6E 61 6D 65 22 20 3A 20 22 5A 68 61 6E 67 73 61 6E 22 2C 20 22 69 64 22 20 3A 20 32 20 7D 0D 0A }

INFO  org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 33 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 33 2C 20 22 6E 61 6D 65 22 20 3A 20 22 4C 69 73 69 22 2C 20 22 69 64 22 20 3A 20 33 20 7D 0D 0A }

INFO  org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 34 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 61 6E 67 77 75 22 2C 20 22 69 64 22 20 3A 20 34 20 7D 0D 0A }

INFO  org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 35 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 35 20 7D 0D 0A }

其中,body 部分是数据的二进制表示,需要将其转换为字符串才能看到实际的数据。

2. Kafka 链接 Flume

  1. 配置 Kafka

首先需要配置 Kafka,具体步骤可以参考 Kafka 官方文档。

  1. 编写 Flume 配置文件

在 Flume 的 conf 目录下创建一个名为 kafka.conf 的配置文件,内容如下:

# Name the components on this agent
kafka.sources = kafka-source
kafka.sinks = hdfs-sink
kafka.channels = memory-channel

# Describe/configure the source
kafka.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
kafka.sources.kafka-source.kafka.topics = test
kafka.sources.kafka-source.batchSize = 1000
kafka.sources.kafka-source.batchDurationMillis = 2000

# Describe the sink
kafka.sinks.hdfs-sink.type = hdfs
kafka.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
kafka.sinks.hdfs-sink.hdfs.filePrefix = kafka_log
kafka.sinks.hdfs-sink.hdfs.rollInterval = 0
kafka.sinks.hdfs-sink.hdfs.rollSize = 0
kafka.sinks.hdfs-sink.hdfs.rollCount = 10000
kafka.sinks.hdfs-sink.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
kafka.channels.memory-channel.type = memory
kafka.channels.memory-channel.capacity = 1000
kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
kafka.sources.kafka-source.channels = memory-channel
kafka.sinks.hdfs-sink.channel = memory-channel

该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 KafkaSource,从 Kafka 中读取数据,sink 的类型是 hdfs,将数据存储到 HDFS 中,channel 的类型是 memory,缓存事件。

  1. 启动 Flume

启动 Flume,使用以下命令:

bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,console
  1. 输入数据

在 Kafka 的生产者中输入 'Hello Flume' 或其他信息。

  1. 查看 HDFS 中的数据

在 HDFS 中查看数据,可以使用以下命令:

hadoop fs -ls /fromkafka/

可以看到 kafka_log 文件,使用以下命令查看文件内容:

hadoop fs -cat /fromkafka/kafka_log

可以看到输入的数据。

3. 使用 Flume 写入当前文件系统

  1. 创建文件夹和文件

首先在当前用户的文件夹中创建 mylogsbackup 文件夹,并在 mylogs 文件夹中创建两个文本文件 1.txt2.txt

  1. 编写 Flume 配置文件

在 Flume 的 conf 目录下创建一个名为 file.conf 的配置文件,内容如下:

# Name the components on this agent
file.sources = file-source
file.sinks = file-sink
file.channels = memory-channel

# Describe/configure the source
file.sources.file-source.type = spooldir
file.sources.file-source.spoolDir = /home/hadoop/mylogs
file.sources.file-source.fileHeader = true
file.sources.file-source.basenameHeader = true

# Describe the sink
file.sinks.file-sink.type = file_roll
file.sinks.file-sink.sink.directory = /home/hadoop/backup/
file.sinks.file-sink.sink.rollInterval = 0
file.sinks.file-sink.sink.rollSize = 1048576
file.sinks.file-sink.sink.rollCount = 0

# Use a channel which buffers events in memory
file.channels.memory-channel.type = memory
file.channels.memory-channel.capacity = 1000
file.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
file.sources.file-source.channels = memory-channel
file.sinks.file-sink.channel = memory-channel

该配置文件中定义了一个 source,一个 sink 和一个 channel,source 的类型是 spooldir,监控 mylogs 文件夹下的文件,sink 的类型是 file_roll,将数据写入到 backup 文件夹中,channel 的类型是 memory,缓存事件。

  1. 启动 Flume

启动 Flume,使用以下命令:

bin/flume-ng agent --conf conf --conf-file conf/file.conf --name file -Dflume.root.logger=INFO,console
  1. 输入数据

1.txt 中输入 'Hello Flume',在 2.txt 中输入 'hello flume',将两个文件拖入 mylog 文件夹中。

  1. 查看 backup 文件夹中的文件内容

backup 文件夹中可以看到两个文件 1.txt2.txt,使用记事本打开可以看到文件中的内容。

Flume 入门指南:基本使用方法详解

原文地址: https://www.cveoy.top/t/topic/oYbl 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录