写入kafka精准一次的方式有几种?分别举例实现方式
精准一次写入 Kafka 的方式主要有三种:
- 同步发送方式
在同步发送方式中,发送者会等待 Kafka 确认收到消息后再继续发送下一条消息。如果 Kafka 没有确认收到消息,发送者会一直等待确认,直到确认为止。这种方式可以保证消息的精准一次性。
Java 实现:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Sent message: (" + record.key() + ", " + record.value() + ") at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
- 事务方式
在事务方式中,发送者会将消息写入一个事务,然后提交事务。如果事务提交成功,消息就会被写入 Kafka,否则事务会回滚。这种方式可以保证消息的原子性和一致性,也可以保证消息的精准一次性。
Java 实现:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_name", "key1", "value1");
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_name", "key2", "value2");
producer.send(record2);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
} finally {
producer.close();
}
- 幂等性方式
在幂等性方式中,发送者会在消息中携带一个序列号,Kafka 会根据序列号判断消息是否已经被写入过。如果消息已经被写入过,Kafka 就会忽略这条消息。这种方式可以保证消息的幂等性和精准一次性。
Java 实现:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "key", "value");
producer.send(record);
原文地址: https://www.cveoy.top/t/topic/qos 著作权归作者所有。请勿转载和采集!