import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Slf4j
public final class Scheduler {
    private static final int DEFAULT_TICK_DURATION_IN_MILLIS = 1;
    private static final int DEFAULT_TICKS_PER_WHEEL = 512;

    private final HashedWheelTimer hashedWheelTimer;

    private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(DEFAULT_TICK_DURATION_IN_MILLIS, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(DEFAULT_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    public boolean cancel(final Object timerKey) {
        final Timeout timeout = timerKey2Timeouts.get(timerKey);
        return tryCancel(timeout);
    }

    public Long scheduleOnce(final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        return scheduleOnce(null, delay, runnable, context);
    }

    public Long scheduleOnce(final Object timerKey,
                             final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        return scheduleWithTimerKey(timerKey, delay, runnable, context, false);
    }

    public Long scheduleWithFixDelay(final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        return scheduleWithFixDelay(null, initialDelay, delay, runnable, context);
    }

    public Long scheduleWithFixDelay(final Object timerKey,
                                     final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        return scheduleWithTimerKey(timerKey, initialDelay, runnable, context, true);
    }

    private Long scheduleWithTimerKey(final Object timerKey,
                                         final Duration delay,
                                         final Runnable runnable,
                                         final Context context,
                                         final boolean isFixedDelay) {
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");

        final Runnable scheduledRunnable = () -> {
            try {
                runnable.run();
            } catch (Exception e) {
                log.error("Error when trigger task of runnable: [{}]", runnable, e);
            }
            if (isFixedDelay) {
                scheduleWithTimerKey(timerKey, delay, scheduledRunnable, context, true);
            }
        };

        return timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return hashedWheelTimer.newTimeout(
                t -> context.runOnContext(event -> scheduledRunnable.run()),
                delay.toNanos(),
                TimeUnit.NANOSECONDS
            );
        });
    }

    private boolean tryCancel(final Timeout timeout) {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }

    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
                .forEach(timeout -> {
                    final TimerTask task = timeout.task();
                    try {
                        task.run(timeout);
                    } catch (Exception e) {
                        //Ignore
                    }
                });
        }
    }
}
PowerMsg3 Local Server SDK Scheduler: Task Scheduling and Management

原文地址: https://www.cveoy.top/t/topic/lH6m 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录