以下是一个 Java 程序示例,使用 Stream 和 Load 工具来解析 CSV 文件并将数据写入 Doris 数据库。在这个示例中,我们使用了 OpenCSV 库来解析 CSV 文件,并使用 JDBC 连接到 Doris 数据库。

import com.opencsv.CSVReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

public class DorisCsvLoader {
    private static final String JDBC_URL = 'jdbc:mysql://localhost:9030/mydb';
    private static final String JDBC_USERNAME = 'root';
    private static final String JDBC_PASSWORD = 'password';
    private static final String TABLE_NAME = 'my_table';
    private static final String CSV_FILE_PATH = 'data.csv';

    public static void main(String[] args) {
        try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USERNAME, JDBC_PASSWORD);
             CSVReader csvReader = new CSVReader(new FileReader(CSV_FILE_PATH))) {
            String[] columnNames = csvReader.readNext();
            if (columnNames != null) {
                createTable(connection, columnNames);
                loadData(connection, columnNames, csvReader);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void createTable(Connection connection, String[] columnNames) throws SQLException {
        String createTableSQL = 'CREATE TABLE IF NOT EXISTS ' + TABLE_NAME + ' (' +
                Arrays.stream(columnNames).map(columnName -> columnName + ' VARCHAR(255)').reduce((a, b) -> a + ', ' + b).orElse('') +
                ')';
        try (PreparedStatement statement = connection.prepareStatement(createTableSQL)) {
            statement.execute();
        }
    }

    private static void loadData(Connection connection, String[] columnNames, CSVReader csvReader) throws SQLException {
        String insertSQL = 'INSERT INTO ' + TABLE_NAME + ' (' +
                Arrays.stream(columnNames).reduce((a, b) -> a + ', ' + b).orElse('') +
                ') VALUES (' +
                Arrays.stream(columnNames).map(columnName -> '?').reduce((a, b) -> a + ', ' + b).orElse('') +
                ')';
        try (PreparedStatement statement = connection.prepareStatement(insertSQL)) {
            String[] rowData;
            while ((rowData = csvReader.readNext()) != null) {
                for (int i = 0; i < rowData.length; i++) {
                    statement.setString(i + 1, rowData[i]);
                }
                statement.addBatch();
            }
            statement.executeBatch();
        }
    }
}

在上面的示例中,我们首先建立了与 Doris 数据库的连接,并创建了一个 CSVReader 对象来读取 CSV 文件。然后,我们使用 readNext() 方法读取 CSV 文件的第一行,也就是字段名。根据字段名,我们使用 createTable() 方法创建了一个 Doris 表。接下来,我们使用 loadData() 方法将 CSV 文件中的数据插入到 Doris 表中。

请确保在运行此示例之前,已经将相应的 JDBC 驱动程序和 OpenCSV 库添加到项目的依赖项中,并将 JDBC 连接 URL、用户名和密码、表名和 CSV 文件路径替换为实际的值。

Java 使用 Stream 和 Load 工具解析 CSV 文件并写入 Doris 数据库

原文地址: https://www.cveoy.top/t/topic/fJSA 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录