这段代码是 Flink 实现动态配置的关键部分,主要是将配置信息封装为 kv 并放到广播状态中,以便在整个 Flink 应用程序中进行共享和访问。具体实现中,首先获取操作类型和广播状态,如果是删除操作,则将对应的配置信息从广播状态和配置 Map 中删除掉;否则,将配置信息封装为 kv 放到广播状态和配置 Map 中。其中,key 是由 sourceTable 和 sourceType 拼接而成的字符串,用于在广播状态和配置 Map 中进行唯一标识和访问。这样,当配置信息发生变化时,只需要修改配置表中的数据,Flink 应用程序就能自动获取到最新的配置信息,从而实现了动态配置。

@Override public void processBroadcastElement(String jsonStr, Context ctx, Collector out) throws Exception { //为了操作方便,将jsonstr转换为jsonObj JSONObject jsonObj = JSON.parseObject(jsonStr); //获取操作类型 String op = jsonObj.getString('op'); //获取广播状态 BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

if('d'.equals(op)){
    //删除操作  将配置信息从广播状态中删除掉
    TableProcess before = jsonObj.getObject('before', TableProcess.class);
    String sinkType = before.getSinkType();
    if('dwd'.equals(sinkType)){
        String sourceTable = before.getSourceTable();
        String sourceType = before.getSourceType();
        String key = sourceTable + ':' + sourceType;
        broadcastState.remove(key);
        configMap.remove(key);
    }
}else{
    //删除外的其它操作   将配置信息封装为kv放到广播状态中
    TableProcess after = jsonObj.getObject('after', TableProcess.class);
    String sinkType = after.getSinkType();
    if('dwd'.equals(sinkType)){
        String sourceTable = after.getSourceTable();
        String sourceType = after.getSourceType();
        String key = sourceTable + ':' + sourceType;
        broadcastState.put(key,after);
        configMap.put(key,after);
    }
}

}


原文地址: https://www.cveoy.top/t/topic/nyzG 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录