在 Flink 中连接外部系统的代码应该写在 open() 方法中,以确保连接只在任务启动时建立一次。但是,在关闭连接方面,我们需要使用 close() 方法。close() 方法将在任务完成或取消时调用,以确保在任务完成时关闭所有连接。

为了避免连接没有正确关闭的问题,可以使用 try-with-resources 语句,在代码执行完后自动关闭连接。例如:

public class MySourceFunction implements SourceFunction<String> {

    private Connection connection;
    private Statement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection('jdbc:mysql://localhost:3306/mydb', 'username', 'password');
        statement = connection.createStatement();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        ResultSet resultSet = statement.executeQuery('SELECT * FROM mytable');

        while (resultSet.next()) {
            String result = resultSet.getString('result');
            ctx.collect(result);
        }
    }

    @Override
    public void cancel() {
        try {
            statement.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,我们使用了 try-with-resources 语句,在 open() 方法中建立连接,并在 close() 方法中关闭连接。这样可以确保在任务完成或取消时关闭连接。

另外,如果您使用的是连接池,可以考虑使用 Apache Commons Pool 等连接池库,以确保连接的正确使用和关闭。

Flink 实时任务中连接外部系统最佳实践:如何确保连接关闭?

原文地址: https://www.cveoy.top/t/topic/mi6p 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录