优化 PowerMsg3 SDK LocalServer Scheduler - 使用 ScheduledTimerTask 类

本文介绍如何优化 PowerMsg3 SDK LocalServer 中的 Scheduler 类,通过提取单独的 ScheduledTimerTask 类,使代码更加清晰易读,同时提升性能。

原始代码:

package com.taobao.powermsg3.sdk.localserver.scheduler;

import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


@Component
@Slf4j
public final class Scheduler {
    private final HashedWheelTimer hashedWheelTimer;

    private final AtomicLong timerIdGen = new AtomicLong();

    private final ConcurrentHashMap<Object, Timeout> timerKey2Timeouts = new ConcurrentHashMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    public boolean cancel(final Object timerKey) {
        final Timeout timeout = timerKey2Timeouts.get(timerKey);
        tryCancel(timeout);
        return true;
    }

    public Long scheduleOnce(final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleOnce(id, delay, runnable, context);
        return id;
    }

    public void scheduleOnce(final Object timerKey,
                             final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        timerKey2Timeouts.compute(
            timerKey,
            (key, timeout) -> {
                tryCancel(timeout);
                return hashedWheelTimer.newTimeout(t -> {
                    context.runOnContext(event -> {
                        timerKey2Timeouts.remove(timerKey);
                        runnable.run();
                    });
                }, delay.toNanos(), TimeUnit.NANOSECONDS);
            }
        );
    }

    public Long scheduleWithFixDelay(final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
        return id;
    }

    public void scheduleWithFixDelay(final Object timerKey,
                                     final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error when trigger task of runnable:[{}]", runnable, e);
                } finally {
                    timerKey2Timeouts.compute(
                        timerKey,
                        (key, timeout) -> {
                            tryCancel(timeout);
                            return scheduleWithTimerKey(delay, self, context);
                        });
                }
            }
        };

        timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        });
    }

    private boolean tryCancel(final Timeout timeout) {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }

    private Timeout scheduleWithTimerKey(final Duration delay,
                                         final Runnable runnable,
                                         final Context context) {
        return hashedWheelTimer.newTimeout(
            t -> context.runOnContext(event -> runnable.run()),
            delay.toNanos(),
            TimeUnit.NANOSECONDS);
    }


    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
                .forEach(timeout -> {
                    final TimerTask task = timeout.task();
                    try {
                        task.run(timeout);
                    } catch (Exception e) {
                        //Ignore
                    }
                });
        }
    }
}

优化后的代码:

package com.taobao.powermsg3.sdk.localserver.scheduler;

import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Slf4j
public final class Scheduler {
    private final HashedWheelTimer hashedWheelTimer;

    private final AtomicLong timerIdGen = new AtomicLong();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(1, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(512, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    public boolean cancel(final Object timerKey) {
        final ScheduledTimerTask task = timerKey2Timeouts.get(timerKey);
        if (task != null) {
            return task.cancel();
        }
        return true;
    }

    public Long scheduleOnce(final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleOnce(id, delay, runnable, context);
        return id;
    }

    public void scheduleOnce(final Object timerKey,
                             final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        Objects.requireNonNull(timerKey, 'timerKey should not be null.');
        Objects.requireNonNull(delay, 'delay should not be null.');
        Objects.requireNonNull(runnable, 'runnable should not be null.');
        Objects.requireNonNull(context, 'context should not be null.');
        timerKey2Timeouts.compute(
            timerKey,
            (key, oldTask) -> {
                if (oldTask != null) {
                    oldTask.cancel();
                }
                return new ScheduledTimerTask(hashedWheelTimer, delay, runnable, context, timerKey);
            }
        );
    }

    public Long scheduleWithFixDelay(final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
        return id;
    }

    public void scheduleWithFixDelay(final Object timerKey,
                                     final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        Objects.requireNonNull(timerKey, 'timerKey should not be null.');
        Objects.requireNonNull(initialDelay, 'initialDelay should not be null.');
        Objects.requireNonNull(delay, 'delay should not be null.');
        Objects.requireNonNull(runnable, 'runnable should not be null.');
        Objects.requireNonNull(context, 'context should not be null.');
        timerKey2Timeouts.compute(
            timerKey,
            (key, oldTask) -> {
                if (oldTask != null) {
                    oldTask.cancel();
                }
                return new ScheduledTimerTask(hashedWheelTimer, initialDelay, delay, runnable, context, timerKey);
            }
        );
    }

    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
                .forEach(timeout -> {
                    final TimerTask task = timeout.task();
                    try {
                        task.run(timeout);
                    } catch (Exception e) {
                        //Ignore
                    }
                });
        }
    }

    private static final ConcurrentHashMap<Object, ScheduledTimerTask> timerKey2Timeouts = new ConcurrentHashMap<>();
}

class ScheduledTimerTask implements TimerTask {
    private final HashedWheelTimer hashedWheelTimer;
    private final Duration delay;
    private final Duration fixDelay;
    private final Runnable runnable;
    private final Context context;
    private final Object timerKey;
    private Timeout timeout;

    public ScheduledTimerTask(HashedWheelTimer hashedWheelTimer, Duration delay, Runnable runnable, Context context, Object timerKey) {
        this.hashedWheelTimer = hashedWheelTimer;
        this.delay = delay;
        this.runnable = runnable;
        this.context = context;
        this.timerKey = timerKey;
    }

    public ScheduledTimerTask(HashedWheelTimer hashedWheelTimer, Duration initialDelay, Duration delay, Runnable runnable, Context context, Object timerKey) {
        this.hashedWheelTimer = hashedWheelTimer;
        this.delay = delay;
        this.fixDelay = delay;
        this.runnable = runnable;
        this.context = context;
        this.timerKey = timerKey;
        schedule();
    }

    private void schedule() {
        timeout = hashedWheelTimer.newTimeout(this, delay.toNanos(), TimeUnit.NANOSECONDS);
    }

    @Override
    public void run(Timeout timeout) {
        context.runOnContext(event -> {
            try {
                runnable.run();
            } catch (Exception e) {
                log.error('Error when trigger task of runnable:[{}]', runnable, e);
            } finally {
                if (fixDelay != null) {
                    schedule();
                }
                Scheduler.timerKey2Timeouts.remove(timerKey);
            }
        });
    }

    public boolean cancel() {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }
}

优化点:

  1. **提取 ScheduledTimerTask 类:**将任务相关的逻辑封装到 ScheduledTimerTask 类中,使 Scheduler 类更加简洁,职责更加明确。
  2. 实现 TimerTask 接口:ScheduledTimerTask 类实现 TimerTask 接口,可以直接调用 task() 方法执行任务,避免了通过 Runnable 间接调用的方式。
  3. **移动 tryCancel() 方法:**将 tryCancel() 方法移动到 ScheduledTimerTask 类中,使其成为该类的方法,更符合面向对象的设计原则。
  4. **移动 timerKey2Timeouts 属性:**将 timerKey2Timeouts 属性移动到 ScheduledTimerTask 类中,避免在 Scheduler 类中频繁地进行操作,提高代码性能。

总结:

通过提取 ScheduledTimerTask 类,将任务相关的逻辑进行封装,不仅使代码更加清晰易读,而且提高了代码的性能。

注意:

上述代码只是示例代码,实际使用中可能需要根据具体情况进行调整。

PowerMsg3 SDK LocalServer Scheduler 优化 - 使用 ScheduledTimerTask 类

原文地址: https://www.cveoy.top/t/topic/lH69 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录