This class is a scheduler that uses Netty's HashedWheelTimer to time tasks and then dispatches each task onto a Vert.x Context. It provides the following:

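  • Schedule a one-shot task, or a task that repeats with a fixed delay.
  • Register a task either under a generated 'timerId' (returned to the caller) or under a caller-supplied 'timerKey'. Scheduling under a 'timerKey' that is already in use cancels the previous task first.
  • Cancel a scheduled task by its 'timerId' or 'timerKey'.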
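
"Fixed delay" here means the next run is scheduled only after the previous run has finished (the class reschedules from a finally block), so the gap between two starts is the task duration plus the delay. The following is a minimal sketch on a simulated clock, assuming no tick granularity and no dispatch latency. The class and method names are hypothetical and are not part of the Scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: computes start times on a virtual clock, no timers involved.
final class FixedDelayTimeline {

    // Start times (ms) when each run is scheduled delayMs after the previous run ends.
    static List<Long> fixedDelayStarts(long initialDelayMs, long delayMs, long taskMs, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = initialDelayMs;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start += taskMs + delayMs; // the next run waits for this one to finish
        }
        return starts;
    }

    // For contrast: fixed-rate start times do not depend on how long the task takes.
    static List<Long> fixedRateStarts(long initialDelayMs, long periodMs, int runs) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            starts.add(initialDelayMs + i * periodMs);
        }
        return starts;
    }
}
```

With an initial delay of 50 ms, a delay of 100 ms and a task that takes 30 ms, fixedDelayStarts(50, 100, 30, 3) gives 50, 180, 310, while fixedRateStarts(50, 100, 3) gives 50, 150, 250.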

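To track scheduled tasks, the class keeps two ConcurrentHashMap instances: 'timerId2Timeout' maps each 'timerId' to its Timeout, and 'timerKey2Timeouts' maps each 'timerKey' to its Timeout. Each key maps to a single Timeout, despite the plural in the second name.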
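
The reason for updating these maps through compute is that cancelling the old Timeout and registering the new one then happen atomically per key. The sketch below shows that cancel-and-replace pattern with JDK classes only: ScheduledExecutorService stands in for HashedWheelTimer and ScheduledFuture for Timeout, and the class name KeyedTimers is hypothetical. Unlike the Scheduler, this stand-in does not remove the entry when a task completes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: JDK executor instead of HashedWheelTimer, ScheduledFuture instead of Timeout.
final class KeyedTimers {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "keyed-timers");
            t.setDaemon(true); // do not keep the JVM alive for pending timers
            return t;
        });
    private final ConcurrentHashMap<Object, ScheduledFuture<?>> byKey = new ConcurrentHashMap<>();

    // Cancels any task already registered under the key, then registers the new one.
    // compute() makes the cancel-and-replace step atomic for that key.
    ScheduledFuture<?> scheduleOnce(Object key, long delayMs, Runnable task) {
        return byKey.compute(key, (k, previous) -> {
            if (previous != null) {
                previous.cancel(false);
            }
            return executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        });
    }

    // Removes the entry and cancels its task; false when nothing was pending under the key.
    boolean cancel(Object key) {
        ScheduledFuture<?> future = byKey.remove(key);
        return future != null && future.cancel(false);
    }

    int pending() {
        return byKey.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Scheduling twice under the same key leaves one entry in the map, and the first future reports isCancelled() as true.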

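The constructor takes a PmLocalServerSDKConfig and reads the scheduler settings from it. It builds the HashedWheelTimer from the configured tick duration (clamped to at least 1 ms) and ticks per wheel (clamped to at least 512), with a thread factory named 'localserver-scheduler'.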
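
The two settings determine how a delay maps onto the wheel: the tick duration is the timer's resolution, and tick duration times ticks per wheel is the time covered by one revolution. The sketch below is a simplified model of that arithmetic, not Netty's source. The class name WheelMath is hypothetical, and the 100 ms tick used in the note after it is an assumed value, not this class's configuration. Netty additionally rounds the wheel size up to a power of two, which the model ignores.

```java
// Simplified model of hashed-wheel arithmetic; illustrative only, not Netty's implementation.
final class WheelMath {

    // Same clamping as the constructor: a tick of at least 1 ms.
    static int clampTickMillis(int configured) {
        return Math.max(1, configured);
    }

    // Same clamping as the constructor: at least 512 slots.
    static int clampTicksPerWheel(int configured) {
        return Math.max(512, configured);
    }

    // Slot in which a timeout due in delayMs lands, counting from tick 0.
    static long slot(long delayMs, int tickMs, int ticksPerWheel) {
        long ticks = delayMs / tickMs;
        return ticks % ticksPerWheel;
    }

    // Full wheel revolutions that must pass before the timeout fires.
    static long rounds(long delayMs, int tickMs, int ticksPerWheel) {
        return (delayMs / tickMs) / ticksPerWheel;
    }
}
```

With a 100 ms tick and 512 slots, one revolution covers 51.2 s, so a 60 s delay is 600 ticks away: slot 88 after one full round.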

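The class has 'init' and 'stop' lifecycle methods. 'init' (annotated with @PostConstruct) starts the HashedWheelTimer. 'stop' (annotated with @PreDestroy) stops the timer and then runs every pending task that has been neither cancelled nor expired, so tasks still waiting at shutdown fire immediately instead of being dropped.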
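
The drain step in 'stop' can be modelled without Netty. The sketch below is a hypothetical stand-in: Pending plays the role of a Timeout and drain the role of the loop over the set returned by the timer's stop(). It shows the two properties that matter: cancelled entries are skipped, and one failing task does not prevent the remaining ones from running.

```java
import java.util.List;

// Hypothetical model of the drain-on-stop loop; Pending stands in for a Timeout.
final class DrainOnStop {

    static final class Pending {
        final Runnable task;
        volatile boolean cancelled;

        Pending(Runnable task) {
            this.task = task;
        }
    }

    // Runs every pending task that was not cancelled and returns how many completed.
    static int drain(List<Pending> unprocessed) {
        int executed = 0;
        for (Pending pending : unprocessed) {
            if (pending.cancelled) {
                continue;
            }
            try {
                pending.task.run();
                executed++;
            } catch (RuntimeException e) {
                // Swallowed on purpose so the remaining tasks still get their turn.
            }
        }
        return executed;
    }
}
```

In the Scheduler itself, running a drained task only hands the Runnable to context.runOnContext, so whether the work actually executes during shutdown depends on the Vert.x context still being alive at that point.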

The complete code of the class follows:

import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


@Component
@Slf4j
public class Scheduler {
    private final HashedWheelTimer hashedWheelTimer;

    private final AtomicLong timerIdGen = new AtomicLong();

    private final ConcurrentHashMap<Long, Timeout> timerId2Timeout = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

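    /** Cancels the task registered under {@code timerId} and drops its map entry. */
    public boolean cancel(final Long timerId) {
        // remove() rather than get(), so a cancelled entry does not linger in the map
        return tryCancel(timerId2Timeout.remove(timerId));
    }

    /** Cancels the task registered under {@code timerKey} and drops its map entry. */
    public boolean cancel(final Object timerKey) {
        return tryCancel(timerKey2Timeouts.remove(timerKey));
    }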

    public Long scheduleOnce(final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Long timerId = timerIdGen.incrementAndGet();
        timerId2Timeout.compute(timerId, (key, timeout) ->
            hashedWheelTimer.newTimeout(t -> {
                context.runOnContext(event -> {
                    timerId2Timeout.remove(timerId);
                    runnable.run();
                });
            }, delay.toNanos(), TimeUnit.NANOSECONDS));
        return timerId;
    }

    public void scheduleOnce(final Object timerKey,
                             final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        timerKey2Timeouts.compute(
            timerKey,
            (key, timeout) -> {
                tryCancel(timeout);
                return hashedWheelTimer.newTimeout(t -> {
                    context.runOnContext(event -> {
                        // Remove only our own Timeout, so a newer task registered under the same key stays tracked
                        timerKey2Timeouts.remove(timerKey, t);
                        runnable.run();
                    });
                }, delay.toNanos(), TimeUnit.NANOSECONDS);
            }
        );
    }

    public Long scheduleWithFixDelay(final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Long timerId = timerIdGen.incrementAndGet();
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error while running task of runnable:[{}]", runnable, e);
                } finally {
                    // Reschedule only while the timerId is still registered; a removed entry ends the loop
                    timerId2Timeout.computeIfPresent(timerId, (key, timeout) -> {
                        tryCancel(timeout);
                        return scheduleWithTimerId(delay, self, context);
                    });
                }
            }
        };
        timerId2Timeout.compute(timerId, (key, timeout) ->
            scheduleWithTimerId(initialDelay, scheduledRunnable, context));
        return timerId;
    }

    public void scheduleWithFixDelay(final Object timerKey,
                                     final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;
            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error while running task of runnable:[{}]", runnable, e);
                } finally {
                    // Reschedule only while the timerKey is still registered; a removed entry ends the loop
                    timerKey2Timeouts.computeIfPresent(
                        timerKey,
                        (key, timeout) -> {
                            tryCancel(timeout);
                            return scheduleWithTimerKey(delay, self, context);
                        });
                }
            }
        };

        timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        });
    }

    private boolean tryCancel(final Timeout timeout) {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }

    private Timeout scheduleWithTimerId(final Duration delay,
                                        final Runnable runnable,
                                        final Context context) {
        return hashedWheelTimer.newTimeout(
            t -> context.runOnContext(event -> runnable.run()),
            delay.toNanos(),
            TimeUnit.NANOSECONDS);
    }

    private Timeout scheduleWithTimerKey(final Duration delay,
                                         final Runnable runnable,
                                         final Context context) {
        return hashedWheelTimer.newTimeout(
            t -> context.runOnContext(event -> runnable.run()),
            delay.toNanos(),
            TimeUnit.NANOSECONDS);
    }


    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            for (Timeout timeout : timeouts) {
                if (timeout.isCancelled() || timeout.isExpired()) {
                    continue;
                }
                final TimerTask task = timeout.task();
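                try {
                    // Fire the pending task right away instead of dropping it on shutdown
                    task.run(timeout);
                } catch (Exception e) {
                    log.warn("Error while running pending task during shutdown", e);
                }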
            }
        }
    }
}
Java scheduled-task scheduler: an implementation using Netty HashedWheelTimer

Original: https://www.cveoy.top/t/topic/lH5o Copyright belongs to the author. Do not repost or scrape.
