/**
 * Background worker that polls call-status messages from a {@link BotServiceConsumer},
 * stores every successfully decoded message through {@link CallOutInfoDAL}, and, for calls
 * whose daily call count exceeds the retry limit of the bot's first campaign, submits an
 * {@link #update} task to a shared fixed-size thread pool.
 *
 * <p>The campaign and template lookup tables are filled once in the constructor and are only
 * read afterwards.
 *
 * <p>NOTE(review): {@code EXECUTOR_SERVICE} is never shut down in this class, and the polling
 * loop has no exit condition; confirm that this is the intended lifecycle.
 */
public class CallStatusThread implements Runnable {

    private static final Logger LOGGER = Logger.getLogger(CallStatusThread.class);
    private static final ExecutorService EXECUTOR_SERVICE =
            Executors.newFixedThreadPool(Constants.UpdateCall.POOL_SIZE);

    /** The only bot id for which {@link #update} forwards the status to the order service. */
    private static final int ORDER_UPDATE_BOT_ID = 9;

    private final BotServiceConsumer consumer;
    /** botId -> campaign configurations of that bot. */
    private final Map<Integer, List<CampaignBO>> campaignMap;
    /** botId -> first non-null post-call template found for that bot. */
    private final Map<Integer, String> templateMap;

    /**
     * Loads the per-bot campaign configuration and post-call templates, then creates the
     * consumer from the {@code Constants.UpdateCall.Kafka} settings.
     */
    public CallStatusThread() {
        campaignMap = new HashMap<>();
        templateMap = new HashMap<>();

        // Collect the distinct bot ids, then load each bot's campaign list with one DAL instance.
        CampaignDAL campaignDAL = new CampaignDAL();
        Set<Integer> botIdSet = campaignDAL.getCampaignsConfig().stream()
                .map(CampaignBO::getBotId)
                .collect(Collectors.toSet());
        for (Integer botId : botIdSet) {
            campaignMap.put(botId, campaignDAL.getCampaignsConfig(botId));
        }

        // Keep the first template seen for each bot; later messages of the same bot are ignored.
        new CallOutMessageDAL().getMessagesBO().forEach(callOutMessageBO -> {
            int botId = callOutMessageBO.getBotId();
            templateMap.computeIfAbsent(botId, k -> callOutMessageBO.getTemplatePostCall());
        });

        consumer = new BotServiceConsumer(
                Constants.UpdateCall.Kafka.BROKER,
                Constants.UpdateCall.Kafka.STATUS_TOPIC,
                Constants.UpdateCall.Kafka.STATUS_GROUP,
                Constants.UpdateCall.Kafka.MAX_POLL_RECORDS);
    }

    /**
     * Subscribes the consumer and processes record batches forever: each record is handled
     * individually (a failure on one record never affects the others), then all decoded
     * records of the batch are inserted in one call.
     *
     * @throws ParseException declared as in the original signature; callers catch it
     */
    private void processingCallStatus() throws ParseException {
        consumer.subscribe();
        CallOutInfoDAL connector = new CallOutInfoDAL();
        LOGGER.info("Starting UpdateCallStatusThread ...");

        while (true) {
            // NOTE(review): this spins without delay if getRecords() returns immediately
            // with null/empty; presumably it blocks on a poll timeout. Confirm.
            ConsumerRecords<String, String> records = consumer.getRecords();
            if (records == null || records.isEmpty()) {
                continue;
            }
            LOGGER.info("Consume " + records.count() + " messages");

            List<CallOutInfoBO> callStatusBOS = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                try {
                    handleRecord(record, callStatusBOS);
                } catch (Exception e) {
                    LOGGER.error("Process message " + record.value() + " error", e);
                }
            }

            // Nothing decoded in this batch: skip the pointless empty insert.
            if (!callStatusBOS.isEmpty()) {
                connector.insert(callStatusBOS.toArray(new CallOutInfoBO[0]));
            }
        }
    }

    /**
     * Decodes one record, adds it to the batch to be stored, and schedules an update when the
     * call count exceeds the bot's retry limit and the bot has a non-empty template.
     *
     * @param record the consumed record whose value is the JSON payload
     * @param batch  collects every successfully decoded message of the current poll
     * @throws Exception if decoding or task submission fails; the caller logs it
     */
    private void handleRecord(ConsumerRecord<String, String> record, List<CallOutInfoBO> batch)
            throws Exception {
        CallOutInfoBO bo = MappingObjectJson.convertJsonToObject(record.value(), CallOutInfoBO.class);
        if (bo == null) {
            // Never let a null element reach the batch insert.
            LOGGER.warn("Skip message decoded to null: " + record.value());
            return;
        }
        batch.add(bo);

        int botId = bo.getBotId();
        List<CampaignBO> campaigns = campaignMap.get(botId);
        if (campaigns == null || campaigns.isEmpty()) {
            // Unknown bot: the message is still stored, but no retry limit exists to compare with.
            LOGGER.warn("No campaign config for botId = " + botId + ", skip update, " + bo);
            return;
        }

        int maxRetry = campaigns.get(0).getRetryTime();
        int callTimes = bo.getTimeCallOfDay();
        String template = templateMap.getOrDefault(botId, "");
        if (callTimes <= maxRetry || template.isEmpty()) {
            LOGGER.info("Don't Update with botId = " + botId + ", " + bo);
            return;
        }

        EXECUTOR_SERVICE.execute(() -> {
            try {
                boolean isUpdated = update(bo, template, botId);
                LOGGER.info("Update result : " + isUpdated + ", botId=" + botId + ", " + bo);
            } catch (Exception e) {
                LOGGER.error("Update ERROR, botId = " + botId + ", " + bo, e);
            }
        });
    }

    /**
     * Pushes the call status to the order service for the one bot that requires it.
     *
     * @param bo       the call status to forward
     * @param template the bot's post-call template (currently unused by this method)
     * @param botId    the bot the call belongs to
     * @return the result of {@code HttpClientFactory.updateCallStatusToOrder} for bot
     *         {@value #ORDER_UPDATE_BOT_ID}; {@code true} for every other bot
     * @throws Exception if the HTTP update fails
     */
    public boolean update(CallOutInfoBO bo, String template, int botId) throws Exception {
        if (botId == ORDER_UPDATE_BOT_ID) {
            return HttpClientFactory.updateCallStatusToOrder(bo);
        }
        return true;
    }

    /** Starts the polling loop on a new, named thread. */
    public void start() {
        new Thread(this, "CallStatusThread").start();
    }

    /**
     * Runs the polling loop and logs whatever terminates it, so the worker never dies
     * without a trace in the application log.
     */
    @Override
    public void run() {
        try {
            processingCallStatus();
        } catch (ParseException e) {
            LOGGER.error("processingCallStatus error", e);
        } catch (RuntimeException e) {
            LOGGER.error("processingCallStatus stopped unexpectedly", e);
        }
    }

}

Tối ưu mã nguồn: public class CallStatusThread implements Runnable { // map botId với campaign: private HashMap<Integer, List<CampaignBO>> campaignMap; // map botId với template; // cập nhật trạng thái cuộc g…

原文地址: https://www.cveoy.top/t/topic/0oF 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录