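1. Make sure the virtual machine is set up and the Kafka service is installed and running.
2. Create a Java project in the virtual machine and add the Kafka Java client dependency (org.apache.kafka:kafka-clients).
3. Write Java code that simulates log data generation: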
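```java
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LogProducer {
    public static void main(String[] args) throws InterruptedException {
        String topic = "ods_base_log";
        String brokers = "localhost:9092";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");          // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 0);           // no automatic retries
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // partitioner.class is intentionally not set: the internal DefaultPartitioner is
        // deprecated since Kafka 3.3 and removed in 4.0, and the producer's built-in
        // default partitioning applies automatically when the property is omitted.

        Random random = new Random();
        String[] users = {"user1", "user2", "user3", "user4", "user5"};
        String[] events = {"login", "click", "view", "purchase"};

        // try-with-resources closes the producer (flushing buffered records) even on error
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String user = users[random.nextInt(users.length)];
                String event = events[random.nextInt(events.length)];
                String message = user + " " + event;   // e.g. "user3 click"
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                // send() is asynchronous; the callback reports records that failed to deliver
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send \"" + message + "\": " + exception.getMessage());
                    }
                });
                System.out.println("Sent: " + message);
                Thread.sleep(1000);                    // one record per second
            }
        }
    }
}
```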
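Before running the producer, you may want to confirm that the broker address configured above (localhost:9092) accepts connections. This is a quick way to check step 1. The following is a minimal sketch that uses only the JDK. The class name BrokerPortCheck and the 2-second timeout are illustrative choices and are not part of the original. It only tests whether a TCP connection to the port succeeds, so it cannot tell whether the listener is actually a Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unknown hosts
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed broker address, matching the "brokers" value in LogProducer
        String host = "localhost";
        int port = 9092;
        if (isReachable(host, port, 2000)) {
            System.out.println(host + ":" + port + " is accepting TCP connections");
        } else {
            System.out.println("Cannot connect to " + host + ":" + port + "; is Kafka running?");
        }
    }
}
```

If the check fails, start the Kafka service (step 1) before running LogProducer. Otherwise the producer will keep trying to reach the broker instead of failing immediately.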
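4. Run the program (for example, packaged as a jar in the virtual machine) to generate simulated log data and send it to the Kafka topic ods_base_log.
5. Consume the log data from the ods_base_log topic in Kafka for downstream processing.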
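The original does not say what the downstream processing is. As one hypothetical example, the sketch below counts event types in messages of the "user event" form that LogProducer sends (e.g. "user1 click"). It assumes the record values have already been read from ods_base_log, for instance with a KafkaConsumer poll loop, which is not shown here. LogEventCounter and countEvents are illustrative names. The code requires Java 9+ for List.of.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LogEventCounter {
    /** Counts how often each event type appears in "user event" lines; malformed lines are skipped. */
    static Map<String, Integer> countEvents(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();   // sorted by event name
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue;   // not in the "user event" format produced by LogProducer
            }
            counts.merge(parts[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample values standing in for records consumed from ods_base_log
        List<String> sample = List.of("user1 login", "user2 click", "user1 click", "bad-line", "user3 view");
        System.out.println(countEvents(sample));
    }
}
```

Running main prints {click=2, login=1, view=1}. The malformed line "bad-line" is skipped rather than aborting the batch. In a real pipeline, such lines would usually be logged or routed elsewhere.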
Using a jar in the virtual machine to generate simulated log data into the Kafka topic ods_base_log

Original article: http://www.cveoy.top/t/topic/hox7. Copyright belongs to the author. Do not repost or scrape.
