Java Kafka 客户端发送消息示例代码 - 异步启动 Shell 脚本
以下是一个使用 Java 实现的 Kafka 客户端发送消息的例子:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 设置 Kafka 服务器地址和端口
String bootstrapServers = "localhost:9092";
// 创建 Kafka 生产者的配置
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建 Kafka 生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建 Kafka 消息
String topic = "my-topic";
String key = "my-key";
String value = "Hello, Kafka!";
// 创建 Kafka 记录
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// 发送 Kafka 消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功,偏移量为:" + metadata.offset());
} else {
System.out.println("消息发送失败:" + exception.getMessage());
}
}
});
// 关闭 Kafka 生产者
producer.close();
}
}
要异步启动一个 shell 脚本,可以使用 Java 的 ProcessBuilder 类来执行命令。以下是一个示例代码:
import java.io.IOException;
public class ShellScriptExample {
public static void main(String[] args) {
try {
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder("sh", "/path/to/script.sh");
// 启动进程
Process process = processBuilder.start();
// 等待进程执行完成
int exitCode = process.waitFor();
// 检查进程的退出状态
if (exitCode == 0) {
System.out.println("脚本执行成功");
} else {
System.out.println("脚本执行失败");
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
请注意替换 /path/to/script.sh 为您实际的 shell 脚本路径。
原文地址: https://www.cveoy.top/t/topic/qkJD 著作权归作者所有。请勿转载和采集!