使用泛型处理不同类型数据插入MySQL
使用泛型来解决这个问题。将元组类型改为一个泛型类型,然后在类定义时指定泛型参数类型。例如:
class MyMysqlMapper[T] extends RichSinkFunction[T] {
var conn: java.sql.Connection = _
var prepinsert: PreparedStatement = _
var prepupdate: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection('jdbc:mysql://192.168.1.128:3306/test?useSSL=false&characterEncoding=utf8', 'root', '123456')
}
override def invoke(value: T, context: SinkFunction.Context): Unit = {
value match {
case (is_click: String, count: Int) =>
val insertSql = 'insert into count_click value(?, ?)'
val updateSql = 'update count_click set `count` = ? where `is_click` = ?'
prepupdate = conn.prepareStatement(updateSql)
prepupdate.setObject(1, count)
prepupdate.setObject(2, is_click)
prepupdate.execute()
if (prepupdate.getUpdateCount == 0){
prepinsert = conn.prepareStatement(insertSql)
prepinsert.setObject(1, is_click)
prepinsert.setObject(2, count)
prepinsert.execute()
}
case ... //其他元组类型的处理逻辑
}
}
override def close(): Unit = {
if (prepinsert != null) {
prepinsert.close()
}
if (prepupdate != null) {
prepupdate.close()
}
if (conn != null) {
conn.close()
}
}
}
在调用该类时,需要指定泛型参数类型,例如:
val mysqlMapper = new MyMysqlMapper[(String, Int)]()
val mysqlMapper2 = new MyMysqlMapper[(String, String, Double)]()
...
原文地址: https://www.cveoy.top/t/topic/lZLD 著作权归作者所有。请勿转载和采集!