要连接 K8s 集群中的 Kafka 服务,您可以使用 Kafka 的 Java 客户端来实现。以下是一个简单的 Java 代码示例,显示如何连接到 K8s 集群中的 Kafka 服务并发送消息:\n\njava\nimport org.apache.kafka.clients.producer.*;\n\nimport java.util.Properties;\n\npublic class KafkaProducerExample {\n public static void main(String[] args) {\n // 设置 Kafka 服务的地址和端口\n String bootstrapServers = "kafka-service:9092";\n\n // 创建生产者的配置属性\n Properties properties = new Properties();\n properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");\n properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");\n\n // 创建 Kafka 生产者\n KafkaProducer<String, String> producer = new KafkaProducer<>(properties);\n\n // 创建要发送的消息\n String topic = "my-topic";\n String key = "my-key";\n String value = "Hello, Kafka!";\n\n // 创建消息记录\n ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);\n\n // 发送消息\n producer.send(record, new Callback() {\n @Override\n public void onCompletion(RecordMetadata metadata, Exception exception) {\n if (exception != null) {\n System.err.println("Failed to send message: " + exception.getMessage());\n } else {\n System.out.println("Message sent successfully! " +\n "Topic: " + metadata.topic() +\n ", Partition: " + metadata.partition() +\n ", Offset: " + metadata.offset());\n }\n }\n });\n\n // 关闭 Kafka 生产者\n producer.close();\n }\n}\n\n\n在上面的示例中,我们使用org.apache.kafka.clients.producer.KafkaProducer类来创建 Kafka 生产者。我们指定了 Kafka 服务的地址和端口,并设置了序列化器来序列化键和值。然后,我们创建要发送的消息记录,并使用producer.send()方法发送消息。在回调函数中,我们可以处理发送结果。最后,我们关闭 Kafka 生产者。\n\n请注意,您需要替换kafka-service:9092为您 K8s 集群中 Kafka 服务的实际地址和端口,并将my-topic替换为您要发送消息的实际主题。另外,您可能还需要添加 Kafka 客户端的依赖项到您的项目中。

Java 代码连接 K8s 集群中的 Kafka 服务

原文地址: https://www.cveoy.top/t/topic/p7QS 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录