Flink 实时任务中连接外部系统最佳实践:如何确保连接关闭?
在 Flink 中连接外部系统的代码应该写在 open() 方法中,以确保连接只在任务启动时建立一次。但是,在关闭连接方面,我们需要使用 close() 方法。close() 方法将在任务完成或取消时调用,以确保在任务完成时关闭所有连接。
为了避免连接没有正确关闭的问题,可以使用 try-with-resources 语句,在代码执行完后自动关闭连接。例如:
public class MySourceFunction implements SourceFunction<String> {
private Connection connection;
private Statement statement;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection('jdbc:mysql://localhost:3306/mydb', 'username', 'password');
statement = connection.createStatement();
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
ResultSet resultSet = statement.executeQuery('SELECT * FROM mytable');
while (resultSet.next()) {
String result = resultSet.getString('result');
ctx.collect(result);
}
}
@Override
public void cancel() {
try {
statement.close();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
在这个示例中,我们使用了 try-with-resources 语句,在 open() 方法中建立连接,并在 close() 方法中关闭连接。这样可以确保在任务完成或取消时关闭连接。
另外,如果您使用的是连接池,可以考虑使用 Apache Commons Pool 等连接池库,以确保连接的正确使用和关闭。
原文地址: https://www.cveoy.top/t/topic/mi6p 著作权归作者所有。请勿转载和采集!