高性能多线程帧同步服务器定时器优化方案

针对多线程帧同步服务器中的定时器 Scheduler 类,优化代码以提高性能和可维护性。

优化方案:

  1. ConcurrentHashMap 改为 ConcurrentMap 更加通用化。

    ConcurrentHashMap 替换为 ConcurrentMap 可以提升代码的泛用性,使其适用于更多的场景。

  2. 使用 Java 8 的 CompletableFuture 来实现异步任务,避免显式地使用线程。

    使用 CompletableFuture 可以更优雅地处理异步操作,提高代码的可读性和可维护性,并避免显式地创建和管理线程。

  3. 考虑使用 ScheduledExecutorService 来代替 HashedWheelTimer,因为 ScheduledExecutorService 更加稳定,且在大量任务的情况下性能更好。

    ScheduledExecutorService 是 Java 提供的标准线程池,功能强大且稳定,在处理大量任务时比 HashedWheelTimer 更加高效。

  4. 考虑将 scheduleOncescheduleWithFixDelay 合并为一个方法,使用参数来控制是否是固定延迟。

    合并方法可以简化代码结构,提高代码的可读性和可维护性。

  5. 考虑使用 Spring 的 @Async 注解来实现异步任务,避免手动创建线程池。

    使用 Spring 的 @Async 注解可以更方便地实现异步任务,避免手动创建和管理线程池。

  6. log.error 改为 log.warn,因为这里的异常可能是由于定时器因为某些原因被取消,而不是真正的错误。

    将日志级别调整为 warn 可以避免误判错误信息,提高代码的健壮性。

  7. 考虑对 timerIdGen 进行优化,例如使用 AtomicLongFieldUpdater 来避免使用 AtomicLong 对象。

    使用 AtomicLongFieldUpdater 可以减少对象创建和垃圾回收的开销,提升性能。

**优化后的代码示例:**javaimport com.taobao.hsf.NamedThreadFactory;import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;import io.vertx.core.Context;import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections4.CollectionUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.time.Duration;import java.util.Objects;import java.util.Set;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.atomic.AtomicLongFieldUpdater;

@Component@Slf4jpublic final class Scheduler {

private static final AtomicLongFieldUpdater<Scheduler> TIMER_ID_GEN_UPDATER =            AtomicLongFieldUpdater.newUpdater(Scheduler.class, 'timerIdGen');

private static final int MAX_TICKS_PER_WHEEL = 2048;    private static final int MINIMAL_TICK_IN_MILLIS = 1;

private final ScheduledExecutorService scheduledExecutorService;

private volatile long timerIdGen;

private final ConcurrentMap<Object, ScheduledExecutorService.ScheduledFuture<?>> timerKey2Futures =            new ConcurrentHashMap<>();

@Autowired    public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) {        final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler();        final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis());        final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel());        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(                schedulerConfig.getThreadCount(),                new NamedThreadFactory('localserver-scheduler')        );    }

@Async    public Long schedule(final Duration delay, final Runnable runnable, final Context context, boolean fixedDelay) {        final long id = TIMER_ID_GEN_UPDATER.incrementAndGet(this);        schedule(id, delay, runnable, context, fixedDelay);        return id;    }

@Async    public void schedule(final Object timerKey, final Duration delay, final Runnable runnable,                       final Context context, boolean fixedDelay) {        Objects.requireNonNull(timerKey, 'timerKey should not be null.');        Objects.requireNonNull(delay, 'delay should not be null.');        Objects.requireNonNull(runnable, 'runnable should not be null.');        Objects.requireNonNull(context, 'context should not be null.');

    timerKey2Futures.compute(timerKey, (key, future) -> {            if (future != null && !future.isCancelled() && !future.isDone()) {                future.cancel(false);            }

        if (fixedDelay) {                return scheduledExecutorService.scheduleWithFixedDelay(                        () -> context.runOnContext(event -> runnable.run()),                        delay.toMillis(), delay.toMillis(), TimeUnit.MILLISECONDS);            } else {                return scheduledExecutorService.schedule(                        () -> context.runOnContext(event -> runnable.run()),                        delay.toMillis(), TimeUnit.MILLISECONDS);            }        });    }

public boolean cancel(final Object timerKey) {        final ScheduledExecutorService.ScheduledFuture<?> future = timerKey2Futures.get(timerKey);        if (future == null || future.isCancelled() || future.isDone()) {            return true;        }        return future.cancel(false);    }

@PostConstruct    protected void init() {        // 初始化操作    }

@PreDestroy    protected void stop() {        final Set<ScheduledExecutorService.ScheduledFuture<?>> futures = scheduledExecutorService.shutdownNow();        if (CollectionUtils.isNotEmpty(futures)) {            futures.stream()                    .filter(future -> !future.isCancelled() && !future.isDone())                    .forEach(future -> {                        final Runnable task = future.get()::run;                        try {                            task.run();                        } catch (Exception e) {                            log.warn('Error when trigger task of runnable:[{}]', task, e);                        }                    });        }
高性能多线程帧同步服务器定时器优化方案

原文地址: https://www.cveoy.top/t/topic/lH7M 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录