MyBatis 批量插入 Hive 数据：多线程异步优化百万级数据
spring:
  datasource:
    # Custom keys: Spring Boot does not auto-bind spring.datasource.mysql / spring.datasource.hive;
    # a @Configuration class must read them and build the two DataSources explicitly.
    mysql:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/db_name
      username: root
      password: root
    hive:
      driver-class-name: org.apache.hive.jdbc.HiveDriver
      url: jdbc:hive2://localhost:10000/db_name;auth=noSasl
      username: hive
      password: hive
mybatis:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    default-executor-type: batch
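在 mapper.xml 中编写 SQL 语句（下例中 `dataMapper.selectAllData()` 用于从 MySQL 查询数据），在 Service 层调用 mapper 获取 MySQL 数据，再按每批 10000 条拆分，通过多线程、异步的方式插入 Hive。注意：下例中写入 Hive 并不经过 MyBatis mapper，而是使用 `hiveTemplate` 提供的 JDBC 连接执行 PreparedStatement。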
@Service
public class DataServiceImpl implements DataService {
@Autowired
private DataMapper dataMapper;
@Autowired
private HiveTemplate hiveTemplate;
@Override
public void insertDataToHive() {
List<Data> dataList = dataMapper.selectAllData();
List<Future> futures = new ArrayList<>();
for (int i = 0; i < dataList.size(); i += 10000) {
List<Data> subList = dataList.subList(i, Math.min(i + 10000, dataList.size()));
Future future = hiveTemplate.submit(new InsertToHiveTask(subList));
futures.add(future);
}
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
private class InsertToHiveTask implements Callable<Void> {
private final List<Data> subList;
public InsertToHiveTask(List<Data> subList) {
this.subList = subList;
}
@Override
public Void call() throws Exception {
Connection conn = hiveTemplate.getConnection();
String sql = 'insert into table_name(id, name, age) values(?,?,?)';
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
for (Data data : subList) {
stmt.setInt(1, data.getId());
stmt.setString(2, data.getName());
stmt.setInt(3, data.getAge());
stmt.addBatch();
}
stmt.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
}
}
原文地址: https://www.cveoy.top/t/topic/od7x 著作权归作者所有。请勿转载和采集!