以下是一个示例Java程序,用于解析CSV文件并使用StreamLoad端口将数据写入Doris数据库:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.stream.Stream;

public class CsvParser {

    public static void main(String[] args) {
        String csvFilePath = "path/to/csv/file.csv";
        String dorisHost = "localhost";
        int dorisPort = 9030;
        String dorisDatabase = "your_database";
        String dorisTable = "your_table";
        
        try (Stream<String> lines = new BufferedReader(new FileReader(csvFilePath)).lines();
             Connection connection = DriverManager.getConnection("jdbc:mysql://" + dorisHost + ":" + dorisPort + "/" + dorisDatabase, "username", "password")) {
            
            String[] columns = lines.findFirst().orElseThrow(() -> new RuntimeException("CSV file is empty")).split(",");
            String placeholders = Stream.generate(() -> "?").limit(columns.length).reduce((a, b) -> a + "," + b).orElseThrow();
            String insertQuery = "INSERT INTO " + dorisTable + " (" + String.join(",", columns) + ") VALUES (" + placeholders + ")";
            
            try (PreparedStatement statement = connection.prepareStatement(insertQuery)) {
                lines.skip(1).forEach(line -> {
                    String[] values = line.split(",");
                    
                    for (int i = 0; i < values.length; i++) {
                        try {
                            statement.setString(i + 1, values[i]);
                        } catch (SQLException e) {
                            throw new RuntimeException("Failed to set parameter value", e);
                        }
                    }
                    
                    try {
                        statement.addBatch();
                    } catch (SQLException e) {
                        throw new RuntimeException("Failed to add batch", e);
                    }
                });
                
                try {
                    statement.executeBatch();
                } catch (SQLException e) {
                    throw new RuntimeException("Failed to execute batch", e);
                }
            }
            
            System.out.println("Data has been loaded into Doris database successfully.");
        } catch (IOException | SQLException e) {
            e.printStackTrace();
        }
    }
}

请确保在运行程序之前,将以下内容替换为实际的数据库连接信息和CSV文件路径:

  • dorisHost:Doris数据库的主机名或IP地址。
  • dorisPort:Doris数据库的StreamLoad端口号(默认为9030)。
  • dorisDatabase:要写入的Doris数据库名称。
  • dorisTable:要写入的Doris表名称。
  • username:连接Doris数据库的用户名。
  • password:连接Doris数据库的密码。
  • csvFilePath:要解析的CSV文件的路径。

运行程序后,它将读取CSV文件的内容,并使用Stream API进行解析和批量插入操作。在插入完成后,将显示一条成功的消息。如果出现任何错误,将打印异常堆栈跟踪。

java 程序示例解析 csv 文件使用 streamload 端口将数据写入 doris数据库

原文地址: https://www.cveoy.top/t/topic/jdnB 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录