精准一次写入 Kafka 的方式主要有三种:

  1. 同步发送方式

在同步发送方式中,发送者会等待 Kafka 确认收到消息后再继续发送下一条消息。如果 Kafka 没有确认收到消息,发送者会一直等待确认,直到确认为止。这种方式可以保证消息的精准一次性。

Java 实现:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "key", "value");

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Sent message: (" + record.key() + ", " + record.value() + ") at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
  1. 事务方式

在事务方式中,发送者会将消息写入一个事务,然后提交事务。如果事务提交成功,消息就会被写入 Kafka,否则事务会回滚。这种方式可以保证消息的原子性和一致性,也可以保证消息的精准一次性。

Java 实现:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-id");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions();

try {
    producer.beginTransaction();

    ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_name", "key1", "value1");
    producer.send(record1);

    ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_name", "key2", "value2");
    producer.send(record2);

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
} finally {
    producer.close();
}
  1. 幂等性方式

在幂等性方式中,发送者会在消息中携带一个序列号,Kafka 会根据序列号判断消息是否已经被写入过。如果消息已经被写入过,Kafka 就会忽略这条消息。这种方式可以保证消息的幂等性和精准一次性。

Java 实现:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");

Producer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "key", "value");

producer.send(record);
写入kafka精准一次的方式有几种?分别举例实现方式

原文地址: https://www.cveoy.top/t/topic/qos 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录