Java Flink Oracle 批量新增内容 - 使用 Apache Flink 向 Oracle 数据库批量插入数据
使用 Apache Flink 在 Java 中执行批量插入到 Oracle 数据库的操作,您可以按照以下步骤进行:
- 将必要的依赖项添加到您的 Maven 或 Gradle 项目中:
对于 Flink:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
对于 Oracle JDBC 驱动程序:
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${oracle.version}</version>
</dependency>
用适当的版本替换 ${flink.version} 和 ${oracle.version}。
- 创建一个执行批量插入的 Flink 作业:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BatchInsertJob {
public static void main(String[] args) throws Exception {
// 解析命令行参数
ParameterTool params = ParameterTool.fromArgs(args);
// 设置执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建要插入的记录数据集
DataSet<Tuple3<Integer, String, Double>> records = env.fromElements(
new Tuple3<>(1, 'A', 10.0),
new Tuple3<>(2, 'B', 20.0),
new Tuple3<>(3, 'C', 30.0)
);
// 配置 Oracle JDBC Sink
JdbcSink.sink(
"INSERT INTO your_table (id, name, value) VALUES (?, ?, ?)",
(ps, record) -> {
ps.setInt(1, record.f0);
ps.setString(2, record.f1);
ps.setDouble(3, record.f2);
},
JdbcSink.sinkBuilder()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setDBUrl(params.get("dbUrl"))
.setUsername(params.get("username"))
.setPassword(params.get("password"))
.build()
);
// 执行批量插入作业
records.addSink(JdbcSink.sink(
"INSERT INTO your_table (id, name, value) VALUES (?, ?, ?)",
(ps, record) -> {
ps.setInt(1, record.f0);
ps.setString(2, record.f1);
ps.setDouble(3, record.f2);
},
JdbcSink.sinkBuilder()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setDBUrl(params.get("dbUrl"))
.setUsername(params.get("username"))
.setPassword(params.get("password"))
.build()
));
// 执行批量插入作业
env.execute("Batch Insert Job");
}
}
用实际的表名替换 your_table,并使用您想要的数据修改 records 数据集。
- 构建并运行 Flink 作业:
您可以使用 Maven 或 Gradle 构建项目,然后将作业提交到您的 Flink 集群。在运行作业时,请确保传递必要的命令行参数,例如 dbUrl、username 和 password。
例如,使用 Maven:
mvn clean package
flink run -c com.example.BatchInsertJob target/my-project.jar --dbUrl jdbc:oracle:thin:@//localhost:1521/XEPDB1 --username your_username --password your_password
用实际的包名和类名替换 com.example.BatchInsertJob,并使用您的 Oracle 数据库连接详细信息修改 dbUrl、username 和 password 参数。
这将执行 Flink 作业并将批量插入数据到 Oracle 数据库中。
原文地址: https://www.cveoy.top/t/topic/pWEH 著作权归作者所有。请勿转载和采集!