MyBatis+SpringBoot 多数据源 批量插入 Hive 3.1.2 - 高效处理百万级数据

本文将介绍如何使用 MyBatis 和 SpringBoot 配置多数据源,结合 BatchExecutor 实现批量插入,并通过多线程和异步操作,高效地将百万级数据插入到 Hive 3.1.2 中。

场景:

假设我们要向两个不同的数据源 (DataSource1 和 DataSource2) 中插入数据,并且要使用 BatchExecutor 来实现批量插入。

步骤:

  1. 配置多数据源:

    application.yaml 中配置多数据源:

    spring:
      datasource:
        # 数据源1
        datasource1:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://localhost:3306/test1
          username: root
          password: root
        # 数据源2
        datasource2:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://localhost:3306/test2
          username: root
          password: root
    
  2. 定义数据源对象和 SqlSessionFactory 对象:

    @Configuration
    @MapperScan(basePackages = "com.example.mapper1", sqlSessionTemplateRef = "sqlSessionTemplate1")
    public class DataSource1Config {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource.datasource1")
        public DataSource dataSource1() {
            return DataSourceBuilder.create().build();
        }
    
        @Bean
        public SqlSessionFactory sqlSessionFactory1() throws Exception {
            SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
            sessionFactoryBean.setDataSource(dataSource1());
            return sessionFactoryBean.getObject();
        }
    
        @Bean
        public SqlSessionTemplate sqlSessionTemplate1() throws Exception {
            return new SqlSessionTemplate(sqlSessionFactory1());
        }
    }
    
    @Configuration
    @MapperScan(basePackages = "com.example.mapper2", sqlSessionTemplateRef = "sqlSessionTemplate2")
    public class DataSource2Config {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource.datasource2")
        public DataSource dataSource2() {
            return DataSourceBuilder.create().build();
        }
    
        @Bean
        public SqlSessionFactory sqlSessionFactory2() throws Exception {
            SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
            sessionFactoryBean.setDataSource(dataSource2());
            return sessionFactoryBean.getObject();
        }
    
        @Bean
        public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
            return new SqlSessionTemplate(sqlSessionFactory2());
        }
    }
    
  3. 定义 Mapper 接口:

    @Mapper
    public interface Mapper1 {
        void insertData(List<Data> dataList);
    }
    
    @Mapper
    public interface Mapper2 {
        void insertData(List<Data> dataList);
    }
    
  4. 编写 Service 类:

    @Service
    public class DataInsertService {
        @Autowired
        private Mapper1 mapper1;
    
        @Autowired
        private Mapper2 mapper2;
    
        @Async
        public void insertData(List<Data> dataList) {
            BatchExecutor executor1 = new BatchExecutor(mapper1::insertData, 1000);
            BatchExecutor executor2 = new BatchExecutor(mapper2::insertData, 1000);
    
            for (Data data : dataList) {
                if (data.getId() % 2 == 0) {
                    executor1.add(data);
                } else {
                    executor2.add(data);
                }
            }
    
            executor1.execute();
            executor2.execute();
        }
    }
    
  5. 编写 Controller 类:

    @RestController
    public class DataInsertController {
        @Autowired
        private DataInsertService dataInsertService;
    
        @PostMapping("/insert")
        public String insertData(@RequestBody List<Data> dataList) {
            dataInsertService.insertData(dataList);
            return "success";
        }
    }
    

注意:

  • BatchExecutor 是一个自定义的类,用于实现批量插入功能,可以根据实际情况进行调整。
  • Data 是一个自定义的类,代表要插入的数据对象。
  • 代码中使用 @Async 注解来使 insertData 方法异步执行,提高效率。
  • 可以根据实际情况调整 BatchExecutor 的批量大小,以获得最佳性能。

通过以上步骤,您就可以实现多数据源、批量插入、多线程、异步的百万级数据插入 Hive 3.1.2。

更多优化:

  • 使用连接池,提高数据库连接效率。
  • 优化 SQL 语句,提高执行效率。
  • 使用缓存,减少数据库查询次数。
  • 监控系统资源,及时发现并解决性能瓶颈。

希望本文能够帮助您快速高效地将百万级数据插入到 Hive 3.1.2 中。

代码示例:

public class BatchExecutor<T> {
    private final Consumer<List<T>> consumer;
    private final int batchSize;
    private final List<T> batch = new ArrayList<>();

    public BatchExecutor(Consumer<List<T>> consumer, int batchSize) {
        this.consumer = consumer;
        this.batchSize = batchSize;
    }

    public void add(T data) {
        batch.add(data);
        if (batch.size() >= batchSize) {
            execute();
        }
    }

    public void execute() {
        if (!batch.isEmpty()) {
            consumer.accept(new ArrayList<>(batch));
            batch.clear();
        }
    }
}
MyBatis+SpringBoot 多数据源 批量插入 Hive 3.1.2 - 高效处理百万级数据

原文地址: https://www.cveoy.top/t/topic/od62 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录