package com.taobao.powermsg3.sdk.localserver.scheduler;

import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


@Component
@Slf4j
public final class Scheduler {
    private final HashedWheelTimer hashedWheelTimer;

    private final AtomicLong timerIdGen = new AtomicLong();

    private final ConcurrentHashMap<Long, Timeout> timerIdToTimeout = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Object, Timeout> timerKeyToTimeouts = new ConcurrentHashMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    /**
     * Cancels the scheduled task associated with the given timerId.
     *
     * @param timerId The timerId of the task to be cancelled.
     * @return True if the task is successfully cancelled, false otherwise.
     */
    public boolean cancel(final Long timerId) {
        Objects.requireNonNull(timerId, "timerId should not be null.");
        final Timeout timeout = timerIdToTimeout.get(timerId);
        return tryCancel(timeout);
    }

    /**
     * Cancels the scheduled task associated with the given timerKey.
     *
     * @param timerKey The timerKey of the task to be cancelled.
     * @return True if the task is successfully cancelled, false otherwise.
     */
    public boolean cancel(final Object timerKey) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        final Timeout timeout = timerKeyToTimeouts.get(timerKey);
        return tryCancel(timeout);
    }

    /**
     * Schedules a task to be executed once after the specified delay.
     *
     * @param delay The delay before the task is executed.
     * @param runnable The task to be executed.
     * @param context The Vert.x context to execute the task in.
     * @return The timerId of the scheduled task.
     */
    public Long scheduleOnce(final Duration delay, final Runnable runnable, final Context context) {
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Long timerId = timerIdGen.incrementAndGet();
        timerIdToTimeout.compute(timerId, (key, timeout) ->
            hashedWheelTimer.newTimeout(t -> {
                context.runOnContext(event -> {
                    timerIdToTimeout.remove(timerId);
                    runnable.run();
                });
            }, delay.toNanos(), TimeUnit.NANOSECONDS));
        return timerId;
    }

    /**
     * Schedules a task to be executed once after the specified delay, associated with a timerKey.
     *
     * @param timerKey The timerKey to associate with the task.
     * @param delay The delay before the task is executed.
     * @param runnable The task to be executed.
     * @param context The Vert.x context to execute the task in.
     */
    public void scheduleOnce(final Object timerKey, final Duration delay, final Runnable runnable, final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        timerKeyToTimeouts.compute(
            timerKey,
            (key, timeout) -> {
                tryCancel(timeout);
                return hashedWheelTimer.newTimeout(t -> {
                    context.runOnContext(event -> {
                        timerKeyToTimeouts.remove(timerKey);
                        runnable.run();
                    });
                }, delay.toNanos(), TimeUnit.NANOSECONDS);
            }
        );
    }

    /**
     * Schedules a task to be executed repeatedly with a fixed delay between executions.
     *
     * @param initialDelay The delay before the first execution.
     * @param delay The delay between subsequent executions.
     * @param runnable The task to be executed.
     * @param context The Vert.x context to execute the task in.
     * @return The timerId of the scheduled task.
     */
    public Long scheduleWithFixDelay(final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Long timerId = timerIdGen.incrementAndGet();
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error when trigger task of runnable:[{}]", runnable, e);
                } finally {
                    timerIdToTimeout.compute(timerId, (key, timeout) -> {
                        tryCancel(timeout);
                        return scheduleWithTimer(delay, self, context);
                    });
                }
            }
        };
        timerIdToTimeout.compute(timerId, (key, timeout) ->
            scheduleWithTimer(initialDelay, scheduledRunnable, context));
        return timerId;
    }

    /**
     * Schedules a task to be executed repeatedly with a fixed delay between executions, associated with a timerKey.
     *
     * @param timerKey The timerKey to associate with the task.
     * @param initialDelay The delay before the first execution.
     * @param delay The delay between subsequent executions.
     * @param runnable The task to be executed.
     * @param context The Vert.x context to execute the task in.
     */
    public void scheduleWithFixDelay(final Object timerKey, final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error when trigger task of runnable:[{}]", runnable, e);
                } finally {
                    timerKeyToTimeouts.compute(
                        timerKey,
                        (key, timeout) -> {
                            tryCancel(timeout);
                            return scheduleWithTimer(delay, self, context);
                        });
                }
            }
        };

        timerKeyToTimeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimer(initialDelay, scheduledRunnable, context);
        });
    }

    /**
     * Attempts to cancel the given timeout.
     *
     * @param timeout The timeout to cancel.
     * @return True if the timeout is successfully cancelled, false otherwise.
     */
    private boolean tryCancel(final Timeout timeout) {
        return timeout == null || timeout.isCancelled() || timeout.isExpired() || timeout.cancel();
    }

    /**
     * Schedules a new task with the specified delay and runnable, using the provided context.
     *
     * @param delay The delay before the task is executed.
     * @param runnable The task to be executed.
     * @param context The Vert.x context to execute the task in.
     * @return The newly created Timeout object.
     */
    private Timeout scheduleWithTimer(final Duration delay, final Runnable runnable, final Context context) {
        return hashedWheelTimer.newTimeout(
            t -> context.runOnContext(runnable::run),
            delay.toNanos(),
            TimeUnit.NANOSECONDS);
    }

    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    /**
     * Stops the scheduler and executes any pending tasks.
     */
    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> timeout.isCancelled() || timeout.isExpired())
                .forEach(Timeout::task::run);
        }
    }
}
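// PowerMsg3 Local Server SDK Scheduler: A Comprehensive Guide
//
// Original source: https://www.cveoy.top/t/topic/lH5N (copyright belongs to the author; do not reproduce or scrape).
//
// Free AI: click here, no registration or login required.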