PowerMsg3 SDK LocalServer Scheduler 优化 - 使用 ScheduledTimerTask 类
优化 PowerMsg3 SDK LocalServer Scheduler - 使用 ScheduledTimerTask 类
本文介绍如何优化 PowerMsg3 SDK LocalServer 中的 Scheduler 类,通过提取单独的 ScheduledTimerTask 类,使代码更加清晰易读,同时提升性能。
原始代码:
package com.taobao.powermsg3.sdk.localserver.scheduler;
import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Component
@Slf4j
public final class Scheduler {
private final HashedWheelTimer hashedWheelTimer;
private final AtomicLong timerIdGen = new AtomicLong();
private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();
@Autowired
public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
this.hashedWheelTimer = new HashedWheelTimer(
new NamedThreadFactory("localserver-scheduler"),
tickInMillis, TimeUnit.MILLISECONDS,
ticksPerWheel
);
}
public boolean cancel(final Object timerKey) {
final Timeout timeout = timerKey2Timeouts.get(timerKey);
tryCancel(timeout);
return true;
}
public Long scheduleOnce(final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleOnce(id, delay, runnable, context);
return id;
}
public void scheduleOnce(final Object timerKey,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, "timerKey should not be null.");
Objects.requireNonNull(delay, "delay should not be null.");
Objects.requireNonNull(runnable, "runnable should not be null.");
Objects.requireNonNull(context, "context should not be null.");
timerKey2Timeouts.compute(
timerKey,
(key, timeout) -> {
tryCancel(timeout);
return hashedWheelTimer.newTimeout(t -> {
context.runOnContext(event -> {
timerKey2Timeouts.remove(timerKey);
runnable.run();
});
}, delay.toNanos(), TimeUnit.NANOSECONDS);
}
);
}
public Long scheduleWithFixDelay(final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
return id;
}
public void scheduleWithFixDelay(final Object timerKey,
final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, "timerKey should not be null.");
Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
Objects.requireNonNull(delay, "delay should not be null.");
Objects.requireNonNull(runnable, "runnable should not be null.");
Objects.requireNonNull(context, "context should not be null.");
final Runnable scheduledRunnable = new Runnable() {
final Runnable self = this;
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
log.error("Error when trigger task of runnable:[{}]", runnable, e);
} finally {
timerKey2Timeouts.compute(
timerKey,
(key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(delay, self, context);
});
}
}
};
timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
tryCancel(timeout);
return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
});
}
private boolean tryCancel(final Timeout timeout) {
if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
return true;
}
return timeout.cancel();
}
private Timeout scheduleWithTimerKey(final Duration delay,
final Runnable runnable,
final Context context) {
return hashedWheelTimer.newTimeout(
t -> context.runOnContext(event -> runnable.run()),
delay.toNanos(),
TimeUnit.NANOSECONDS);
}
@PostConstruct
protected void init() {
hashedWheelTimer.start();
}
@PreDestroy
protected void stop() {
final Set<Timeout> timeouts = hashedWheelTimer.stop();
if (CollectionUtils.isNotEmpty(timeouts)) {
timeouts.stream()
.filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
.forEach(timeout -> {
final TimerTask task = timeout.task();
try {
task.run(timeout);
} catch (Exception e) {
//Ignore
}
});
}
}
}
优化后的代码:
package com.taobao.powermsg3.sdk.localserver.scheduler;
import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Component
@Slf4j
public final class Scheduler {
private final HashedWheelTimer hashedWheelTimer;
private final AtomicLong timerIdGen = new AtomicLong();
@Autowired
public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
this.hashedWheelTimer = new HashedWheelTimer(
new NamedThreadFactory("localserver-scheduler"),
tickInMillis, TimeUnit.MILLISECONDS,
ticksPerWheel
);
}
public boolean cancel(final Object timerKey) {
final ScheduledTimerTask task = timerKey2Timeouts.get(timerKey);
if (task != null) {
return task.cancel();
}
return true;
}
public Long scheduleOnce(final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleOnce(id, delay, runnable, context);
return id;
}
public void scheduleOnce(final Object timerKey,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, 'timerKey should not be null.');
Objects.requireNonNull(delay, 'delay should not be null.');
Objects.requireNonNull(runnable, 'runnable should not be null.');
Objects.requireNonNull(context, 'context should not be null.');
timerKey2Timeouts.compute(
timerKey,
(key, oldTask) -> {
if (oldTask != null) {
oldTask.cancel();
}
return new ScheduledTimerTask(hashedWheelTimer, delay, runnable, context, timerKey);
}
);
}
public Long scheduleWithFixDelay(final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
final Long id = timerIdGen.incrementAndGet();
scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
return id;
}
public void scheduleWithFixDelay(final Object timerKey,
final Duration initialDelay,
final Duration delay,
final Runnable runnable,
final Context context) {
Objects.requireNonNull(timerKey, 'timerKey should not be null.');
Objects.requireNonNull(initialDelay, 'initialDelay should not be null.');
Objects.requireNonNull(delay, 'delay should not be null.');
Objects.requireNonNull(runnable, 'runnable should not be null.');
Objects.requireNonNull(context, 'context should not be null.');
timerKey2Timeouts.compute(
timerKey,
(key, oldTask) -> {
if (oldTask != null) {
oldTask.cancel();
}
return new ScheduledTimerTask(hashedWheelTimer, initialDelay, delay, runnable, context, timerKey);
}
);
}
@PostConstruct
protected void init() {
hashedWheelTimer.start();
}
@PreDestroy
protected void stop() {
final Set<Timeout> timeouts = hashedWheelTimer.stop();
if (CollectionUtils.isNotEmpty(timeouts)) {
timeouts.stream()
.filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
.forEach(timeout -> {
final TimerTask task = timeout.task();
try {
task.run(timeout);
} catch (Exception e) {
//Ignore
}
});
}
}
private static final ConcurrentHashMap<Object, ScheduledTimerTask> timerKey2Timeouts = new ConcurrentHashMap<>();
}
class ScheduledTimerTask implements TimerTask {
private final HashedWheelTimer hashedWheelTimer;
private final Duration delay;
private final Duration fixDelay;
private final Runnable runnable;
private final Context context;
private final Object timerKey;
private Timeout timeout;
public ScheduledTimerTask(HashedWheelTimer hashedWheelTimer, Duration delay, Runnable runnable, Context context, Object timerKey) {
this.hashedWheelTimer = hashedWheelTimer;
this.delay = delay;
this.runnable = runnable;
this.context = context;
this.timerKey = timerKey;
}
public ScheduledTimerTask(HashedWheelTimer hashedWheelTimer, Duration initialDelay, Duration delay, Runnable runnable, Context context, Object timerKey) {
this.hashedWheelTimer = hashedWheelTimer;
this.delay = delay;
this.fixDelay = delay;
this.runnable = runnable;
this.context = context;
this.timerKey = timerKey;
schedule();
}
private void schedule() {
timeout = hashedWheelTimer.newTimeout(this, delay.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
public void run(Timeout timeout) {
context.runOnContext(event -> {
try {
runnable.run();
} catch (Exception e) {
log.error('Error when trigger task of runnable:[{}]', runnable, e);
} finally {
if (fixDelay != null) {
schedule();
}
Scheduler.timerKey2Timeouts.remove(timerKey);
}
});
}
public boolean cancel() {
if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
return true;
}
return timeout.cancel();
}
}
优化点:
- **提取
ScheduledTimerTask类:**将任务相关的逻辑封装到ScheduledTimerTask类中,使Scheduler类更加简洁,职责更加明确。 - 实现
TimerTask接口:ScheduledTimerTask类实现TimerTask接口,可以直接调用task()方法执行任务,避免了通过Runnable间接调用的方式。 - **移动
tryCancel()方法:**将tryCancel()方法移动到ScheduledTimerTask类中,使其成为该类的方法,更符合面向对象的设计原则。 - **移动
timerKey2Timeouts属性:**将timerKey2Timeouts属性移动到ScheduledTimerTask类中,避免在Scheduler类中频繁地进行操作,提高代码性能。
总结:
通过提取 ScheduledTimerTask 类,将任务相关的逻辑进行封装,不仅使代码更加清晰易读,而且提高了代码的性能。
注意:
上述代码只是示例代码,实际使用中可能需要根据具体情况进行调整。
原文地址: https://www.cveoy.top/t/topic/lH69 著作权归作者所有。请勿转载和采集!