三.实验内容和要求

  1. MySQL 数据输出

在 MySQL 中建立数据库 school,在数据库中建立表 student。SQL 语句如下:

create database school;
use school;
create table student(
  id int not null,
  name varchar(40),
  age int,
  grade int,
  primary key(id)
);

编写配置文件,将 student 表中的内容输出到控制台。启动 Flume,在 student 表中使用下列命令插入数据,在 Kafka 消费者中查看相应数据。

insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
  1. Kafka 链接 Flume

编写配置文件,将 kafka 作为输入,在生产者中输入'HelloFlume' 或其他信息,通过 Flume 将 Kafka 生产者输入的信息存入 hdfs 中,存储格式 hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存储时文件名为 kafka_log。

(注:配置好 Flume 后生产者输入的信息不会实时写入到 hdfs,一段时间后批量写入)

  1. 使用 Flume 写入当前文件系统

编写配置文件,设置文件夹 mylogs 为 source 位置,文件夹 backup 为 sink 写入位置,实现对文件夹的数据备份。

新建两个文本文本文件 1.txt 与 2.txt,在 1.txt 中输入 Hello Flume,在 2.txt 中输入 hello flume 将两个文件拖入 mylog,查看 backup 文件夹中出现的文件及其内容。文件可用记事本打开。

四.实验步骤

  1. MySQL 数据输出

1.1 准备工作

在 CentOS 7 系统上安装好 MySQL,并创建数据库 school 和表 student,插入数据。

1.2 编写 Flume 配置文件

在 Flume 的 conf 目录下,新建一个名为 mysql.conf 的文件,输入以下内容:

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.type = jdbc
agent.sources.r1.driver = com.mysql.jdbc.Driver
agent.sources.r1.url = jdbc:mysql://localhost:3306/school
agent.sources.r1.user = root
agent.sources.r1.password = root
agent.sources.r1.sql = SELECT * FROM student
agent.sources.r1.batchSize = 1000
agent.sources.r1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

agent.sinks.k1.type = logger
agent.sinks.k1.channel = c1

1.3 启动 Flume

在 Flume 的 bin 目录下,执行以下命令启动 Flume:

./flume-ng agent --conf conf --conf-file /path/to/mysql.conf --name agent -Dflume.root.logger=INFO,console

1.4 插入数据并查看结果

执行以下命令插入数据:

insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);

在 Flume 控制台中可以看到相应的输出结果。

  1. Kafka 链接 Flume

2.1 准备工作

在 CentOS 7 系统上安装好 Kafka 和 Hadoop,并启动 Zookeeper 和 Kafka。

2.2 编写 Flume 配置文件

在 Flume 的 conf 目录下,新建一个名为 kafka.conf 的文件,输入以下内容:

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.type = avro
agent.sources.r1.bind = localhost
agent.sources.r1.port = 44444
agent.sources.r1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/
agent.sinks.k1.hdfs.filePrefix = kafka_log
agent.sinks.k1.hdfs.fileSuffix = .log
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.channel = c1

2.3 启动 Flume

在 Flume 的 bin 目录下,执行以下命令启动 Flume:

./flume-ng agent --conf conf --conf-file /path/to/kafka.conf --name agent -Dflume.root.logger=INFO,console

2.4 发送消息到 Kafka

执行以下命令发送消息到 Kafka:

echo 'Hello Flume' | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

2.5 查看结果

在 Hadoop 的 namenode 节点上执行以下命令查看结果:

hdfs dfs -ls /fromkafka

可以看到类似以下的输出结果:

drwxr-xr-x   - root supergroup          0 2021-10-21 20:06 /fromkafka/20211021
-rw-r--r--   1 root supergroup         12 2021-10-21 20:06 /fromkafka/20211021/kafka_log.log

其中,kafka_log.log 文件中存储了之前发送的消息。

  1. 使用 Flume 写入当前文件系统

3.1 准备工作

在 CentOS 7 系统上安装好 Flume。

3.2 编写 Flume 配置文件

在 Flume 的 conf 目录下,新建一个名为 file.conf 的文件,输入以下内容:

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /path/to/mylogs
agent.sources.r1.fileHeader = false
agent.sources.r1.deserializer.maxLineLength = 500000
agent.sources.r1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /path/to/backup
agent.sinks.k1.sink.rollInterval = 0
agent.sinks.k1.sink.rollSize = 0
agent.sinks.k1.sink.rollCount = 0
agent.sinks.k1.channel = c1

3.3 启动 Flume

在 Flume 的 bin 目录下,执行以下命令启动 Flume:

./flume-ng agent --conf conf --conf-file /path/to/file.conf --name agent -Dflume.root.logger=INFO,console

3.4 创建测试文件并拖入 mylogs 文件夹

新建两个文本文本文件 1.txt 和 2.txt,分别输入 Hello Flume 和 hello flume,将两个文件拖入 mylogs 文件夹。

3.5 查看结果

在 backup 文件夹中,可以看到两个文件 1.txt 和 2.txt,其中分别存储了之前输入的内容。可以使用记事本打开文件查看。

Flume 实践教程:MySQL 数据输出、Kafka 链接和文件系统写入

原文地址: https://www.cveoy.top/t/topic/oYbo 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录