MyBatis+SpringBoot 多数据源 批量插入 Hive 3.1.2 - 高效处理百万级数据
MyBatis+SpringBoot 多数据源 批量插入 Hive 3.1.2 - 高效处理百万级数据
本文将介绍如何使用 MyBatis 和 SpringBoot 配置多数据源,结合 BatchExecutor 实现批量插入,并通过多线程和异步操作,高效地将百万级数据插入到 Hive 3.1.2 中。
场景:
假设我们要向两个不同的数据源 (DataSource1 和 DataSource2) 中插入数据,并且要使用 BatchExecutor 来实现批量插入。
步骤:
-
配置多数据源:
在
application.yaml中配置多数据源:spring: datasource: # 数据源1 datasource1: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/test1 username: root password: root # 数据源2 datasource2: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/test2 username: root password: root -
定义数据源对象和 SqlSessionFactory 对象:
@Configuration @MapperScan(basePackages = "com.example.mapper1", sqlSessionTemplateRef = "sqlSessionTemplate1") public class DataSource1Config { @Bean @ConfigurationProperties(prefix = "spring.datasource.datasource1") public DataSource dataSource1() { return DataSourceBuilder.create().build(); } @Bean public SqlSessionFactory sqlSessionFactory1() throws Exception { SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource1()); return sessionFactoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate1() throws Exception { return new SqlSessionTemplate(sqlSessionFactory1()); } } @Configuration @MapperScan(basePackages = "com.example.mapper2", sqlSessionTemplateRef = "sqlSessionTemplate2") public class DataSource2Config { @Bean @ConfigurationProperties(prefix = "spring.datasource.datasource2") public DataSource dataSource2() { return DataSourceBuilder.create().build(); } @Bean public SqlSessionFactory sqlSessionFactory2() throws Exception { SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource2()); return sessionFactoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate2() throws Exception { return new SqlSessionTemplate(sqlSessionFactory2()); } } -
定义 Mapper 接口:
@Mapper public interface Mapper1 { void insertData(List<Data> dataList); } @Mapper public interface Mapper2 { void insertData(List<Data> dataList); } -
编写 Service 类:
@Service public class DataInsertService { @Autowired private Mapper1 mapper1; @Autowired private Mapper2 mapper2; @Async public void insertData(List<Data> dataList) { BatchExecutor executor1 = new BatchExecutor(mapper1::insertData, 1000); BatchExecutor executor2 = new BatchExecutor(mapper2::insertData, 1000); for (Data data : dataList) { if (data.getId() % 2 == 0) { executor1.add(data); } else { executor2.add(data); } } executor1.execute(); executor2.execute(); } } -
编写 Controller 类:
@RestController public class DataInsertController { @Autowired private DataInsertService dataInsertService; @PostMapping("/insert") public String insertData(@RequestBody List<Data> dataList) { dataInsertService.insertData(dataList); return "success"; } }
注意:
BatchExecutor是一个自定义的类,用于实现批量插入功能,可以根据实际情况进行调整。Data是一个自定义的类,代表要插入的数据对象。- 代码中使用
@Async注解来使insertData方法异步执行,提高效率。 - 可以根据实际情况调整
BatchExecutor的批量大小,以获得最佳性能。
通过以上步骤,您就可以实现多数据源、批量插入、多线程、异步的百万级数据插入 Hive 3.1.2。
更多优化:
- 使用连接池,提高数据库连接效率。
- 优化 SQL 语句,提高执行效率。
- 使用缓存,减少数据库查询次数。
- 监控系统资源,及时发现并解决性能瓶颈。
希望本文能够帮助您快速高效地将百万级数据插入到 Hive 3.1.2 中。
代码示例:
public class BatchExecutor<T> {
private final Consumer<List<T>> consumer;
private final int batchSize;
private final List<T> batch = new ArrayList<>();
public BatchExecutor(Consumer<List<T>> consumer, int batchSize) {
this.consumer = consumer;
this.batchSize = batchSize;
}
public void add(T data) {
batch.add(data);
if (batch.size() >= batchSize) {
execute();
}
}
public void execute() {
if (!batch.isEmpty()) {
consumer.accept(new ArrayList<>(batch));
batch.clear();
}
}
}
原文地址: https://www.cveoy.top/t/topic/od62 著作权归作者所有。请勿转载和采集!