Flink 动态配置实现:使用广播状态共享配置信息
这段代码是 Flink 实现动态配置的关键部分,主要是将配置信息封装为 kv 并放到广播状态中,以便在整个 Flink 应用程序中进行共享和访问。具体实现中,首先获取操作类型和广播状态,如果是删除操作,则将对应的配置信息从广播状态和配置 Map 中删除掉;否则,将配置信息封装为 kv 放到广播状态和配置 Map 中。其中,key 是由 sourceTable 和 sourceType 拼接而成的字符串,用于在广播状态和配置 Map 中进行唯一标识和访问。这样,当配置信息发生变化时,只需要修改配置表中的数据,Flink 应用程序就能自动获取到最新的配置信息,从而实现了动态配置。
@Override
public void processBroadcastElement(String jsonStr, Context ctx, Collector
if('d'.equals(op)){
//删除操作 将配置信息从广播状态中删除掉
TableProcess before = jsonObj.getObject('before', TableProcess.class);
String sinkType = before.getSinkType();
if('dwd'.equals(sinkType)){
String sourceTable = before.getSourceTable();
String sourceType = before.getSourceType();
String key = sourceTable + ':' + sourceType;
broadcastState.remove(key);
configMap.remove(key);
}
}else{
//删除外的其它操作 将配置信息封装为kv放到广播状态中
TableProcess after = jsonObj.getObject('after', TableProcess.class);
String sinkType = after.getSinkType();
if('dwd'.equals(sinkType)){
String sourceTable = after.getSourceTable();
String sourceType = after.getSourceType();
String key = sourceTable + ':' + sourceType;
broadcastState.put(key,after);
configMap.put(key,after);
}
}
}
原文地址: https://www.cveoy.top/t/topic/nyzG 著作权归作者所有。请勿转载和采集!