3. Use Flink CDC to sync the user table user_info and the province table base_province to Redis as a look-aside cache. (5 points)
The following setup is required first:
- Configure Flink CDC to connect to the MySQL database and read change events.
- Configure Flink to connect to Redis and sync the data into it.
- Set up the runtime environments for CDC and Flink, then start the CDC and Flink jobs.
Code example:
# CDC configuration (launcher invocation)
bin/flink-cdc.sh \
    --mode mysql-binlog \
    --mysql.server-id=1 \
    --mysql.host=localhost \
    --mysql.port=3306 \
    --mysql.username=root \
    --mysql.password=123456 \
    --mysql.database.name=user_db \
    --cdc.table.whitelist=user_info,base_province \
    --cdc.startup.mode=latest-offset \
    --sink.format=changelog-json \
    --sink.topic=user_info,base_province \
    --sink.partition-key-fields=user_id,province_id \
    --properties-file /path/to/flink-cdc.properties
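The Flink job below assumes each Kafka record is a changelog-json envelope of roughly the following shape. The field names (`table`, `type`, `data`) are what the parsing code expects; the exact envelope depends on the CDC version, so treat this as an illustrative sample, not the definitive format:

```json
{
  "table": "user_info",
  "type": "insert",
  "data": {
    "user_id": "1001",
    "user_name": "Alice",
    "user_age": "30",
    "user_gender": "F"
  }
}
```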
// Flink job: consume the change records from Kafka, parse them, and cache them in Redis
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-cdc-redis");
// No serializer properties needed here: (de)serialization is handled by SimpleStringSchema below

// Subscribe to both topics as a list; a comma-joined string would be treated as one topic name
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        Arrays.asList("user_info", "base_province"), new SimpleStringSchema(), properties);
consumer.setStartFromLatest();

DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String value) throws Exception {
            JSONObject json = JSON.parseObject(value);
            String table = json.getString("table");
            String type = json.getString("type"); // change type; deletes are not handled here
            String data = json.getString("data");
            if ("user_info".equals(table)) {
                JSONObject user = JSON.parseObject(data);
                String userId = user.getString("user_id");
                String userName = user.getString("user_name");
                String userAge = user.getString("user_age");
                String userGender = user.getString("user_gender");
                return new Tuple2<>("user:" + userId, userName + "," + userAge + "," + userGender);
            } else if ("base_province".equals(table)) {
                JSONObject province = JSON.parseObject(data);
                String provinceId = province.getString("province_id");
                String provinceName = province.getString("province_name");
                return new Tuple2<>("province:" + provinceId, provinceName);
            } else {
                return null; // records from other tables are dropped by the filter below
            }
        }
    })
    .filter(Objects::nonNull)
    .addSink(new RedisSink<>(
        new FlinkJedisPoolConfig.Builder().setHost("localhost").build(),
        new RedisMapper<Tuple2<String, String>>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }
            @Override
            public String getKeyFromData(Tuple2<String, String> data) {
                return data.f0;
            }
            @Override
            public String getValueFromData(Tuple2<String, String> data) {
                return data.f1;
            }
        }))
    .setParallelism(1);

env.execute("Flink CDC to Redis");
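The job above only populates the cache; the application then reads it in look-aside fashion: check Redis first, fall back to MySQL on a miss, and backfill. A minimal sketch of that read path, with plain HashMaps standing in for the Redis and MySQL clients (the `user:<user_id>` key layout matches the sink above; everything else is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Look-aside read path. The two maps are stand-ins: "redis" would be a
// Jedis/Lettuce client and "mysql" a JDBC query in a real service.
class LookAsideCache {
    static final Map<String, String> redis = new HashMap<>(); // stand-in for Redis
    static final Map<String, String> mysql = new HashMap<>(); // stand-in for MySQL

    static String getUser(String userId) {
        String key = "user:" + userId;
        String cached = redis.get(key);      // 1. try the cache first
        if (cached != null) {
            return cached;
        }
        String fromDb = mysql.get(key);      // 2. cache miss: fall back to the database
        if (fromDb != null) {
            redis.put(key, fromDb);          // 3. backfill the cache for the next read
        }
        return fromDb;
    }

    public static void main(String[] args) {
        mysql.put("user:1", "Alice,30,F");
        System.out.println(getUser("1")); // first read comes from the database
        System.out.println(getUser("1")); // second read is served from the cache
    }
}
```

Because the Flink job keeps Redis in sync on every change event, reads rarely fall through to MySQL; the fallback exists for cold keys and cache restarts.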
In the code above, Flink CDC first reads change records for the user_info and base_province tables from MySQL and writes them to Kafka. A Flink job then consumes those records from Kafka, parses each one, and writes the result to Redis via Flink's RedisSink, where it serves as the look-aside cache.