diff --git a/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java index 64d1224..11d7993 100644 --- a/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java +++ b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java @@ -2,6 +2,7 @@ package net.lamgc.cgj.util; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -22,29 +23,53 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { */ private final AtomicLong timeoutCheckInterval = new AtomicLong(100); - private final Map workerThreadMap = new Hashtable<>(); + private final Map workerThreadMap = new Hashtable<>(); private final Thread timeoutCheckThread = createTimeoutCheckThread(); - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); setInitialTime(executeLimitTime); timeoutCheckThread.start(); @@ -111,11 +136,13 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { Thread.sleep(interval); // 检查是否存在超时的任务 - workerThreadMap.forEach((thread, time) -> { - long currentTime = time.getAndAdd(interval); - if(currentTime > executeTimeLimit.get()) { - if(!thread.isInterrupted()) { + final long executeTimeLimit = this.executeTimeLimit.get(); + workerThreadMap.forEach((thread, info) -> { + long currentTime = info.getTimeRemaining().getAndAdd(interval); + if(currentTime > executeTimeLimit) { + if(!info.getNotifyInterrupted().get() && !thread.isInterrupted()) { thread.interrupt(); + info.getNotifyInterrupted().set(true); } } }); @@ -131,7 +158,7 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { @Override protected void beforeExecute(Thread t, Runnable r) { - workerThreadMap.put(t, new AtomicLong()); + workerThreadMap.put(t, new MonitorInfo()); super.beforeExecute(t, r); } @@ -146,4 +173,20 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { this.timeoutCheckThread.interrupt(); super.terminated(); } + + private static class MonitorInfo { + + private final AtomicLong timeRemaining = new AtomicLong(); + + private final AtomicBoolean notifyInterrupted = new AtomicBoolean(false); + + public AtomicBoolean getNotifyInterrupted() { + return notifyInterrupted; + } + + public AtomicLong getTimeRemaining() { + return timeRemaining; + } + } + }