An example of inserting millions of rows into Hive with MyBatis + Spring Boot, using the BatchExecutor pattern together with multi-threading and asynchronous writes
First, add the required dependencies to pom.xml:
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>
Next, configure the data sources for MyBatis and Hive. This example uses HikariCP, the connection pool that ships with Spring Boot, for the relational database, while the Hive connection is opened directly over JDBC. Note that the hive.* keys below are custom properties that the application reads itself via @Value; they are not standard Spring Boot configuration keys. Add the following to application.properties:
# MyBatis
mybatis.mapper-locations=classpath:mapper/*.xml
# Hive
hive.url=jdbc:hive2://localhost:10000/default
hive.username=hive
hive.password=hive
Next, create a User class as the entity:
public class User {
    private Long id;
    private String name;
    private Integer age;
    // getters and setters omitted
}
Create a UserMapper interface to define the database operations:
public interface UserMapper {
    void insert(User user);
}
Under the resources directory, create mapper/UserMapper.xml and write the SQL for the insert method:
<mapper namespace="com.example.mapper.UserMapper">
    <insert id="insert" parameterType="com.example.entity.User">
        insert into user (name, age) values (#{name}, #{age})
    </insert>
</mapper>
Next, create a UserService class that generates random data and calls UserMapper's insert method:
@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    public void addUser(User user) {
        userMapper.insert(user);
    }

    public List<User> generateUsers(int count) {
        List<User> users = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < count; i++) {
            User user = new User();
            user.setName("user" + i);
            user.setAge(random.nextInt(100));
            users.add(user);
        }
        return users;
    }
}
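Before wiring the service into a scheduled task, note one practical point: when the row count reaches the millions, accumulating everything into a single JDBC batch can exhaust memory or hit statement timeouts, so a common pattern is to flush every N rows. A minimal, framework-free sketch of that pattern follows; the `BatchBuffer` class and its `Consumer`-based flush hook are illustrative names invented here, not part of MyBatis or the Hive driver:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative helper: buffers rows and hands them to a flush action in
// fixed-size chunks, mirroring an addBatch()/executeBatch() cycle every N rows.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Flushes whatever remains; call once more after the last add().
    void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

In the scheduled task below, the flush action would wrap the `stmt.addBatch()` loop plus `stmt.executeBatch()` for one chunk of rows.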
Finally, create a scheduled task in the Application class that periodically generates random data and batch-inserts it into Hive:
@SpringBootApplication
@EnableScheduling
public class Application {

    @Autowired
    private UserService userService;

    @Autowired
    private DataSource dataSource;

    @Value("${hive.url}")
    private String hiveUrl;

    @Value("${hive.username}")
    private String hiveUsername;

    @Value("${hive.password}")
    private String hivePassword;

    @Scheduled(fixedRate = 10000)
    public void insertData() throws SQLException {
        List<User> users = userService.generateUsers(100000);

        // First, batch-insert into the relational database behind the
        // HikariCP DataSource.
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement(
                    "insert into user (name, age) values (?, ?)")) {
                for (User user : users) {
                    stmt.setString(1, user.getName());
                    stmt.setInt(2, user.getAge());
                    stmt.addBatch();
                }
                stmt.executeBatch();
                conn.commit();
            }
        }

        // Then replay the same rows into Hive over JDBC. Note that the
        // HiveServer2 JDBC driver has limited batch support: some versions
        // throw SQLFeatureNotSupportedException from executeBatch(), in which
        // case you must execute per row or load the data in bulk instead.
        try (Connection conn = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
             PreparedStatement stmt = conn.prepareStatement(
                     "insert into user (name, age) values (?, ?)")) {
            for (User user : users) {
                stmt.setString(1, user.getName());
                stmt.setInt(2, user.getAge());
                stmt.addBatch();
            }
            stmt.executeBatch();
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
In the scheduled task, the rows are first batch-inserted into the relational database through the pooled DataSource, and the same batch is then replayed into Hive over JDBC, which keeps the data in the two stores consistent. Two caveats are worth noting. First, MyBatis's BatchExecutor is not a class you instantiate directly; it is enabled by opening a session with ExecutorType.BATCH (for example, sqlSessionFactory.openSession(ExecutorType.BATCH)), calling the mapper methods in a loop, and committing. Second, the example above is still single-threaded and synchronous; to approach million-row throughput, the data should additionally be partitioned across multiple threads that insert asynchronously, each worker using its own connection.
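The multi-threaded, asynchronous part is not shown in the code above, so here is one way it could look: split the list into chunks and submit each chunk to a thread pool, with each worker opening its own JDBC connection (JDBC connections are not thread-safe, so they must not be shared across workers). The `ParallelInserter` class, its `partition` helper, and the `Consumer`-based `insertChunk` hook are illustrative names invented for this sketch, not MyBatis or Hive API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ParallelInserter {

    // Splits rows into fixed-size chunks so each thread gets its own batch.
    static <T> List<List<T>> partition(List<T> rows, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(rows.subList(i, Math.min(i + chunkSize, rows.size()))));
        }
        return chunks;
    }

    // Submits each chunk to a fixed thread pool. In the real task,
    // insertChunk would open its own Connection and run the
    // addBatch()/executeBatch() loop for its rows.
    static <T> void insertInParallel(List<T> rows, int chunkSize, int threads,
                                     Consumer<List<T>> insertChunk) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<T> chunk : partition(rows, chunkSize)) {
            pool.submit(() -> insertChunk.accept(chunk));
        }
        pool.shutdown(); // stop accepting tasks, then wait for the submitted ones
        try {
            pool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the scheduled task, `insertData()` would call something like `ParallelInserter.insertInParallel(users, 10000, 8, chunk -> insertChunkIntoHive(chunk))`, where the chunk size and thread count must be tuned against HiveServer2's connection limits.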
Original source: https://www.cveoy.top/t/topic/for1 — copyright belongs to the author. Do not repost or scrape!