百万级数据异步批量插入Hive:使用MyBatis、SpringBoot、BatchExecutor和多线程
首先在'application.yaml'中配置mysql和hive的数据源:
spring:
datasource:
mysql:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
username: root
password: root
hive:
url: jdbc:hive2://localhost:10000/default
username: hive
password: hive
driver-class-name: org.apache.hive.jdbc.HiveDriver
然后在代码中使用'@Autowired'注解自动注入'DataSource',并使用'@Qualifier'注解指定具体的数据源:
@Autowired
@Qualifier("mysqlDataSource")
private DataSource mysqlDataSource;
@Autowired
@Qualifier("hiveDataSource")
private DataSource hiveDataSource;
接下来定义一个数据插入的方法,并使用'@Transactional'注解开启事务管理:
@Transactional
public void batchInsert() throws Exception {
// 从mysql中查询数据
List<Data> dataList = getDataFromMysql();
if (dataList == null || dataList.isEmpty()) {
return;
}
// 使用BatchExecutor将数据批量插入hive
BatchExecutor executor = new BatchExecutor(hiveDataSource.getConnection(), "insert into table_name(col1, col2) values(?, ?)");
for (Data data : dataList) {
executor.addBatch(data.getCol1(), data.getCol2());
}
executor.execute();
}
最后使用多线程和异步的方式来调用数据插入方法,可以使用Java8中的CompletableFuture来实现:
CompletableFuture.runAsync(() -> {
try {
batchInsert();
} catch (Exception e) {
e.printStackTrace();
}
});
原文地址: https://www.cveoy.top/t/topic/od63 著作权归作者所有。请勿转载和采集!