使用 Flink 将数据写入 MySQL 数据库
val data: DataSet[Row] = env.fromCollection(arr)
data.output(JDBCOutputFormat.buildJDBCOutputFormat()
// 数据库连接驱动名称
.setDrivername("com.mysql.jdbc.Driver")
// 数据库连接地址
.setDBUrl("jdbc:mysql://localhost:3306/flink")
// 数据库连接用户名
.setUsername("root")
// 数据库连接密码
.setPassword("123456")
// 数据库插入SQL
.setQuery("insert into student (sno,cno,grade) values(?,?,?)")
.finish())
env.execute("insert data to mysql")
System.out.println("MySQL写入成功!")
}```
上述代码的含义是将一个数组转换为一个DataSet并将其插入到MySQL数据库中。
首先,通过fromCollection方法将数组arr转换为一个DataSet[Row]。然后,使用JDBCOutputFormat.buildJDBCOutputFormat()方法创建一个JDBCOutputFormat对象。
接下来,通过调用setDrivername方法设置数据库连接驱动名称为com.mysql.jdbc.Driver。然后,通过调用setDBUrl方法设置数据库连接地址为jdbc:mysql://localhost:3306/flink。接着,通过调用setUsername和setPassword方法设置数据库连接的用户名和密码。
然后,通过调用setQuery方法设置要执行的插入SQL语句为"insert into student (sno,cno,grade) values(?,?,?)"。
最后,通过调用finish方法完成JDBCOutputFormat的构建,并将其作为output方法的参数传递给DataSet。然后,调用env.execute方法执行任务,并输出"MySQL写入成功!"。
输出的结果将取决于具体的业务逻辑和数据。如果插入成功,将输出"MySQL写入成功!",否则将抛出异常。
原文地址: http://www.cveoy.top/t/topic/oUD9 著作权归作者所有。请勿转载和采集!