class MyMysqlMapper extends RichSinkFunction[(String, Int)] { var conn: java.sql.Connection = _; var prepinsert: PreparedStatement = _; var prepupdate: PreparedStatement = _; override def open(parameters: Configu…
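Generics can solve this problem. Replace the hard-coded tuple type with a type parameter declared on the class, then supply the concrete type when the class is instantiated. For example: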
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MyMysqlMapper[T] extends RichSinkFunction[T] {
  var conn: Connection = _
  var prepinsert: PreparedStatement = _
  var prepupdate: PreparedStatement = _

  // Open the connection and prepare both statements once per sink instance,
  // instead of preparing (and leaking) new statements on every record.
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.1.128:3306/test?useSSL=false&characterEncoding=utf8", "root", "123456")
    prepinsert = conn.prepareStatement("insert into count_click values (?, ?)")
    prepupdate = conn.prepareStatement("update count_click set count = ? where is_click = ?")
  }

  override def invoke(value: T, context: SinkFunction.Context): Unit = {
    value match {
      case (is_click: String, count: Int) =>
        // Try the update first; insert only if no existing row was updated.
        prepupdate.setInt(1, count)
        prepupdate.setString(2, is_click)
        prepupdate.execute()
        if (prepupdate.getUpdateCount == 0) {
          prepinsert.setString(1, is_click)
          prepinsert.setInt(2, count)
          prepinsert.execute()
        }
      // case ... => handling logic for other tuple types
    }
  }

  // Release the statements first, then the connection.
  override def close(): Unit = {
    if (prepinsert != null) prepinsert.close()
    if (prepupdate != null) prepupdate.close()
    if (conn != null) conn.close()
  }
}
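Because `T` is erased at runtime, the `match` inside `invoke` decides which branch to run by inspecting the actual tuple: its arity and the runtime type of each element. The following is a minimal sketch of that dispatch in plain Scala, with no Flink or JDBC dependency. The `TupleDispatch` object and its `describe` helper are hypothetical names introduced only for illustration, and the three-element case is an assumed example of "another tuple type", not something the original class handles.

```scala
// Hypothetical helper, not part of MyMysqlMapper: it only illustrates how
// `match` selects a branch by tuple shape when the static type is generic.
object TupleDispatch {
  def describe[T](value: T): String = (value: Any) match {
    // Upcasting to Any makes it explicit that these are runtime checks.
    case (key: String, count: Int) =>
      s"pair: $key -> $count"
    case (key: String, label: String, score: Double) =>
      s"triple: $key, $label, $score"
    case other =>
      // Fallback so that unexpected values do not throw a MatchError.
      s"unsupported: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe(("yes", 3)))             // prints "pair: yes -> 3"
    println(describe(("yes", "mobile", 0.5))) // prints "triple: yes, mobile, 0.5"
    println(describe(42))                     // prints "unsupported: 42"
  }
}
```

A fallback case like the last one is worth considering in the sink as well: without it, a record whose shape matches no branch raises a `scala.MatchError` at runtime.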
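When instantiating the class, specify the type argument in square brackets, with the tuple type written in parentheses. For example:
val mysqlMapper = new MyMysqlMapper[(String, Int)]
val mysqlMapper2 = new MyMysqlMapper[(String, String, Double)]
...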
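In Scala, a type argument goes in square brackets and a tuple type is itself written in parentheses, so a sink of string/int pairs has the type argument `[(String, Int)]`. The sketch below shows this syntax on a hypothetical stand-in class, `CollectingSink`, which is an assumption made for illustration: it only stores what it receives and has nothing to do with Flink or MySQL.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a generic sink; it simply records each value.
class CollectingSink[T] {
  val received: ListBuffer[T] = ListBuffer.empty[T]
  def invoke(value: T): Unit = received += value
}

object SinkUsage {
  def main(args: Array[String]): Unit = {
    // The type argument fixes which tuple shape each instance accepts.
    val clicks = new CollectingSink[(String, Int)]
    clicks.invoke(("yes", 3))

    val scores = new CollectingSink[(String, String, Double)]
    scores.invoke(("yes", "mobile", 0.5))

    // clicks.invoke(("yes", "mobile", 0.5)) // would not compile: wrong tuple type

    println(clicks.received.toList)
    println(scores.received.toList)
  }
}
```

Fixing the type argument at construction lets the compiler reject values of the wrong shape before the job runs, which complements the runtime `match` inside the sink.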