PowerMsg3 Local Server SDK Scheduler 性能优化建议

该类用于在多线程帧同步服务器中执行定时任务,以下是一些性能优化建议:

  1. 线程池优化

    • 可以考虑使用线程池来执行定时任务的 Runnable,避免每次都创建新线程,提高性能。
    • 同时,也可以避免创建过多线程而导致的资源浪费和线程上下文切换的开销。
  2. 批量取消定时任务

    • 可以考虑添加一个批量取消定时任务的方法,避免每次取消定时任务都需要遍历 ConcurrentHashMap,提高性能。
  3. 避免使用 ConcurrentHashMap

    • ConcurrentHashMap 在高并发场景下会有性能问题,可以考虑使用其它的数据结构或自行实现一个线程安全的 Map 来替换。
    • 例如,可以使用 java.util.concurrent.ConcurrentSkipListMap,它在高并发场景下比 ConcurrentHashMap 性能更高。
  4. 调整时间轮参数

    • 可以根据实际情况调整时间轮的 tickInMillisticksPerWheel 参数,避免资源浪费和性能问题。
    • 如果定时任务的间隔时间较短,可以将 tickInMillis 设置为更小的值,以便更精确地控制定时任务的执行时间。
    • 如果定时任务的间隔时间较长,可以将 ticksPerWheel 设置为更大的值,以便减少时间轮的轮转次数,提高性能。
  5. 缓存 Timeout 对象

    • 可以考虑缓存 Timeout 对象,避免每次都创建新的 Timeout 对象,提高性能。
    • 可以使用一个线程安全的 Map 来缓存 Timeout 对象,例如 ConcurrentHashMapConcurrentSkipListMap
  6. 尽可能减少锁的使用

    • 线程池、ConcurrentHashMap 等都存在锁的使用,可以尽可能减少锁的使用,避免性能问题。
    • 例如,可以使用线程局部变量来避免在共享变量上加锁。

代码示例:

import com.taobao.hsf.NamedThreadFactory;
import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.vertx.core.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Slf4j
public final class Scheduler {
    public static final int MAX_TICKS_PER_WHEEL = 2048;
    public static final int MINIMAL_TICK_IN_MILLIS = 1;
    private final HashedWheelTimer hashedWheelTimer;

    private final AtomicLong timerIdGen = new AtomicLong();

    // 使用 ConcurrentSkipListMap 替换 ConcurrentHashMap
    private final ConcurrentSkipListMap<Object, Timeout> timerKey2Timeouts = new ConcurrentSkipListMap<>();

    @Autowired
    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {
        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();
        final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis());
        final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());
        this.hashedWheelTimer = new HashedWheelTimer(
            new NamedThreadFactory("localserver-scheduler"),
            tickInMillis, TimeUnit.MILLISECONDS,
            ticksPerWheel
        );
    }

    // 批量取消定时任务
    public void cancelAll() {
        timerKey2Timeouts.values().forEach(timeout -> timeout.cancel());
        timerKey2Timeouts.clear();
    }

    public boolean cancel(final Object timerKey) {
        final Timeout timeout = timerKey2Timeouts.get(timerKey);
        tryCancel(timeout);
        return true;
    }

    public Long scheduleOnce(final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleOnce(id, delay, runnable, context);
        return id;
    }

    public void scheduleOnce(final Object timerKey,
                             final Duration delay,
                             final Runnable runnable,
                             final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        timerKey2Timeouts.compute(
            timerKey,
            (key, timeout) -> {
                tryCancel(timeout);
                return hashedWheelTimer.newTimeout(t -> {
                    context.runOnContext(event -> {
                        timerKey2Timeouts.remove(timerKey);
                        runnable.run();
                    });
                }, delay.toNanos(), TimeUnit.NANOSECONDS);
            }
        );
    }

    public Long scheduleWithFixDelay(final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        final Long id = timerIdGen.incrementAndGet();
        scheduleWithFixDelay(id, initialDelay, delay, runnable, context);
        return id;
    }

    public void scheduleWithFixDelay(final Object timerKey,
                                     final Duration initialDelay,
                                     final Duration delay,
                                     final Runnable runnable,
                                     final Context context) {
        Objects.requireNonNull(timerKey, "timerKey should not be null.");
        Objects.requireNonNull(initialDelay, "initialDelay should not be null.");
        Objects.requireNonNull(delay, "delay should not be null.");
        Objects.requireNonNull(runnable, "runnable should not be null.");
        Objects.requireNonNull(context, "context should not be null.");
        final Runnable scheduledRunnable = new Runnable() {
            final Runnable self = this;

            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    log.error("Error when trigger task of runnable:[{}]", runnable, e);
                } finally {
                    timerKey2Timeouts.compute(
                        timerKey,
                        (key, timeout) -> {
                            tryCancel(timeout);
                            return scheduleWithTimerKey(delay, self, context);
                        });
                }
            }
        };

        timerKey2Timeouts.compute(timerKey, (key, timeout) -> {
            tryCancel(timeout);
            return scheduleWithTimerKey(initialDelay, scheduledRunnable, context);
        });
    }

    private boolean tryCancel(final Timeout timeout) {
        if (timeout == null || timeout.isCancelled() || timeout.isExpired()) {
            return true;
        }
        return timeout.cancel();
    }

    private Timeout scheduleWithTimerKey(final Duration delay,
                                         final Runnable runnable,
                                         final Context context) {
        return hashedWheelTimer.newTimeout(
            t -> context.runOnContext(event -> runnable.run()),
            delay.toNanos(),
            TimeUnit.NANOSECONDS);
    }

    @PostConstruct
    protected void init() {
        hashedWheelTimer.start();
    }

    @PreDestroy
    protected void stop() {
        final Set<Timeout> timeouts = hashedWheelTimer.stop();
        if (CollectionUtils.isNotEmpty(timeouts)) {
            timeouts.stream()
                .filter(timeout -> !timeout.isCancelled() && !timeout.isExpired())
                .forEach(timeout -> {
                    final TimerTask task = timeout.task();
                    try {
                        task.run(timeout);
                    } catch (Exception e) {
                        //Ignore
                    }
                });
        }
    }
}

注意:

  • 这些优化建议仅供参考,实际应用中需要根据具体情况进行选择和调整。
  • 优化性能需要权衡效率和复杂度,需要选择适合的方案。
  • 除了上述建议外,还可以通过使用性能分析工具来找出性能瓶颈,并针对性地进行优化。
PowerMsg3 Local Server SDK Scheduler 优化建议

原文地址: https://www.cveoy.top/t/topic/lH7C 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录