1. Configure the MySQL data source

Add the following to application.yaml:

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
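2. Configure the Hive data source

Add the following to application.yaml. Note that hive.datasource is a custom prefix, so Spring Boot does not auto-configure it; a DataSource bean of your own has to be bound to these properties: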

hive:
  datasource:
    url: jdbc:hive2://localhost:10000/default
    username: hive
    password: hive
    driver-class-name: org.apache.hive.jdbc.HiveDriver
3. Configure BatchExecutor

Add the following to application.yaml:

mybatis:
  configuration:
    default-executor-type: batch
4. Implement batch insertion with multithreading and asynchronous execution

First, create a DataService class to handle the data operations.

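import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.springframework.stereotype.Service;

@Service
public class DataService {

    private final MysqlMapper mysqlMapper;
    private final HiveMapper hiveMapper;

    public DataService(MysqlMapper mysqlMapper, HiveMapper hiveMapper) {
        this.mysqlMapper = mysqlMapper;
        this.hiveMapper = hiveMapper;
    }

    // Inserts rows with ids start..end (inclusive) into MySQL, then into Hive asynchronously.
    public void insertData(int start, int end) {
        List<Data> dataList = generateData(start, end);
        // Synchronous batch insert into MySQL.
        mysqlMapper.insertBatch(dataList);

        // Fire-and-forget: runs on the common ForkJoinPool. The caller does not wait,
        // and an exception thrown here is not propagated to the caller.
        CompletableFuture.runAsync(() -> {
            hiveMapper.insertBatch(dataList);
        });
    }

    // Builds one test row per id in the inclusive range start..end.
    private List<Data> generateData(int start, int end) {
        List<Data> dataList = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            Data data = new Data();
            data.setId(i);
            data.setName("name-" + i);
            data.setValue("value-" + i);
            dataList.add(data);
        }
        return dataList;
    }
}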

Here, MysqlMapper and HiveMapper are the Mapper interfaces for MySQL and Hive respectively, and Data is the data entity class.
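Each call to insertBatch above receives 100,000 rows at once. Depending on how the mapper builds its SQL, a single call of that size may be too large for the database or driver, so it is common to split the list into smaller chunks first. The following is a minimal sketch of that idea, not part of the original answer: BatchPartitioner, partition, and the chunk size of 4 are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPartitioner {

    /** Splits source into consecutive sublists of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < source.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, source.size());
            // Copy the subList view so each chunk stays valid if source changes later.
            chunks.add(new ArrayList<>(source.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        for (List<Integer> chunk : partition(ids, 4)) {
            // In the service, this is where mysqlMapper.insertBatch(chunk) would be called.
            System.out.println(chunk);
        }
    }
}
```

In DataService, the same helper could wrap the list returned by generateData, with one insertBatch call per chunk. A suitable chunk size depends on the row width and the database settings and would need to be measured.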

In the Controller, call DataService.insertData to perform the batch, multithreaded, asynchronous insert.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DataController {

    private final DataService dataService;

    public DataController(DataService dataService) {
        this.dataService = dataService;
    }

    @GetMapping("/insert")
    public String insertData() {
        long startTime = System.currentTimeMillis();
        // 10 batches of 100,000 ids each: 1..100000, 100001..200000, and so on.
        for (int i = 1; i <= 10; i++) {
            final int start = (i - 1) * 100000 + 1;
            final int end = i * 100000;
            dataService.insertData(start, end);
        }
        long endTime = System.currentTimeMillis();
        return "Total time: " + (endTime - startTime) + "ms";
    }
}

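The code above loops 10 times, inserting 100,000 rows per iteration, for 1,000,000 rows in total. The Hive insert is wrapped in CompletableFuture.runAsync so that it runs asynchronously and does not block the request thread. Because those futures are never awaited, the elapsed time returned by the endpoint covers only data generation and the MySQL inserts, not the Hive inserts.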
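If the reported time should include the asynchronous work, or if failures in the Hive inserts need to surface, the futures have to be collected and awaited. The sketch below shows one way to do that with a dedicated thread pool and CompletableFuture.allOf. It is an illustration under assumptions, not the original author's code: AsyncBatchRunner, runAll, and the pool size of 4 are hypothetical, and the println stands in for the real hiveMapper.insertBatch call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

class AsyncBatchRunner {

    /**
     * Runs task once per batch index (1..batchCount) on the given pool and blocks
     * until every run has finished. Returns the number of runs that completed normally;
     * join() throws a CompletionException if any run failed.
     */
    static int runAll(int batchCount, ExecutorService pool, IntConsumer task) {
        AtomicInteger completed = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 1; i <= batchCount; i++) {
            final int batchIndex = i;
            futures.add(CompletableFuture.runAsync(() -> {
                task.accept(batchIndex);
                completed.incrementAndGet();
            }, pool));
        }
        // Wait for all batches instead of firing and forgetting.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return completed.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long startTime = System.currentTimeMillis();
            int done = runAll(10, pool, batchIndex -> {
                int start = (batchIndex - 1) * 100000 + 1;
                int end = batchIndex * 100000;
                // Placeholder for hiveMapper.insertBatch(generateData(start, end)).
                System.out.println("batch " + batchIndex + ": " + start + ".." + end);
            });
            long elapsed = System.currentTimeMillis() - startTime;
            System.out.println(done + " batches finished in " + elapsed + "ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing an explicit executor keeps the long-running inserts off the common ForkJoinPool that runAsync uses by default. Whether to block the request thread until all batches finish, as this sketch does, is a design choice that depends on how the endpoint is meant to be used.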

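Finally, note that the Hive dependencies must be added to pom.xml. The driver class org.apache.hive.jdbc.HiveDriver is provided by hive-jdbc: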

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>3.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>3.1.2</version>
</dependency>
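Using MyBatis 3.4.6 + Spring Boot, configure one MySQL data source and one Hive data source in application.yaml, and use BatchExecutor to implement batch insertion with multithreading and asynchronous execution so that millions of rows can be inserted into a Hive database. The Hive version is 3.1.2.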

Original source: https://www.cveoy.top/t/topic/fot6 Copyright belongs to the author. Do not repost or scrape.
