Spring Boot: configuring an Apache Hudi data source in application.yaml and using Java to insert, update, and query Apache Hudi data (example)
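- Configure the Apache Hudi data source in application.yaml. Hudi has no JDBC driver of its own, so the connection uses the Hive JDBC driver against a HiveServer2-compatible endpoint: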
```yaml
spring:
  datasource:
    url: jdbc:hive2://localhost:10000/default
    username: root
    password: root
    driver-class-name: org.apache.hive.jdbc.HiveDriver
```
- Insert, update, and query Apache Hudi from Java (through Spark)
```java
// Requires the hudi-spark bundle matching your Spark and Scala versions on the classpath.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class HudiExample {
    private static final String TABLE_NAME = "hudi_test";
    private static final String BASE_PATH = "/tmp/hudi_test";

    // id is the record key; ts is the precombine field Hudi uses to pick between records with the same key
    private static final StructType SCHEMA = new StructType()
            .add("id", DataTypes.StringType)
            .add("name", DataTypes.StringType)
            .add("ts", DataTypes.LongType);

    public static void main(String[] args) {
        // Create SparkSession; Hudi's Spark integration expects the Kryo serializer
        SparkSession spark = SparkSession.builder()
                .appName("HudiExample")
                .master("local[*]")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        // Insert: create the table with records id 1..5
        List<Row> records = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            records.add(RowFactory.create(String.valueOf(i), "name" + i, 1L));
        }
        write(spark.createDataFrame(records, SCHEMA), "insert", SaveMode.Overwrite);

        // Update: upsert ids 1..3 with new names and a newer ts
        List<Row> updateRecords = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            updateRecords.add(RowFactory.create(String.valueOf(i), "name" + i + "_updated", 2L));
        }
        write(spark.createDataFrame(updateRecords, SCHEMA), "upsert", SaveMode.Append);

        // Query: a snapshot read returns the latest version of every record
        Dataset<Row> df = spark.read().format("hudi").load(BASE_PATH);
        df.select("id", "name", "ts").orderBy("id").show();

        spark.stop();
    }

    // Writes a DataFrame to the Hudi table with the given write operation
    private static void write(Dataset<Row> df, String operation, SaveMode mode) {
        df.write().format("hudi")
                .option("hoodie.table.name", TABLE_NAME)
                .option("hoodie.datasource.write.recordkey.field", "id")
                .option("hoodie.datasource.write.precombine.field", "ts")
                .option("hoodie.datasource.write.operation", operation)
                // Non-partitioned table, so no partition path field
                .option("hoodie.datasource.write.partitionpath.field", "")
                .option("hoodie.datasource.write.keygenerator.class",
                        "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
                .option("hoodie.insert.shuffle.parallelism", "2")
                .option("hoodie.upsert.shuffle.parallelism", "2")
                .mode(mode)
                .save(BASE_PATH);
    }
}
```
The code above first creates a SparkSession configured with the Kryo serializer, which Hudi's Spark integration expects. It then builds a DataFrame of five records (id 1 to 5) and writes it to BASE_PATH in Hudi format with the insert operation. The write options mark id as the record key (hoodie.datasource.write.recordkey.field), name ts as the precombine field that decides which record wins when several share a key (hoodie.datasource.write.precombine.field), declare the table non-partitioned through NonpartitionedKeyGenerator, and set the insert and upsert parallelism. Next it writes three records whose ids already exist, with new names, using the upsert operation, so Hudi replaces those rows instead of appending duplicates. Finally it reads the table back with spark.read().format("hudi"), a snapshot query that returns the latest version of each record, and prints it to the console: rows 1 to 3 carry the _updated names and rows 4 and 5 are unchanged.
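The example above drives Hudi through Spark directly and never uses the spring.datasource connection configured in application.yaml. Below is a minimal sketch of doing the same insert, update, and query through that DataSource. It assumes the jdbc:hive2://localhost:10000 endpoint is a Spark Thrift Server started with the Hudi Spark bundle and HoodieSparkSessionExtension, so Hudi's Spark SQL (CREATE TABLE ... USING hudi with the primaryKey / preCombineField table properties, INSERT INTO, UPDATE) is available. The class HudiJdbcRepository and its methods are hypothetical names; the sketch uses only java.sql so it compiles without Spring.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.sql.DataSource;

/**
 * Hypothetical sketch: insert, update and query a Hudi table through a plain JDBC DataSource,
 * e.g. the one Spring Boot auto-configures from spring.datasource.
 * ASSUMPTION: the endpoint is a Spark Thrift Server with Hudi's Spark SQL extension enabled.
 */
public class HudiJdbcRepository {
    static final String TABLE = "hudi_test";
    static final String LOCATION = "/tmp/hudi_test";

    /** One business row; Hudi meta columns such as _hoodie_commit_time are not selected. */
    public record HudiRow(String id, String name, long ts) {}

    private final DataSource dataSource;

    public HudiJdbcRepository(DataSource dataSource) {
        this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    }

    /** DDL for a Hudi table keyed by id, with ts as the precombine field. */
    public static String createTableDdl() {
        return "CREATE TABLE IF NOT EXISTS " + TABLE
                + " (id STRING, name STRING, ts BIGINT) USING hudi"
                + " TBLPROPERTIES (primaryKey = 'id', preCombineField = 'ts')"
                + " LOCATION '" + LOCATION + "'";
    }

    public void createTable() throws SQLException {
        try (Connection conn = dataSource.getConnection();
             Statement st = conn.createStatement()) {
            st.execute(createTableDdl());
        }
    }

    /** Insert one record. */
    public void insert(String id, String name, long ts) throws SQLException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO " + TABLE + " VALUES (?, ?, ?)")) {
            ps.setString(1, id);
            ps.setString(2, name);
            ps.setLong(3, ts);
            ps.execute();
        }
    }

    /** Update the name (and ts) of the record with the given id. */
    public void updateName(String id, String newName, long ts) throws SQLException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "UPDATE " + TABLE + " SET name = ?, ts = ? WHERE id = ?")) {
            ps.setString(1, newName);
            ps.setLong(2, ts);
            ps.setString(3, id);
            ps.execute();
        }
    }

    /** Snapshot query of the latest version of every record. */
    public List<HudiRow> findAll() throws SQLException {
        List<HudiRow> result = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT id, name, ts FROM " + TABLE + " ORDER BY id")) {
            while (rs.next()) {
                // Read by position: HiveServer2 may label columns as "table.column"
                result.add(new HudiRow(rs.getString(1), rs.getString(2), rs.getLong(3)));
            }
        }
        return result;
    }
}
```

In Spring Boot the auto-configured DataSource can be handed over from a bean method, for example `@Bean HudiJdbcRepository hudiJdbcRepository(DataSource dataSource) { return new HudiJdbcRepository(dataSource); }`; Spring's JdbcTemplate could replace the raw java.sql calls without changing the SQL. If the endpoint is a plain HiveServer2 instead of a Spark Thrift Server, Hudi tables synced into the Hive metastore can generally only be queried, so only findAll() should be expected to work there.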
Original article: https://www.cveoy.top/t/topic/flf8 (copyright belongs to the author; do not repost or scrape)