import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 定时任务调度器,用于管理和执行定时任务。
 * 该调度器基于Netty的HashedWheelTimer实现,提供高效可靠的定时任务执行功能。
 */
@Component
@Slf4j
public final class Scheduler {
    // 最大轮次大小
    public static final int MAX_TICKS_PER_WHEEL = 2048;
    // 最小时间间隔(毫秒)
    public static final int MINIMAL_TICK_IN_MILLIS = 1;

    // 定时器
    private final HashedWheelTimer hashedWheelTimer;

    // 任务ID生成器
    private final AtomicLong timerIdGen = new AtomicLong();

    // 任务ID到Timeout的映射
    private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        // 获取配置信息
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        // 设置时间间隔,至少为1毫秒
        final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis());
        // 设置轮次大小,至少为MAX_TICKS_PER_WHEEL
        final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());

        // 初始化HashedWheelTimer
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    /**
     * 取消任务
     * @param timerKey 任务ID
     * @return 是否取消成功
     */
    public boolean cancel(final Object timerKey) {
        final Timeout timeout = timerKey2Timeouts.get(timerKey);
        return tryCancel(timeout);
    }

    /**
     * 延迟执行一次性任务
     * @param delay 延迟时间
     * @param runnable 任务执行体
     * @param context 上下文
     * @return 任务ID
     */
    public Long scheduleOnce(final Duration delay, final Runnable runnable, final Context context) {
        return scheduleOnce(timerIdGen.incrementAndGet(), delay, runnable, context);
    }

    /**
     * 延迟执行一次性任务
     * @param timerKey 任务ID
     * @param delay 延迟时间
     * @param runnable 任务执行体
     * @param context 上下文
     */
    public void scheduleOnce(final Object timerKey, final Duration delay, final Runnable runnable, final Context context) {
        // 参数校验
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");

        // 计算任务并放入定时器
        timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return hashedWheelTimer.newTimeout(t -> {
                context.runOnContext(event -> {
                    // 执行任务并从定时器中移除
                    timerKey2Timeouts.remove(timerKey);
                    runnable.run();
                });
            }, delay.toNanos(), TimeUnit.NANOSECONDS);
        });
    }

    /**
     * 延迟执行固定间隔任务
     * @param initialDelay 初始延迟时间
     * @param delay 固定间隔时间
     * @param runnable 任务执行体
     * @param context 上下文
     * @return 任务ID
     */
    public Long scheduleWithFixDelay(final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
        return scheduleWithFixDelay(timerIdGen.incrementAndGet(), initialDelay, delay, runnable, context);
    }

    /**
     * 延迟执行固定间隔任务
     * @param timerKey 任务ID
     * @param initialDelay 初始延迟时间
     * @param delay 固定间隔时间
     * @param runnable 任务执行体
     * @param context 上下文
     */
    public void scheduleWithFixDelay(final Object timerKey, final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
        // 参数校验
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");

        // 创建任务执行体,实现固定间隔的自动执行
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    // 执行任务
                    runnable.run();
                } catch (Exception e) {
                    // 记录错误日志
                    log.error("Error when trigger task of runnable:[{}]", runnable, e);
                } finally {
                    // 重新调度任务
                    timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
                        tryCancel(timeout);
                        return scheduleWithTimerKey(delay, self, context);
                    });
                }
            }
        };

        // 计算任务并放入定时器
        timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        });
    }

    /**
     * 尝试取消任务
     * @param timeout 任务的Timeout对象
     * @return 是否取消成功
     */
    private boolean tryCancel(final Timeout timeout) {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }

    /**
     * 使用任务ID创建定时任务
     * @param delay 延迟时间
     * @param runnable 任务执行体
     * @param context 上下文
     * @return 任务的Timeout对象
     */
    private Timeout scheduleWithTimerKey(final Duration delay, final Runnable runnable, final Context context) {
        return hashedWheelTimer.newTimeout(t -> context.runOnContext(event -> runnable.run()), delay.toNanos(), TimeUnit.NANOSECONDS);
    }

    /**
     * 初始化定时器
     */
    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    /**
     * 停止定时器
     */
    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
                .forEach(timeout -> {
                    final TimerTask task = timeout.task();
                    try {
                        // 执行任务
                        task.run(timeout);
                    } catch (Exception e) {
                        // 忽略异常
                    }
                });
        }
    }
}
PowerMsg3 Local Server SDK Scheduler - 高性能定时任务管理

原文地址: https://www.cveoy.top/t/topic/lH7I 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录