val data: DataSet[Row] = env.fromCollection(arr)
data.output(JDBCOutputFormat.buildJDBCOutputFormat()
  // 数据库连接驱动名称
  .setDrivername("com.mysql.jdbc.Driver")
  // 数据库连接地址
  .setDBUrl("jdbc:mysql://localhost:3306/flink")
  // 数据库连接用户名
  .setUsername("root")
  // 数据库连接密码
  .setPassword("123456")
  // 数据库插入SQL
  .setQuery("insert into student (sno,cno,grade) values(?,?,?)")
  .finish())

env.execute("insert data to mysql")
System.out.println("MySQL写入成功!")

}```

上述代码的含义是将一个数组转换为一个DataSet并将其插入到MySQL数据库中。

首先,通过fromCollection方法将数组arr转换为一个DataSet[Row]。然后,使用JDBCOutputFormat.buildJDBCOutputFormat()方法创建一个JDBCOutputFormat对象。

接下来,通过调用setDrivername方法设置数据库连接驱动名称为com.mysql.jdbc.Driver。然后,通过调用setDBUrl方法设置数据库连接地址为jdbc:mysql://localhost:3306/flink。接着,通过调用setUsername和setPassword方法设置数据库连接的用户名和密码。

然后,通过调用setQuery方法设置要执行的插入SQL语句为"insert into student (sno,cno,grade) values(?,?,?)"。

最后,通过调用finish方法完成JDBCOutputFormat的构建,并将其作为output方法的参数传递给DataSet。然后,调用env.execute方法执行任务,并输出"MySQL写入成功!"。

输出的结果将取决于具体的业务逻辑和数据。如果插入成功,将输出"MySQL写入成功!",否则将抛出异常。

原文地址: http://www.cveoy.top/t/topic/oUD9 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录