flink读取kafka和redis的数据写入mysql
下面是一个使用Flink读取Kafka和Redis的数据,并将数据写入MySQL的示例代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.util.Properties;
public class FlinkKafkaRedisToMysqlExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka消费者配置
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-kafka-redis-mysql");
// 创建Kafka数据流
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps);
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
// 将Kafka数据写入Redis
FlinkJedisConfigBase redisConf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
kafkaStream.addSink(new RedisSink<>(redisConf, new RedisSinkMapper()));
// 从Redis读取数据
FlinkJedisConfigBase redisReadConf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
DataStream<Tuple2<String, String>> redisStream = env.addSource(new RedisSource(redisReadConf));
// 将Redis数据写入MySQL
redisStream.map(new RedisToMysqlMapper()).addSink(new MysqlSink());
// 执行任务
env.execute("Flink Kafka Redis MySQL Example");
}
// RedisSink映射器
public static class RedisSinkMapper implements RedisMapper<String> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET, null);
}
@Override
public String getKeyFromData(String data) {
return "redis-key";
}
@Override
public String getValueFromData(String data) {
return data;
}
}
// RedisSource
public static class RedisSource extends RichSourceFunction<Tuple2<String, String>> {
private FlinkJedisConfigBase redisConf;
private transient Jedis jedis;
public RedisSource(FlinkJedisConfigBase redisConf) {
this.redisConf = redisConf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jedis = new Jedis(redisConf.getHost(), redisConf.getPort());
}
@Override
public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
while (!ctx.isCancelled()) {
// 从Redis中读取数据
String data = jedis.get("redis-key");
Tuple2<String, String> tuple = new Tuple2<>("redis-key", data);
ctx.collect(tuple);
Thread.sleep(1000); // 每秒读取一次
}
}
@Override
public void cancel() {
jedis.close();
}
}
// RedisToMysqlMapper
public static class RedisToMysqlMapper implements MapFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Tuple2<String, String> value) throws Exception {
return new Tuple2<>(value.f0, value.f1.length());
}
}
// MysqlSink
public static class MysqlSink extends RichSinkFunction<Tuple2<String, Integer>> {
private Connection conn;
private PreparedStatement stmt;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 创建MySQL连接
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password");
stmt = conn.prepareStatement("INSERT INTO result (key, length) VALUES (?, ?)");
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
stmt.setString(1, value.f0);
stmt.setInt(2, value.f1);
stmt.executeUpdate();
}
@Override
public void close() throws Exception {
stmt.close();
conn.close();
}
}
}
请根据你的实际情况修改代码中的连接参数和数据表结构
原文地址: https://www.cveoy.top/t/topic/iVbL 著作权归作者所有。请勿转载和采集!