可以使用泛型来解决这个问题。将元组类型改为一个泛型类型,然后在类定义时指定泛型参数类型。例如:

class MyMysqlMapper[T] extends RichSinkFunction[T] { var conn: java.sql.Connection = _ var prepinsert: PreparedStatement = _ var prepupdate: PreparedStatement = _

override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://192.168.1.128:3306/test?useSSL=false&characterEncoding=utf8", "root", "123456") }

override def invoke(value: T, context: SinkFunction.Context): Unit = { value match { case (is_click: String, count: Int) => val insertSql = "insert into count_click value(?, ?)" val updateSql = "update count_click set count = ? where is_click = ?" prepupdate = conn.prepareStatement(updateSql) prepupdate.setObject(1, count) prepupdate.setObject(2, is_click) prepupdate.execute()

    if (prepupdate.getUpdateCount == 0){
      prepinsert = conn.prepareStatement(insertSql)
      prepinsert.setObject(1, is_click)
      prepinsert.setObject(2, count)
      prepinsert.execute()
    }
  case ... //其他元组类型的处理逻辑
}

}

override def close(): Unit = { if (prepinsert != null) { prepinsert.close() }

if (prepupdate != null) {
  prepupdate.close()
}
if (conn != null) {
  conn.close()
}

} }

在调用该类时,需要指定泛型参数类型,例如:

val mysqlMapper = new MyMysqlMapper(String, Int) val mysqlMapper2 = new MyMysqlMapper(String, String, Double) ...

class MyMysqlMapper extends RichSinkFunctionString Int var conn javasqlConnection = _ var prepinsert PreparedStatement = _ var prepupdate PreparedStatement = _ override def openparameters Configu

原文地址: https://www.cveoy.top/t/topic/IOU 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录