# Connection settings for the two data stores used by this example, followed by
# the MyBatis options.
# NOTE(review): `mysql` and `hive` are custom sub-keys under `spring.datasource`,
# so presumably a configuration class (not shown here) binds each group to its
# own DataSource bean — confirm.
# NOTE(review): the credentials below are stored in plain text and look like
# local sample values; externalize them for any shared environment.
spring:
  datasource:
    # MySQL connection (Connector/J driver class).
    mysql:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/db_name
      username: root
      password: root
    # Hive connection over the HiveServer2 JDBC driver.
    hive:
      driver-class-name: org.apache.hive.jdbc.HiveDriver
      # `auth=noSasl` is a session parameter on the JDBC URL; presumably it must
      # match the authentication mode configured on HiveServer2 — verify.
      url: jdbc:hive2://localhost:10000/db_name;auth=noSasl
      username: hive
      password: hive
mybatis:
  # Mapper XML files are loaded from the `mapper/` directory on the classpath.
  mapper-locations: classpath:mapper/*.xml
  configuration:
    # Selects the batch executor as the MyBatis default executor type.
    default-executor-type: batch

在 mapper.xml 中编写 insert 语句，在 Service 层中调用 mapper 获取 MySQL 数据，然后通过多线程、异步的方式插入 Hive。

@Service
public class DataServiceImpl implements DataService {

    @Autowired
    private DataMapper dataMapper;

    @Autowired
    private HiveTemplate hiveTemplate;

    @Override
    public void insertDataToHive() {
        List<Data> dataList = dataMapper.selectAllData();
        List<Future> futures = new ArrayList<>();
        for (int i = 0; i < dataList.size(); i += 10000) {
            List<Data> subList = dataList.subList(i, Math.min(i + 10000, dataList.size()));
            Future future = hiveTemplate.submit(new InsertToHiveTask(subList));
            futures.add(future);
        }
        for (Future future : futures) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    private class InsertToHiveTask implements Callable<Void> {

        private final List<Data> subList;

        public InsertToHiveTask(List<Data> subList) {
            this.subList = subList;
        }

        @Override
        public Void call() throws Exception {
            Connection conn = hiveTemplate.getConnection();
            String sql = 'insert into table_name(id, name, age) values(?,?,?)';
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                for (Data data : subList) {
                    stmt.setInt(1, data.getId());
                    stmt.setString(2, data.getName());
                    stmt.setInt(3, data.getAge());
                    stmt.addBatch();
                }
                stmt.executeBatch();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
}
MyBatis 批量插入 Hive 数据:多线程异步优化百万级数据

原文地址: https://www.cveoy.top/t/topic/od7x 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录