From f279d99fda1dc3042bb738bdc598fa68f5284e85 Mon Sep 17 00:00:00 2001 From: LamGC Date: Mon, 4 May 2020 02:04:59 +0800 Subject: [PATCH] =?UTF-8?q?[Add]=20TimeLimitThreadPoolExecutor=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=B8=80=E4=B8=AA=E5=B8=A6=E6=9C=89=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E9=99=90=E5=88=B6=E7=9A=84=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E5=8F=8A=E5=AF=B9=E5=BA=94=E5=8D=95=E5=85=83=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=B1=BB;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cgj/util/TimeLimitThreadPoolExecutor.java | 151 ++++++++++++++++++ .../util/TimeLimitThreadPoolExecutorTest.java | 30 ++++ 2 files changed, 181 insertions(+) create mode 100644 src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java create mode 100644 src/test/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutorTest.java diff --git a/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java new file mode 100644 index 0000000..fba952a --- /dev/null +++ b/src/main/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutor.java @@ -0,0 +1,151 @@ +package net.lamgc.cgj.util; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 带有时间限制的线程池. + * 当线程超出了限制时间时, 将会对该线程发出中断. + */ +public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor { + + /** + * 执行时间限制, 单位毫秒. + * 默认30s. + */ + private final AtomicLong executeTimeLimit = new AtomicLong(); + + /** + * 检查间隔时间. + * 默认100ms. + */ + private final AtomicLong timeoutCheckInterval = new AtomicLong(100); + + private final Map workerThreadMap = new Hashtable<>(); + + private final Thread timeoutCheckThread = createTimeoutCheckThread(); + + public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + setInitialTime(0, executeLimitTime); + timeoutCheckThread.start(); + } + + public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + setInitialTime(0, executeLimitTime); + timeoutCheckThread.start(); + } + + public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + setInitialTime(0, executeLimitTime); + timeoutCheckThread.start(); + } + + public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + setInitialTime(0, executeLimitTime); + timeoutCheckThread.start(); + } + + private void setInitialTime(long checkInterval, long executeLimitTime) { + if(checkInterval > 0) { + timeoutCheckInterval.set(checkInterval); + } + if(executeLimitTime > 0) { + executeTimeLimit.set(executeLimitTime); + } + } + + /** + * 设置执行时间. + *

注意: 该修改仅在线程池完全停止后才有效

+ * @see #isTerminated() + * @param time 新的限制时间(ms) + */ + public void setExecuteTimeLimit(long time) { + if(time <= 0) { + throw new IllegalArgumentException("Time is not allowed to be set to 0 or less"); + } + if(this.isTerminated()) { + executeTimeLimit.set(time); + } + } + + /** + * 设置超时检查间隔. + *

该方法仅会在当前检查后生效.

+ * @param time 新的检查间隔(ms) + */ + public void setTimeoutCheckInterval(long time) { + if(time <= 0) { + throw new IllegalArgumentException("Time is not allowed to be set to 0 or less"); + } + timeoutCheckInterval.set(time); + } + + /** + * 获取当前设置的执行时间限制. + * @return 执行时间限制(ms). + */ + public long getExecuteTimeLimit() { + return executeTimeLimit.get(); + } + + /** + * 获取当前设定的超时检查间隔 + * @return 间隔时间(ms). + */ + public long getTimeoutCheckInterval() { + return timeoutCheckInterval.get(); + } + + private Thread createTimeoutCheckThread() { + Thread checkThread = new Thread(() -> { + if(executeTimeLimit.get() <= 0) { + return; + } + while (true) { + try { + long interval = this.timeoutCheckInterval.get(); + Thread.sleep(interval); + + // 检查是否存在超时的任务 + workerThreadMap.forEach((thread, time) -> { + long currentTime = time.getAndAdd(interval); + if(currentTime > executeTimeLimit.get()) { + if(!thread.isInterrupted()) { + thread.interrupt(); + } + } + }); + } catch(InterruptedException ignored) { + break; + } + } + }); + + checkThread.setName("ThreadPool-" + Integer.toHexString(this.hashCode()) +"-TimeoutCheck"); + return checkThread; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + workerThreadMap.put(t, new AtomicLong()); + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + workerThreadMap.remove(Thread.currentThread()); + super.afterExecute(r, t); + } + + @Override + protected void terminated() { + this.timeoutCheckThread.interrupt(); + super.terminated(); + } +} diff --git a/src/test/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutorTest.java b/src/test/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutorTest.java new file mode 100644 index 0000000..d8fa89f --- /dev/null +++ b/src/test/java/net/lamgc/cgj/util/TimeLimitThreadPoolExecutorTest.java @@ -0,0 +1,30 @@ +package net.lamgc.cgj.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class TimeLimitThreadPoolExecutorTest { + + @Test + public void timeoutTest() throws InterruptedException { + TimeLimitThreadPoolExecutor executor = new TimeLimitThreadPoolExecutor(1000, 1, 1, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)); + System.out.println(executor.isTerminated()); + System.out.println(executor.isShutdown()); + + executor.setTimeoutCheckInterval(150); + System.out.println("当前设定: ETL: " + executor.getExecuteTimeLimit() + "ms, TCI: " + executor.getTimeoutCheckInterval() + "ms"); + executor.execute(() -> { + try { + Thread.sleep(5 * 1000); + } catch (InterruptedException e) { + System.out.println("线程 " + Thread.currentThread().getName() + " 被中断"); + } + }); + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5 * 1000, TimeUnit.MILLISECONDS)); + } + +}