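Example: batch-inserting data into Apache Hudi from Java with Spring Boot
The following example uses Spring Boot and Java to batch-insert data into Apache Hudi:
- First, add Hudi and its related dependencies to your pom.xml: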
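<!-- In Hudi 0.8.0 the Spark write client is published as hudi-spark-client;
     hudi-client itself is a parent POM. Verify the coordinates for your version. -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-client</artifactId>
    <version>0.8.0</version>
</dependency>
<!-- The Spark bundle carries a Scala suffix that must match the Spark build (2.11 here). -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-bundle_2.11</artifactId>
    <version>0.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<!-- Spark 2.4.x ships with Jackson 2.6.x; if Spark reports an incompatible
     Jackson version at startup, align this version with the one Spark expects. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.9.3</version>
</dependency>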
- Create a Spring Boot application and add the following code to your main class:
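import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Sketch targeting Hudi 0.8.0 on Spark 2.4; check class and method names against the Hudi version you use.
@SpringBootApplication
public class HudiBatchInsertApplication implements CommandLineRunner {
    private static final String TABLE_PATH = "file:///tmp/hudi_test";
    private static final String TABLE_NAME = "test_table";
    @Autowired
    private DataSource dataSource;

    @Override
    public void run(String... args) throws Exception {
        // The write schema is the plain Avro schema; Hudi adds its own metadata fields when writing.
        Schema schema;
        try (InputStream in = HudiBatchInsertApplication.class.getResourceAsStream("/test.avsc")) {
            schema = new Schema.Parser().parse(in);
        }
        // Hudi's own Spark examples configure Kryo serialization, so it is set here as well.
        SparkConf sparkConf = new SparkConf().setAppName("HudiBatchInsert").setMaster("local[*]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
            // Create the .hoodie metadata directory on first run (builder API as found in Hudi 0.8.0).
            Path tablePath = new Path(TABLE_PATH);
            FileSystem fs = tablePath.getFileSystem(jsc.hadoopConfiguration());
            if (!fs.exists(tablePath)) {
                HoodieTableMetaClient.withPropertyBuilder()
                        .setTableType(HoodieTableType.COPY_ON_WRITE)
                        .setTableName(TABLE_NAME)
                        .setPayloadClass(HoodieAvroPayload.class)
                        .initTable(jsc.hadoopConfiguration(), TABLE_PATH);
            }
            HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
                    .withPath(TABLE_PATH)
                    .forTable(TABLE_NAME)
                    .withSchema(schema.toString())
                    .withParallelism(2, 2)
                    .withBulkInsertParallelism(2)
                    .build();
            // Read the source rows over JDBC and wrap each one as a HoodieRecord.
            List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement ps = conn.prepareStatement("select id, name, email from test_table limit 1000");
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String id = rs.getString("id");
                    GenericRecord row = new GenericRecordBuilder(schema)
                            .set("id", id)
                            .set("name", rs.getString("name"))
                            .set("email", rs.getString("email"))
                            .build();
                    // Empty partition path: the table is not partitioned.
                    records.add(new HoodieRecord<>(new HoodieKey(id, ""), new HoodieAvroPayload(Option.of(row))));
                }
            }
            SparkRDDWriteClient<HoodieAvroPayload> writeClient =
                    new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), writeConfig);
            try {
                String instantTime = writeClient.startCommit();
                JavaRDD<HoodieRecord<HoodieAvroPayload>> recordRDD = jsc.parallelize(records, 2);
                // With auto-commit (the default) the instant is committed as part of this call.
                writeClient.bulkInsert(recordRDD, instantTime);
            } finally {
                writeClient.close();
            }
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(HudiBatchInsertApplication.class, args);
    }
}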
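- The code above reads up to 1000 rows from test_table in your database and bulk-inserts them into the Hudi table. Adjust the query and the row limit as needed.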
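If the source table holds far more than 1000 rows, one option is to process it in fixed-size batches instead of loading everything into a single list. The sketch below shows only the chunking step, using plain Java collections; `BatchSplitter` and `split` are hypothetical names introduced for illustration, and the idea that each batch would go through its own `startCommit()` and `bulkInsert` cycle is an assumption about how you might wire it into the application, not something the example above does.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    /** Splits items into consecutive sublists of at most batchSize elements. */
    public static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for row ids read from the database.
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            ids.add(i);
        }
        for (List<Integer> batch : split(ids, 1000)) {
            System.out.println("batch of " + batch.size() + " rows");
        }
    }
}
```

With 2500 ids and a batch size of 1000, this yields batches of 1000, 1000, and 500 elements. Smaller batches keep driver memory bounded, at the cost of more commits on the Hudi timeline.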
- Run the example; the Hudi table files should then appear under the /tmp/hudi_test directory.
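A quick way to confirm the write happened is to look for the `.hoodie` directory, where a Hudi table keeps its timeline metadata under the base path. The following is a minimal sketch using only `java.nio.file`; `HudiTableCheck` and `hasHoodieMetadata` are hypothetical names, and the default path simply mirrors the /tmp/hudi_test location used above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class HudiTableCheck {
    /** Returns true if basePath contains a .hoodie metadata directory. */
    public static boolean hasHoodieMetadata(Path basePath) {
        return Files.isDirectory(basePath.resolve(".hoodie"));
    }

    public static void main(String[] args) throws IOException {
        Path basePath = Paths.get(args.length > 0 ? args[0] : "/tmp/hudi_test");
        if (!hasHoodieMetadata(basePath)) {
            System.out.println("No .hoodie directory under " + basePath);
            return;
        }
        // List the top-level entries: the .hoodie directory plus any data files.
        try (Stream<Path> entries = Files.list(basePath)) {
            entries.forEach(p -> System.out.println(p.getFileName()));
        }
    }
}
```

This only checks that the directory layout exists; it does not validate the commit itself, which is better inspected by reading the table back through Spark.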
Source: https://www.cveoy.top/t/topic/fleM. Copyright belongs to the author. Do not repost or scrape.