PowerMsg3 Local Server SDK Scheduler - 高性能定时任务管理
import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 定时任务调度器,用于管理和执行定时任务。
* 该调度器基于Netty的HashedWheelTimer实现,提供高效可靠的定时任务执行功能。
*/
@Component
@Slf4j
public final class Scheduler {
// 最大轮次大小
public static final int MAX_TICKS_PER_WHEEL = 2048;
// 最小时间间隔(毫秒)
public static final int MINIMAL_TICK_IN_MILLIS = 1;
// 定时器
private final HashedWheelTimer hashedWheelTimer;
// 任务ID生成器
private final AtomicLong timerIdGen = new AtomicLong();
// 任务ID到Timeout的映射
private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();
@Autowired
public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
// 获取配置信息
final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
// 设置时间间隔,至少为1毫秒
final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis());
// 设置轮次大小,至少为MAX_TICKS_PER_WHEEL
final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());
// 初始化HashedWheelTimer
this.hashedWheelTimer = new HashedWheelTimer(
new NamedThreadFactory("localserver-scheduler"),
tickInMillis, TimeUnit.MILLISECONDS,
ticksPerWheel
);
}
/**
* 取消任务
* @param timerKey 任务ID
* @return 是否取消成功
*/
public boolean cancel(final Object timerKey) {
final Timeout timeout = timerKey2Timeouts.get(timerKey);
return tryCancel(timeout);
}
/**
* 延迟执行一次性任务
* @param delay 延迟时间
* @param runnable 任务执行体
* @param context 上下文
* @return 任务ID
*/
public Long scheduleOnce(final Duration delay, final Runnable runnable, final Context context) {
return scheduleOnce(timerIdGen.incrementAndGet(), delay, runnable, context);
}
/**
* 延迟执行一次性任务
* @param timerKey 任务ID
* @param delay 延迟时间
* @param runnable 任务执行体
* @param context 上下文
*/
public void scheduleOnce(final Object timerKey, final Duration delay, final Runnable runnable, final Context context) {
// 参数校验
Objects.requireNonNull(timerKey, "timerKey should not be null.");
Objects.requireNonNull(delay, "delay should not be null.");
Objects.requireNonNull(runnable, "runnable should not be null.");
Objects.requireNonNull(context, "context should not be null.");
// 计算任务并放入定时器
timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
tryCancel(timeout);
return hashedWheelTimer.newTimeout(t -> {
context.runOnContext(event -> {
// 执行任务并从定时器中移除
timerKey2Timeouts.remove(timerKey);
runnable.run();
});
}, delay.toNanos(), TimeUnit.NANOSECONDS);
});
}
/**
* 延迟执行固定间隔任务
* @param initialDelay 初始延迟时间
* @param delay 固定间隔时间
* @param runnable 任务执行体
* @param context 上下文
* @return 任务ID
*/
public Long scheduleWithFixDelay(final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
return scheduleWithFixDelay(timerIdGen.incrementAndGet(), initialDelay, delay, runnable, context);
}
/**
* 延迟执行固定间隔任务
* @param timerKey 任务ID
* @param initialDelay 初始延迟时间
* @param delay 固定间隔时间
* @param runnable 任务执行体
* @param context 上下文
*/
public void scheduleWithFixDelay(final Object timerKey, final Duration initialDelay, final Duration delay, final Runnable runnable, final Context context) {
// 参数校验
Objects.requireNonNull(timerKey, "timerKey should not be null.");
Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
Objects.requireNonNull(delay, "delay should not be null.");
Objects.requireNonNull(runnable, "runnable should not be null.");
Objects.requireNonNull(context, "context should not be null.");
// 创建任务执行体,实现固定间隔的自动执行
final Runnable scheduledRunnable = new Runnable() {
final Runnable self = this;
@Override
public void run() {
try {
// 执行任务
runnable.run();
} catch (Exception e) {
// 记录错误日志
log.error("Error when trigger task of runnable:[{}]", runnable, e);
} finally {
// 重新调度任务
timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(delay, self, context);
});
}
}
};
// 计算任务并放入定时器
timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
});
}
/**
* 尝试取消任务
* @param timeout 任务的Timeout对象
* @return 是否取消成功
*/
private boolean tryCancel(final Timeout timeout) {
if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
return true;
}
return timeout.cancel();
}
/**
* 使用任务ID创建定时任务
* @param delay 延迟时间
* @param runnable 任务执行体
* @param context 上下文
* @return 任务的Timeout对象
*/
private Timeout scheduleWithTimerKey(final Duration delay, final Runnable runnable, final Context context) {
return hashedWheelTimer.newTimeout(t -> context.runOnContext(event -> runnable.run()), delay.toNanos(), TimeUnit.NANOSECONDS);
}
/**
* 初始化定时器
*/
@PostConstruct
protected void init() {
hashedWheelTimer.start();
}
/**
* 停止定时器
*/
@PreDestroy
protected void stop() {
final Set<Timeout> timeouts = hashedWheelTimer.stop();
if (CollectionUtils.isNotEmpty(timeouts)) {
timeouts.stream()
.filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
.forEach(timeout -> {
final TimerTask task = timeout.task();
try {
// 执行任务
task.run(timeout);
} catch (Exception e) {
// 忽略异常
}
});
}
}
}
原文地址: https://www.cveoy.top/t/topic/lH7I 著作权归作者所有。请勿转载和采集!