MyBatis+SpringBoot 批量插入百万级数据到 Hive:多线程异步优化
首先,需要在pom.xml中添加MyBatis和Hive相关的依赖:
<!-- MyBatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
接下来,定义一个User实体类,用于表示要插入的数据:
public class User {
private Long id;
private String name;
private String email;
// getter and setter
}
然后,在application.properties中配置Hive数据源:
spring.datasource.url=jdbc:hive2://localhost:10000/default
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=org.apache.hive.jdbc.HiveDriver
接着,定义一个UserMapper接口,用于定义插入数据的SQL语句:
@Mapper
public interface UserMapper {
@Insert('INSERT INTO user(id, name, email) VALUES(#{id}, #{name}, #{email})')
int insertUser(User user);
}
最后,在UserService中实现批量插入、多线程、异步的方式实现百万级数据插入hive中:
@Service
public class UserService {
private static final int BATCH_SIZE = 10000; // 每批次插入的数据量
private static final int THREAD_COUNT = 4; // 线程数
@Autowired
private UserMapper userMapper;
@Autowired
private Executor executor;
public void insertBatch(List<User> userList) {
int size = userList.size();
int count = size / BATCH_SIZE + (size % BATCH_SIZE == 0 ? 0 : 1); // 计算批次数
// 创建多个批量插入的任务
List<Callable<Integer>> tasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
final List<User> batchList = userList.subList(i * BATCH_SIZE, Math.min((i + 1) * BATCH_SIZE, size));
tasks.add(() -> {
for (User user : batchList) {
userMapper.insertUser(user);
}
return batchList.size();
});
}
// 执行批量插入任务
try {
List<Future<Integer>> futures = executor.invokeAll(tasks);
int total = 0;
for (Future<Integer> future : futures) {
total += future.get();
}
System.out.println('插入数据总数:' + total);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上面的代码中,我们使用Executor来创建一个线程池,然后将批量插入任务分配给多个线程去执行。每个线程插入一批次的数据,插入完成后返回插入的数据量,最后将所有线程插入的数据量加起来,得到插入数据的总数。这样,就可以实现批量插入、多线程、异步的方式实现百万级数据插入hive中。
使用UserService的例子:
@Service
public class SomeService {
@Autowired
private UserService userService;
public void insertUsers(List<User> userList) {
userService.insertBatch(userList);
}
}
原文地址: https://www.cveoy.top/t/topic/od6t 著作权归作者所有。请勿转载和采集!