diff --git a/ContentGrabbingJi-core/src/main/java/net/lamgc/cgj/bot/event/ThreadPoolEventExecutor.java b/ContentGrabbingJi-core/src/main/java/net/lamgc/cgj/bot/event/ThreadPoolEventExecutor.java index d653c29..1657050 100644 --- a/ContentGrabbingJi-core/src/main/java/net/lamgc/cgj/bot/event/ThreadPoolEventExecutor.java +++ b/ContentGrabbingJi-core/src/main/java/net/lamgc/cgj/bot/event/ThreadPoolEventExecutor.java @@ -24,25 +24,27 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.Objects; +import java.util.Observable; +import java.util.Observer; import java.util.concurrent.ThreadPoolExecutor; /** * 基于线程池, 以事件为单位的异步事件执行器. - *

以事件为单位对于 ContentGrabbingJi 来说是实现难度低, 还很合适的, + *

以事件为单位对于 ContentGrabbingJi 来说不仅合适,而且实现难度低, * ContentGrabbingJi 只会注册一个 Handler, 所以依然类似于一个 Handler 一个线程. * @author LamGC */ public class ThreadPoolEventExecutor implements EventExecutor { private final ThreadPoolExecutor threadExecutor; - private final EventHandlerRegistry registry; + private final HandlerRegistry registry; /** * 构造线程池事件执行器. * @param threadExecutor 执行器所使用的线程池. * @param registry 事件处理注册器. */ - public ThreadPoolEventExecutor(ThreadPoolExecutor threadExecutor, EventHandlerRegistry registry) { + public ThreadPoolEventExecutor(ThreadPoolExecutor threadExecutor, HandlerRegistry registry) { this.threadExecutor = threadExecutor; this.registry = registry; } @@ -60,12 +62,13 @@ public class ThreadPoolEventExecutor implements EventExecutor { /** * 按事件为单位的事件处理执行类. */ - private final static class ExecuteRunnable implements Runnable { + private final static class ExecuteRunnable implements Runnable, Observer { private final static Logger log = LoggerFactory.getLogger(ExecuteRunnable.class); private final EventObject event; private final Map handlerMethods; + private Thread currentExecuteThread; private ExecuteRunnable(EventObject event, Map handlerMethods) { this.event = Objects.requireNonNull(event); @@ -75,6 +78,11 @@ public class ThreadPoolEventExecutor implements EventExecutor { @Override public void run() { if (event instanceof Cancelable) { + if (((Cancelable) event).canceled()) { + log.trace("事件 {} 在处理前被取消.", event); + return; + } + registerCancelHook((Cancelable) event); runCancelableEvent(); } else { runOrdinaryEvent(); @@ -132,6 +140,22 @@ public class ThreadPoolEventExecutor implements EventExecutor { } } + private void registerCancelHook(Cancelable cancelableEvent) { + if (!cancelableEvent.observableCancel()) { + log.trace("事件 {} 不支持注册取消状态观察者.", cancelableEvent); + return; + } + this.currentExecuteThread = Thread.currentThread(); + cancelableEvent.registerCancelObserver(this); + } + + @Override + public void update(Observable o, Object arg) { + if (arg == event && this.currentExecuteThread != null) { + log.debug("事件 {} 取消状态确认, 已对线程 {} 发起中断.", event, currentExecuteThread); + this.currentExecuteThread.interrupt(); + } + } } }