1. 在node101服务器上下载安装Kafka,并启动Kafka服务。
  2. 在node101服务器上下载安装Log4j2,并编写一个Log4j2配置文件,用于生成模拟日志数据到本地文件中。
  3. 在node101服务器上编写一个Java程序,读取本地文件中的模拟日志数据,并将数据发送到Kafka主题ods_base_log中。
  4. 在node101服务器上下载安装Kafka工具包,并使用Kafka工具包验证数据是否成功发送到Kafka主题中。

以下是示例代码:

Log4j2配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/sample.log"
                     filePattern="logs/sample-%d{yyyy-MM-dd-HH-mm-ss}.log.gz">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
            </Policies>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="RollingFile"/>
        </Root>
    </Loggers>
</Configuration>

Java程序:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class KafkaProducerExample {

    public static void main(String[] args) throws IOException {
        String topicName = "ods_base_log";
        String filePath = "logs/sample.log";

        // create Kafka producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // create Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // read data from local file
        BufferedReader reader = new BufferedReader(new FileReader(filePath));
        String line;
        while ((line = reader.readLine()) != null) {
            // send data to Kafka topic
            producer.send(new ProducerRecord<>(topicName, line));
        }
        reader.close();

        // close Kafka producer
        producer.close();
    }
}

使用Kafka工具包验证数据是否成功发送到Kafka主题中:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ods_base_log --from-beginning

如果成功发送到Kafka主题中,应该会看到模拟的日志数据

node101中jar包模拟生成日志数据到kafka主题ods_base_log中

原文地址: http://www.cveoy.top/t/topic/hox3 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录