/*
 * Title: PowerMsg3 SDK Local Server Scheduler: A Comprehensive Guide
 * Description: This document provides a comprehensive guide to the PowerMsg3 SDK Local Server Scheduler, detailing its functionality, configuration, and best practices for its use.
 * Keywords: PowerMsg3, SDK, Local Server, Scheduler, Timeout, HashedWheelTimer, Vertx, Spring, Java
 */
package com.taobao.powermsg3.sdk.localserver.scheduler;

import com.taobao.hsf.NamedThreadFactory; import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.vertx.core.Context; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;

@Component @Slf4j public final class Scheduler { private HashedWheelTimer hashedWheelTimer;

private AtomicLong timerIdGen = new AtomicLong();

private ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();

private PmLocalServerSDKConfig pmLocalServerSDKConfig;

@Autowired
public Scheduler(PmLocalServerSDKConfig pmLocalServerSDKConfig) {
    this.pmLocalServerSDKConfig = pmLocalServerSDKConfig;
    final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
    final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
    final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
    this.hashedWheelTimer = new HashedWheelTimer(
        new NamedThreadFactory('localserver-scheduler'),
        tickInMillis, TimeUnit.MILLISECONDS,
        ticksPerWheel
    );
}

public boolean cancel(Object timerKey) {
    Timeout timeout = timerKey2Timeouts.get(timerKey);
    return tryCancel(timeout);
}

public Long scheduleOnce(Duration delay,
                         Runnable runnable,
                         Context context) {
    Long id = timerIdGen.incrementAndGet();
    scheduleOnce(id, delay, runnable, context);
    return id;
}

public void scheduleOnce(Object timerKey,
                         Duration delay,
                         Runnable runnable,
                         Context context) {
    Objects.requireNonNull(timerKey, 'timerKey should not be null.');
    Objects.requireNonNull(delay, 'delay should not be null.');
    Objects.requireNonNull(runnable, 'runnable should not be null.');
    Objects.requireNonNull(context, 'context should not be null.');
    timerKey2Timeouts.computeIfPresent(
        timerKey,
        (key, timeout) -> {
            tryCancel(timeout);
            return hashedWheelTimer.newTimeout(t -> {
                context.runOnContext(event -> {
                    timerKey2Timeouts.remove(timerKey);
                    runnable.run();
                });
            }, delay.toNanos(), TimeUnit.NANOSECONDS);
        }
    );
    timerKey2Timeouts.computeIfAbsent(
        timerKey,
        (key) -> {
            return hashedWheelTimer.newTimeout(t -> {
                context.runOnContext(event -> {
                    timerKey2Timeouts.remove(timerKey);
                    runnable.run();
                });
            }, delay.toNanos(), TimeUnit.NANOSECONDS);
        }
    );
}

public Long scheduleWithFixDelay(Duration initialDelay,
                                 Duration delay,
                                 Runnable runnable,
                                 Context context) {
    Long id = timerIdGen.incrementAndGet();
    scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
    return id;
}

public void scheduleWithFixDelay(Object timerKey,
                                 Duration initialDelay,
                                 Duration delay,
                                 Runnable runnable,
                                 Context context) {
    Objects.requireNonNull(timerKey, 'timerKey should not be null.');
    Objects.requireNonNull(initialDelay, 'initialDelay should not be null.');
    Objects.requireNonNull(delay, 'delay should not be null.');
    Objects.requireNonNull(runnable, 'runnable should not be null.');
    Objects.requireNonNull(context, 'context should not be null.');
    Runnable scheduledRunnable = () -> {
        try {
            runnable.run();
        } catch (Exception e) {
            log.error('Error when trigger task of runnable:[{}]', runnable, e);
        } finally {
            timerKey2Timeouts.computeIfPresent(
                timerKey,
                (key, timeout) -> {
                    tryCancel(timeout);
                    return scheduleWithTimerKey(delay, scheduledRunnable, context);
                }
            );
            timerKey2Timeouts.computeIfAbsent(
                timerKey,
                (key) -> {
                    return scheduleWithTimerKey(delay, scheduledRunnable, context);
                }
            );
        }
    };
    timerKey2Timeouts.computeIfPresent(
        timerKey,
        (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        }
    );
    timerKey2Timeouts.computeIfAbsent(
        timerKey,
        (key) -> {
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        }
    );
}

private boolean tryCancel(Timeout timeout) {
    return timeout != null && !timeout.isCancelled() && !timeout.isExpired() ? timeout.cancel() : true;
}

private Timeout scheduleWithTimerKey(Duration delay,
                                     Runnable runnable,
                                     Context context) {
    return hashedWheelTimer.newTimeout(
        t -> context.runOnContext(event -> runnable.run()),
        delay.toNanos(),
        TimeUnit.NANOSECONDS
    );
}

@PostConstruct
protected void init() {
    hashedWheelTimer.start();
}

@PreDestroy
protected void stop() {
    Set<Timeout> timeouts = hashedWheelTimer.stop();
    if (CollectionUtils.isNotEmpty(timeouts)) {
        timeouts.forEach(timeout -> {
            if (!timeout.isCancelled() && !timeout.isExpired()) {
                TimerTask task = timeout.task();
                try {
                    task.run(timeout);
                } catch (Exception e) {
                    //Ignore
                }
            }
        });
    }
}

}

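/*
 * PowerMsg3 SDK Local Server Scheduler: A Comprehensive Guide
 *
 * Original source: https://www.cveoy.top/t/topic/lH50 (copyright belongs to the author; do not repost or scrape).
 *
 * Free AI: click here, no registration or login required.
 */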