Java 定时任务分片处理数据,优化代码解决数据漏掉和重复处理问题

在使用多实例运行定时任务,并使用分片的方式处理数据时,可能会出现由于某个实例休眠失败导致分片被其他实例占用,从而导致数据漏掉或者重复处理的问题。本文将探讨如何优化代码,使用 Redis 分布式锁来解决这个问题。

原始代码:

@Scheduled(cron = "${cs.task.cron.custtag.sync:0 0/15 * * * ? }")
public void localRun() {
    Date beginDate = new Date();
    log.info("localRun start currentTime={}", beginDate);
    String currentDate = DateUtil.formatDateYMDH(beginDate);

    String lockKey = RedisKeyConstants.LOCK_KEY_PREFIX_ + "cust_tag_sync:" + currentDate;
    Long currentSplit = RedisUtil.incrBy(lockKey, 1, 60);
    try {
        TimeUnit.SECONDS.sleep(30);
        runByOneDay(beginDate, currentSplit, NumberUtils.createLong(RedisUtil.get(lockKey)));
    } catch (InterruptedException e) {
        // 休眠失败怎么办?分片已经占用了
        log.error("本地定时任务获取分片休眠失败", e);
    }
}

优化后的代码:

@Scheduled(cron = "${cs.task.cron.custtag.sync:0 0/15 * * * ? }")
public void localRun() {
    Date beginDate = new Date();
    log.info('localRun start currentTime={}', beginDate);
    String currentDate = DateUtil.formatDateYMDH(beginDate);

    String lockKey = RedisKeyConstants.LOCK_KEY_PREFIX_ + 'cust_tag_sync:' + currentDate;
    Long currentSplit = null;
    boolean lockAcquired = false;
    try {
        // 尝试获取锁
        lockAcquired = RedisUtil.tryLock(lockKey, 60);
        if (!lockAcquired) {
            // 获取锁失败,分片已经被占用,直接返回
            log.warn('本地定时任务获取分片失败,分片已被占用,lockKey={}', lockKey);
            return;
        }
        currentSplit = RedisUtil.incrBy(lockKey, 1);
    } finally {
        if (lockAcquired) {
            // 释放锁
            RedisUtil.unlock(lockKey);
        }
    }
    try {
        TimeUnit.SECONDS.sleep(30);
        // 重新获取分片,确保获取到的值是最新的
        Long currentSplitFromRedis = RedisUtil.get(lockKey);
        if (currentSplit.equals(currentSplitFromRedis)) {
            // 分片数没有改变,可以处理数据
            runByOneDay(beginDate, currentSplit, currentSplitFromRedis);
        } else {
            // 分片数已经改变,可能有数据被漏掉,需要重新分配
            log.warn('本地定时任务分片数已经改变,currentSplit={}, currentSplitFromRedis={}', currentSplit, currentSplitFromRedis);
        }
    } catch (InterruptedException e) {
        log.error('本地定时任务获取分片休眠失败', e);
    }
}

优化说明:

  1. 使用 Redis 分布式锁:优化后的代码使用 Redis 的 tryLock 方法来获取锁,只有获取到锁的实例才能进行分片操作。
  2. 重新获取分片:在处理数据之前,重新获取分片数,确保获取到的值是最新的,避免数据漏掉的问题。
  3. 释放锁:将释放锁的操作放在 finally 块中,即使出现异常也能够释放锁,避免死锁。

总结:

通过使用 Redis 分布式锁,优化后的代码能够有效地解决定时任务分片处理数据时出现的数据漏掉和重复处理的问题,确保每个实例都能获取到最新的分片数据,并避免数据重复处理。

注意:

  • 需要根据实际情况调整 tryLock 方法的超时时间。
  • 如果 Redis 连接失败,需要进行相应的处理,例如重试或者记录日志。
  • 如果分片数发生变化,需要重新分配分片,并确保每个实例都能获取到新的分片信息。

希望本文能够帮助你更好地理解如何使用 Redis 分布式锁优化 Java 定时任务。如有任何问题,欢迎留言讨论。

Java 定时任务分片处理数据,优化代码解决数据漏掉和重复处理问题

原文地址: https://www.cveoy.top/t/topic/ozx8 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录