熟悉Flume的基本使用方法一实验目的1了解并熟悉flume的功能2 掌握flume的使用方法学会按要求编写相关配置文件二实验平台Flume版本 190Kafka版本 212Mysql版本 8023Hadoop版本 313三.实验内容和要求1.MySQL数据输出在MySQL中建立数据库school在数据库中建立表student。SQL语句如下:create database school;use
四.实验步骤
- MySQL数据输出
(1) 在MySQL中创建数据库和表
首先在MySQL中创建数据库和表,SQL语句如下:
create database school; use school; create table student( id int not null, name varchar(40), age int, grade int, primary key(id) );
(2) 编写Flume配置文件
在Flume的conf目录下创建一个名为mysql.conf的配置文件,内容如下:
Name the components on this agent
mysql.sources = mysql-source mysql.sinks = mysql-sink mysql.channels = memory-channel
Describe/configure the source
mysql.sources.mysql-source.type = org.apache.flume.source.jdbc.JdbcSource mysql.sources.mysql-source.jdbc.driver = com.mysql.jdbc.Driver mysql.sources.mysql-source.jdbc.url = jdbc:mysql://localhost:3306/school mysql.sources.mysql-source.jdbc.user = root mysql.sources.mysql-source.jdbc.password = root mysql.sources.mysql-source.jdbc.table = student mysql.sources.mysql-source.jdbc.columns.to.select = id,name,age,grade mysql.sources.mysql-source.batch.size = 1000 mysql.sources.mysql-source.incremental.column.name = id mysql.sources.mysql-source.incremental.value = 0
Describe the sink
mysql.sinks.mysql-sink.type = logger
Use a channel which buffers events in memory
mysql.channels.memory-channel.type = memory mysql.channels.memory-channel.capacity = 1000 mysql.channels.memory-channel.transactionCapacity = 100
Bind the source and sink to the channel
mysql.sources.mysql-source.channels = memory-channel mysql.sinks.mysql-sink.channel = memory-channel
该配置文件中定义了一个source,一个sink和一个channel,source的类型是JdbcSource,从MySQL中读取数据,sink的类型是logger,将数据输出到控制台,channel的类型是memory,缓存事件。
(3) 启动Flume
启动Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/mysql.conf --name mysql -Dflume.root.logger=INFO,console
(4) 插入数据
在MySQL中插入数据,使用以下命令:
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98); insert into student(id,name,age,grade)value(2,'Zhangsan',24,96); insert into student(id,name,age,grade)value(3,'Lisi',24,93); insert into student(id,name,age,grade)value(4,'Wangwu',21,91); insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
(5) 查看控制台输出
在控制台中可以看到输出的数据:
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 31 20 7B 20 22 61 67 65 22 20 3A 20 32 35 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 31 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 32 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 36 2C 20 22 6E 61 6D 65 22 20 3A 20 22 5A 68 61 6E 67 73 61 6E 22 2C 20 22 69 64 22 20 3A 20 32 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 33 20 7B 20 22 61 67 65 22 20 3A 20 32 34 2C 20 22 67 72 61 64 65 22 20 3A 20 39 33 2C 20 22 6E 61 6D 65 22 20 3A 20 22 4C 69 73 69 22 2C 20 22 69 64 22 20 3A 20 33 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 34 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 61 6E 67 77 75 22 2C 20 22 69 64 22 20 3A 20 34 20 7D 0D 0A }
INFO org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94) - Event: { headers:{} body: 35 20 7B 20 22 61 67 65 22 20 3A 20 32 31 2C 20 22 67 72 61 64 65 22 20 3A 20 39 31 2C 20 22 6E 61 6D 65 22 20 3A 20 22 57 65 69 6C 69 75 22 2C 20 22 69 64 22 20 3A 20 35 20 7D 0D 0A }
其中,body部分是数据的二进制表示,需要将其转换为字符串才能看到实际的数据。
- Kafka链接Flume
(1) 配置Kafka
首先需要配置Kafka,具体步骤可以参考Kafka官方文档。
(2) 编写Flume配置文件
在Flume的conf目录下创建一个名为kafka.conf的配置文件,内容如下:
Name the components on this agent
kafka.sources = kafka-source kafka.sinks = hdfs-sink kafka.channels = memory-channel
Describe/configure the source
kafka.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource kafka.sources.kafka-source.kafka.bootstrap.servers = localhost:9092 kafka.sources.kafka-source.kafka.topics = test kafka.sources.kafka-source.batchSize = 1000 kafka.sources.kafka-source.batchDurationMillis = 2000
Describe the sink
kafka.sinks.hdfs-sink.type = hdfs kafka.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/fromkafka/%Y%m%d/ kafka.sinks.hdfs-sink.hdfs.filePrefix = kafka_log kafka.sinks.hdfs-sink.hdfs.rollInterval = 0 kafka.sinks.hdfs-sink.hdfs.rollSize = 0 kafka.sinks.hdfs-sink.hdfs.rollCount = 10000 kafka.sinks.hdfs-sink.hdfs.writeFormat = Text
Use a channel which buffers events in memory
kafka.channels.memory-channel.type = memory kafka.channels.memory-channel.capacity = 1000 kafka.channels.memory-channel.transactionCapacity = 100
Bind the source and sink to the channel
kafka.sources.kafka-source.channels = memory-channel kafka.sinks.hdfs-sink.channel = memory-channel
该配置文件中定义了一个source,一个sink和一个channel,source的类型是KafkaSource,从Kafka中读取数据,sink的类型是hdfs,将数据存储到hdfs中,channel的类型是memory,缓存事件。
(3) 启动Flume
启动Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,console
(4) 输入数据
在Kafka的生产者中输入“Hello Flume”或其他信息。
(5) 查看hdfs中的数据
在hdfs中查看数据,可以使用以下命令:
hadoop fs -ls /fromkafka/
可以看到kafka_log文件,使用以下命令查看文件内容:
hadoop fs -cat /fromkafka/kafka_log
可以看到输入的数据。
- 使用Flume写入当前文件系统
(1) 创建文件夹和文件
首先在当前用户的文件夹中创建mylogs和backup文件夹,并在mylogs文件夹中创建两个文本文件1.txt和2.txt。
(2) 编写Flume配置文件
在Flume的conf目录下创建一个名为file.conf的配置文件,内容如下:
Name the components on this agent
file.sources = file-source file.sinks = file-sink file.channels = memory-channel
Describe/configure the source
file.sources.file-source.type = spooldir file.sources.file-source.spoolDir = /home/hadoop/mylogs file.sources.file-source.fileHeader = true file.sources.file-source.basenameHeader = true
Describe the sink
file.sinks.file-sink.type = file_roll file.sinks.file-sink.sink.directory = /home/hadoop/backup/ file.sinks.file-sink.sink.rollInterval = 0 file.sinks.file-sink.sink.rollSize = 1048576 file.sinks.file-sink.sink.rollCount = 0
Use a channel which buffers events in memory
file.channels.memory-channel.type = memory file.channels.memory-channel.capacity = 1000 file.channels.memory-channel.transactionCapacity = 100
Bind the source and sink to the channel
file.sources.file-source.channels = memory-channel file.sinks.file-sink.channel = memory-channel
该配置文件中定义了一个source,一个sink和一个channel,source的类型是spooldir,监控mylogs文件夹下的文件,sink的类型是file_roll,将数据写入到backup文件夹中,channel的类型是memory,缓存事件。
(3) 启动Flume
启动Flume,使用以下命令:
bin/flume-ng agent --conf conf --conf-file conf/file.conf --name file -Dflume.root.logger=INFO,console
(4) 输入数据
在1.txt中输入Hello Flume,在2.txt中输入hello flume,将两个文件拖入mylog文件夹中。
(5) 查看backup文件夹中的文件内容
在backup文件夹中可以看到两个文件1.txt和2.txt,使用记事本打开可以看到文件中的内容
原文地址: https://www.cveoy.top/t/topic/hvlD 著作权归作者所有。请勿转载和采集!