使用泛型来解决这个问题。将元组类型改为一个泛型类型,然后在类定义时指定泛型参数类型。例如:

class MyMysqlMapper[T] extends RichSinkFunction[T] {
  var conn: java.sql.Connection = _
  var prepinsert: PreparedStatement = _
  var prepupdate: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection('jdbc:mysql://192.168.1.128:3306/test?useSSL=false&characterEncoding=utf8', 'root', '123456')
  }

  override def invoke(value: T, context: SinkFunction.Context): Unit = {
    value match {
      case (is_click: String, count: Int) =>
        val insertSql = 'insert into count_click value(?, ?)' 
        val updateSql = 'update count_click set `count` = ? where `is_click` = ?' 
        prepupdate = conn.prepareStatement(updateSql)
        prepupdate.setObject(1, count)
        prepupdate.setObject(2, is_click)
        prepupdate.execute()

        if (prepupdate.getUpdateCount == 0){
          prepinsert = conn.prepareStatement(insertSql)
          prepinsert.setObject(1, is_click)
          prepinsert.setObject(2, count)
          prepinsert.execute()
        }
      case ... //其他元组类型的处理逻辑
    }
  }

  override def close(): Unit = {
    if (prepinsert != null) {
      prepinsert.close()
    }

    if (prepupdate != null) {
      prepupdate.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}

在调用该类时,需要指定泛型参数类型,例如:

val mysqlMapper = new MyMysqlMapper[(String, Int)]()
val mysqlMapper2 = new MyMysqlMapper[(String, String, Double)]()
...

原文地址: https://www.cveoy.top/t/topic/lZLD 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录