From 1c742bfb6f94099fc06a4905efa12e1fec72fef8 Mon Sep 17 00:00:00 2001 From: LamGC Date: Wed, 17 Jun 2020 16:14:32 +0800 Subject: [PATCH] =?UTF-8?q?[Fix]=20TimeLimitThreadPoolExecutor=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E4=BA=86=E8=B6=85=E6=97=B6=E6=8F=90=E9=86=92=E9=81=AD?= =?UTF-8?q?=E5=88=B0=E6=97=B6=E9=99=90=E7=BA=BF=E7=A8=8B=E6=B1=A0=E4=B8=AD?= =?UTF-8?q?=E6=96=AD=E7=9A=84=E9=97=AE=E9=A2=98,=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E4=BB=85=E5=8F=91=E8=B5=B7=E4=B8=80?= =?UTF-8?q?=E6=AC=A1=E4=B8=AD=E6=96=AD;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cgj/util/TimeLimitThreadPoolExecutor.java | 63 ++++++++++++++++--- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java index 64d1224..11d7993 100644 --- a/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java +++ b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java @@ -2,6 +2,7 @@ package net.lamgc.cgj.util; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -22,29 +23,53 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { */ private final AtomicLong timeoutCheckInterval = new AtomicLong(100); - private final Map workerThreadMap = new Hashtable<>(); + private final Map workerThreadMap = new Hashtable<>(); private final Thread timeoutCheckThread = createTimeoutCheckThread(); - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); setInitialTime(executeLimitTime); timeoutCheckThread.start(); } - public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + public TimeLimitThreadPoolExecutor(long executeLimitTime, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); setInitialTime(executeLimitTime); timeoutCheckThread.start(); @@ -111,11 +136,13 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { Thread.sleep(interval); // 检查是否存在超时的任务 - workerThreadMap.forEach((thread, time) -> { - long currentTime = time.getAndAdd(interval); - if(currentTime > executeTimeLimit.get()) { - if(!thread.isInterrupted()) { + final long executeTimeLimit = this.executeTimeLimit.get(); + workerThreadMap.forEach((thread, info) -> { + long currentTime = info.getTimeRemaining().getAndAdd(interval); + if(currentTime > executeTimeLimit) { + if(!info.getNotifyInterrupted().get() && !thread.isInterrupted()) { thread.interrupt(); + info.getNotifyInterrupted().set(true); } } }); @@ -131,7 +158,7 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { @Override protected void beforeExecute(Thread t, Runnable r) { - workerThreadMap.put(t, new AtomicLong()); + workerThreadMap.put(t, new MonitorInfo()); super.beforeExecute(t, r); } @@ -146,4 +173,20 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { this.timeoutCheckThread.interrupt(); super.terminated(); } + + private static class MonitorInfo { + + private final AtomicLong timeRemaining = new AtomicLong(); + + private final AtomicBoolean notifyInterrupted = new AtomicBoolean(false); + + public AtomicBoolean getNotifyInterrupted() { + return notifyInterrupted; + } + + public AtomicLong getTimeRemaining() { + return timeRemaining; + } + } + }