虚拟机中jar包模拟生成日志数据到kafka主题ods_base_log中操作
- 确保已经安装并启动了虚拟机和Kafka服务。
- 在虚拟机中创建一个Java项目,并添加Kafka的Java客户端依赖。
- 编写Java代码,模拟生成日志数据。
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class LogProducer {
public static void main(String[] args) throws InterruptedException {
String topic = "ods_base_log";
String brokers = "localhost:9092";
String partitionerClass = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", partitionerClass);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Random random = new Random();
String[] users = {"user1", "user2", "user3", "user4", "user5"};
String[] events = {"login", "click", "view", "purchase"};
for (int i = 0; i < 100; i++) {
String user = users[random.nextInt(users.length)];
String event = events[random.nextInt(events.length)];
String message = user + " " + event;
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);
System.out.println("Sent: " + message);
Thread.sleep(1000);
}
producer.close();
}
}
- 运行Java代码,模拟生成日志数据并发送到Kafka主题ods_base_log中。
- 在Kafka中消费ods_base_log主题中的日志数据,进行后续的数据处理
原文地址: http://www.cveoy.top/t/topic/hox7 著作权归作者所有。请勿转载和采集!