PowerMsg3本地服务器SDK调度器优化建议
PowerMsg3本地服务器SDK调度器优化建议
本文分析了PowerMsg3本地服务器SDK中Scheduler类的实现,并提出了优化建议,以提高调度器的效率和性能。
1. 使用单例模式
Scheduler类只需要一个实例,可以使用单例模式来避免不必要的资源浪费。可以将Scheduler类改为单例模式,并在需要使用调度器的地方获取该单例实例。
2. 使用线程池
当前实现中使用了线程工厂来创建线程,可以考虑使用线程池来重用线程,避免线程创建和销毁的开销。使用线程池可以提高线程利用率,降低资源消耗。
3. 使用定时任务队列
当前实现中使用了Netty的HashedWheelTimer来实现定时任务调度,可以考虑使用定时任务队列来替代,例如使用PriorityQueue来存储定时任务,并使用一个线程来扫描并执行任务。使用定时任务队列可以提高调度器的可扩展性和灵活性。
4. 考虑任务执行时间
当前实现中,定时任务的执行时间会影响下一个任务的执行时间,应该考虑任务执行时间对调度器的影响,并选择合适的调度算法。例如,可以使用优先级队列来存储任务,并根据任务的优先级和执行时间来调度任务。
5. 使用异步执行
当前实现中,定时任务的执行是同步的,可以考虑使用异步执行来提高任务并发度和性能。例如,可以使用Vert.x的异步执行机制来执行定时任务。
代码示例
import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Component
@Slf4j
public final class Scheduler {
public static final int MAX_TICKS_PER_WHEEL = 2048;
public static final int MINIMAL_TICK_IN_MILLIS = 1;
private final HashedWheelTimer hashedWheelTimer;
private final AtomicLong timerIdGen = new AtomicLong();
private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();
@Autowired
public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis());
final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());
this.hashedWheelTimer = new HashedWheelTimer(
new NamedThreadFactory('localserver-scheduler'),
tickInMillis, TimeUnit.MILLISECONDS,
ticksPerWheel
);
}
public boolean cancel(final Object timerKey) {
final Timeout timeout = timerKey2Timeouts.get(timerKey);
tryCancel(timeout);
return true;
}
public Long scheduleOnce(final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleOnce(id, delay, runnable, context);
return id;
}
public void scheduleOnce(final Object timerKey,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, 'timerKey should not be null.');
Objects.requireNonNull(delay, 'delay should not be null.');
Objects.requireNonNull(runnable, 'runnable should not be null.');
Objects.requireNonNull(context, 'context should not be null.');
timerKey2Timeouts.compute(
timerKey,
(key, timeout) -> {
tryCancel(timeout);
return hashedWheelTimer.newTimeout(t -> {
context.runOnContext(event -> {
timerKey2Timeouts.remove(timerKey);
runnable.run();
});
}, delay.toNanos(), TimeUnit.NANOSECONDS);
}
);
}
public Long scheduleWithFixDelay(final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
return id;
}
public void scheduleWithFixDelay(final Object timerKey,
final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, 'timerKey should not be null.');
Objects.requireNonNull(initialDelay, 'initialDelay should not be null.');
Objects.requireNonNull(delay, 'delay should not be null.');
Objects.requireNonNull(runnable, 'runnable should not be null.');
Objects.requireNonNull(context, 'context should not be null.');
final Runnable scheduledRunnable = new Runnable() {
final Runnable self = this;
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
log.error('Error when trigger task of runnable:[{}]', runnable, e);
} finally {
timerKey2Timeouts.compute(
timerKey,
(key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(delay, self, context);
});
}
}
};
timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
});
}
private boolean tryCancel(final Timeout timeout) {
if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
return true;
}
return timeout.cancel();
}
private Timeout scheduleWithTimerKey(final Duration delay,
final Runnable runnable,
final Context context) {
return hashedWheelTimer.newTimeout(
t -> context.runOnContext(event -> runnable.run()),
delay.toNanos(),
TimeUnit.NANOSECONDS);
}
@PostConstruct
protected void init() {
hashedWheelTimer.start();
}
@PreDestroy
protected void stop() {
final Set<Timeout> timeouts = hashedWheelTimer.stop();
if (CollectionUtils.isNotEmpty(timeouts)) {
timeouts.stream()
.filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
.forEach(timeout -> {
final TimerTask task = timeout.task();
try {
task.run(timeout);
} catch (Exception e) {
//Ignore
}
});
}
}
}
总结
本文介绍了PowerMsg3本地服务器SDK调度器的一些优化建议,可以参考这些建议对调度器进行优化,提高调度器的性能和效率。
原文地址: https://www.cveoy.top/t/topic/lH7x 著作权归作者所有。请勿转载和采集!