高性能多线程帧同步服务器定时器优化方案
高性能多线程帧同步服务器定时器优化方案
针对多线程帧同步服务器中的定时器 Scheduler 类,优化代码以提高性能和可维护性。
优化方案:
-
将
ConcurrentHashMap改为ConcurrentMap更加通用化。将
ConcurrentHashMap替换为ConcurrentMap可以提升代码的泛用性,使其适用于更多的场景。 -
使用 Java 8 的
CompletableFuture来实现异步任务,避免显式地使用线程。使用
CompletableFuture可以更优雅地处理异步操作,提高代码的可读性和可维护性,并避免显式地创建和管理线程。 -
考虑使用
ScheduledExecutorService来代替HashedWheelTimer,因为ScheduledExecutorService更加稳定,且在大量任务的情况下性能更好。ScheduledExecutorService是 Java 提供的标准线程池,功能强大且稳定,在处理大量任务时比HashedWheelTimer更加高效。 -
考虑将
scheduleOnce和scheduleWithFixDelay合并为一个方法,使用参数来控制是否是固定延迟。合并方法可以简化代码结构,提高代码的可读性和可维护性。
-
考虑使用 Spring 的
@Async注解来实现异步任务,避免手动创建线程池。使用 Spring 的
@Async注解可以更方便地实现异步任务,避免手动创建和管理线程池。 -
将
log.error改为log.warn,因为这里的异常可能是由于定时器因为某些原因被取消,而不是真正的错误。将日志级别调整为
warn可以避免误判错误信息,提高代码的健壮性。 -
考虑对
timerIdGen进行优化,例如使用AtomicLongFieldUpdater来避免使用AtomicLong对象。使用
AtomicLongFieldUpdater可以减少对象创建和垃圾回收的开销,提升性能。
**优化后的代码示例:**javaimport com.taobao.hsf.NamedThreadFactory;import com.taobao.powermsg3.sdk.localserver.PmLocalServerSDKConfig;import io.vertx.core.Context;import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections4.CollectionUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.time.Duration;import java.util.Objects;import java.util.Set;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@Component@Slf4jpublic final class Scheduler {
private static final AtomicLongFieldUpdater<Scheduler> TIMER_ID_GEN_UPDATER = AtomicLongFieldUpdater.newUpdater(Scheduler.class, 'timerIdGen');
private static final int MAX_TICKS_PER_WHEEL = 2048; private static final int MINIMAL_TICK_IN_MILLIS = 1;
private final ScheduledExecutorService scheduledExecutorService;
private volatile long timerIdGen;
private final ConcurrentMap<Object, ScheduledExecutorService.ScheduledFuture<?>> timerKey2Futures = new ConcurrentHashMap<>();
@Autowired public Scheduler(final PmLocalServerSDKConfig pmLocalServerSDKConfig) { final PmLocalServerSDKConfig.SchedulerConfig schedulerConfig = pmLocalServerSDKConfig.getScheduler(); final int tickInMillis = Math.max(MINIMAL_TICK_IN_MILLIS, schedulerConfig.getTickDurationInMillis()); final int ticksPerWheel = Math.max(MAX_TICKS_PER_WHEEL, schedulerConfig.getTicksPerWheel()); this.scheduledExecutorService = new ScheduledThreadPoolExecutor( schedulerConfig.getThreadCount(), new NamedThreadFactory('localserver-scheduler') ); }
@Async public Long schedule(final Duration delay, final Runnable runnable, final Context context, boolean fixedDelay) { final long id = TIMER_ID_GEN_UPDATER.incrementAndGet(this); schedule(id, delay, runnable, context, fixedDelay); return id; }
@Async public void schedule(final Object timerKey, final Duration delay, final Runnable runnable, final Context context, boolean fixedDelay) { Objects.requireNonNull(timerKey, 'timerKey should not be null.'); Objects.requireNonNull(delay, 'delay should not be null.'); Objects.requireNonNull(runnable, 'runnable should not be null.'); Objects.requireNonNull(context, 'context should not be null.');
timerKey2Futures.compute(timerKey, (key, future) -> { if (future != null && !future.isCancelled() && !future.isDone()) { future.cancel(false); }
if (fixedDelay) { return scheduledExecutorService.scheduleWithFixedDelay( () -> context.runOnContext(event -> runnable.run()), delay.toMillis(), delay.toMillis(), TimeUnit.MILLISECONDS); } else { return scheduledExecutorService.schedule( () -> context.runOnContext(event -> runnable.run()), delay.toMillis(), TimeUnit.MILLISECONDS); } }); }
public boolean cancel(final Object timerKey) { final ScheduledExecutorService.ScheduledFuture<?> future = timerKey2Futures.get(timerKey); if (future == null || future.isCancelled() || future.isDone()) { return true; } return future.cancel(false); }
@PostConstruct protected void init() { // 初始化操作 }
@PreDestroy protected void stop() { final Set<ScheduledExecutorService.ScheduledFuture<?>> futures = scheduledExecutorService.shutdownNow(); if (CollectionUtils.isNotEmpty(futures)) { futures.stream() .filter(future -> !future.isCancelled() && !future.isDone()) .forEach(future -> { final Runnable task = future.get()::run; try { task.run(); } catch (Exception e) { log.warn('Error when trigger task of runnable:[{}]', task, e); } }); }
原文地址: https://www.cveoy.top/t/topic/lH7M 著作权归作者所有。请勿转载和采集!