First, add the required dependencies to pom.xml:

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>

Next, configure the MyBatis and Hive data sources. Spring Boot's default HikariCP connection pool backs the primary DataSource. Add the following to application.properties:

# MyBatis
mybatis.mapper-locations=classpath:mapper/*.xml

# Primary DataSource (HikariCP; adjust the URL and credentials to your environment)
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root

# Hive
hive.url=jdbc:hive2://localhost:10000/default
hive.username=hive
hive.password=hive

Next, create a User entity class:

public class User {
    private Long id;
    private String name;
    private Integer age;
    // getters and setters omitted
}

Create a UserMapper interface that defines the database operations:

@Mapper
public interface UserMapper {
    void insert(User user);
}

Create a mapper/UserMapper.xml file under the resources directory and write the SQL for the insert method:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mapper.UserMapper">
    <insert id="insert" parameterType="com.example.entity.User">
        insert into user (name, age) values (#{name}, #{age})
    </insert>
</mapper>

Next, create a UserService class that generates random data and calls the insert method of UserMapper:

@Service
public class UserService {
    @Autowired
    private UserMapper userMapper;

    public void addUser(User user) {
        userMapper.insert(user);
    }

    public List<User> generateUsers(int count) {
        List<User> users = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < count; i++) {
            User user = new User();
            user.setName("user" + i);
            user.setAge(random.nextInt(100));
            users.add(user);
        }
        return users;
    }
}
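Since the goal is to use MyBatis's BatchExecutor, note that MyBatis selects that executor when a session is opened with ExecutorType.BATCH: each mapper call is buffered and flushed to the JDBC driver in batches. The following is a sketch, assuming a SqlSessionFactory bean is available; the UserBatchService name and the flush interval of 1000 are illustrative choices, not part of the original code:

```java
@Service
public class UserBatchService {
    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    // Opening the session with ExecutorType.BATCH makes MyBatis use its
    // BatchExecutor: inserts are buffered and sent to the driver in batches.
    public void batchInsert(List<User> users) {
        try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
            UserMapper mapper = session.getMapper(UserMapper.class);
            int i = 0;
            for (User user : users) {
                mapper.insert(user);
                if (++i % 1000 == 0) {
                    session.flushStatements(); // push the buffered batch to the driver
                }
            }
            session.flushStatements(); // flush the final partial batch
            session.commit();
        }
    }
}
```

Flushing every 1000 rows keeps the statement buffer bounded; the session must be committed explicitly because it was opened with autoCommit=false.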

Finally, create a scheduled task in the Application class that periodically generates random data and batch-inserts it into Hive:

@SpringBootApplication
@EnableScheduling
public class Application {
    @Autowired
    private UserService userService;

    @Autowired
    private DataSource dataSource;

    @Value("${hive.url}")
    private String hiveUrl;

    @Value("${hive.username}")
    private String hiveUsername;

    @Value("${hive.password}")
    private String hivePassword;

    @Scheduled(fixedRate = 10000)
    public void insertData() throws SQLException {
        List<User> users = userService.generateUsers(100000);
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement("insert into user (name, age) values (?, ?)")) {
                for (User user : users) {
                    stmt.setString(1, user.getName());
                    stmt.setInt(2, user.getAge());
                    stmt.addBatch();
                }
                stmt.executeBatch();
                conn.commit();
            }
        }

        // Hive's JDBC driver has limited batch support: addBatch/executeBatch
        // may throw SQLFeatureNotSupportedException on some versions, in which
        // case fall back to multi-row INSERT ... VALUES or LOAD DATA.
        try (Connection conn = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword)) {
            try (PreparedStatement stmt = conn.prepareStatement("insert into user (name, age) values (?, ?)")) {
                for (User user : users) {
                    stmt.setString(1, user.getName());
                    stmt.setInt(2, user.getAge());
                    stmt.addBatch();
                }
                stmt.executeBatch();
            }
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

In the scheduled task, the data is first batch-inserted into the primary database via JDBC and then batch-inserted into Hive, so the two stores hold the same rows. Note that the task as written runs single-threaded; to push toward million-row throughput, the work can be split across threads or made asynchronous, and for very large loads Hive's LOAD DATA is usually far faster than row-by-row JDBC inserts.
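The single-threaded loop above can be parallelized by partitioning the rows and handing each chunk to a thread pool. Below is a minimal, framework-free sketch; the class name is illustrative, and the Consumer stands in for whatever JDBC batch insert you plug in:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ParallelBatchInserter {

    // Split a list into fixed-size chunks; the last chunk may be smaller.
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return chunks;
    }

    // Submit each chunk to a fixed pool and block until every batch finishes,
    // so failures surface in the calling (scheduled) thread.
    public static <T> void insertAll(List<T> items, int batchSize, int threads,
                                     Consumer<List<T>> insertBatch) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<T> chunk : partition(items, batchSize)) {
                futures.add(pool.submit(() -> insertBatch.accept(chunk)));
            }
            for (Future<?> f : futures) {
                f.get(); // rethrows any batch failure
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

In insertData(), a call like insertAll(users, 1000, 4, chunk -> jdbcBatchInsert(chunk)) would run four concurrent JDBC batch inserts (jdbcBatchInsert being a hypothetical helper wrapping the PreparedStatement loop above). Each worker thread must obtain its own Connection from the pool, since JDBC connections are not thread-safe.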


Source: https://www.cveoy.top/t/topic/for1 — copyright belongs to the author.
