[Fix] TimeLimitThreadPoolExecutor 修复了超时提醒遭到时限线程池中断的问题, 调整线程池仅发起一次中断;

This commit is contained in:
LamGC 2020-06-17 16:14:32 +08:00
parent bd6b825704
commit 1c742bfb6f

View File

@ -2,6 +2,7 @@ package net.lamgc.cgj.util;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -22,29 +23,53 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor {
*/ */
private final AtomicLong timeoutCheckInterval = new AtomicLong(100); private final AtomicLong timeoutCheckInterval = new AtomicLong(100);
private final Map<Thread, AtomicLong> workerThreadMap = new Hashtable<>(); private final Map<Thread, MonitorInfo> workerThreadMap = new Hashtable<>();
private final Thread timeoutCheckThread = createTimeoutCheckThread(); private final Thread timeoutCheckThread = createTimeoutCheckThread();
public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { public TimeLimitThreadPoolExecutor(long executeLimitTime,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
setInitialTime(executeLimitTime); setInitialTime(executeLimitTime);
timeoutCheckThread.start(); timeoutCheckThread.start();
} }
public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { public TimeLimitThreadPoolExecutor(long executeLimitTime,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
setInitialTime(executeLimitTime); setInitialTime(executeLimitTime);
timeoutCheckThread.start(); timeoutCheckThread.start();
} }
public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { public TimeLimitThreadPoolExecutor(long executeLimitTime,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
setInitialTime(executeLimitTime); setInitialTime(executeLimitTime);
timeoutCheckThread.start(); timeoutCheckThread.start();
} }
public TimeLimitThreadPoolExecutor(long executeLimitTime, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { public TimeLimitThreadPoolExecutor(long executeLimitTime,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
setInitialTime(executeLimitTime); setInitialTime(executeLimitTime);
timeoutCheckThread.start(); timeoutCheckThread.start();
@ -111,11 +136,13 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor {
Thread.sleep(interval); Thread.sleep(interval);
// 检查是否存在超时的任务 // 检查是否存在超时的任务
workerThreadMap.forEach((thread, time) -> { final long executeTimeLimit = this.executeTimeLimit.get();
long currentTime = time.getAndAdd(interval); workerThreadMap.forEach((thread, info) -> {
if(currentTime > executeTimeLimit.get()) { long currentTime = info.getTimeRemaining().getAndAdd(interval);
if(!thread.isInterrupted()) { if(currentTime > executeTimeLimit) {
if(!info.getNotifyInterrupted().get() && !thread.isInterrupted()) {
thread.interrupt(); thread.interrupt();
info.getNotifyInterrupted().set(true);
} }
} }
}); });
@ -131,7 +158,7 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor {
@Override @Override
protected void beforeExecute(Thread t, Runnable r) { protected void beforeExecute(Thread t, Runnable r) {
workerThreadMap.put(t, new AtomicLong()); workerThreadMap.put(t, new MonitorInfo());
super.beforeExecute(t, r); super.beforeExecute(t, r);
} }
@ -146,4 +173,20 @@ public class TimeLimitThreadPoolExecutor extends ThreadPoolExecutor {
this.timeoutCheckThread.interrupt(); this.timeoutCheckThread.interrupt();
super.terminated(); super.terminated();
} }
private static class MonitorInfo {
private final AtomicLong timeRemaining = new AtomicLong();
private final AtomicBoolean notifyInterrupted = new AtomicBoolean(false);
public AtomicBoolean getNotifyInterrupted() {
return notifyInterrupted;
}
public AtomicLong getTimeRemaining() {
return timeRemaining;
}
}
} }