[Change][Document] Core 修正 ThreadPoolEventExecutor 文档错误, 添加对 Cancelable 事件的操作细节;

[Change] ThreadPoolEventExecutor 适配 HandlerRegistry 更改;
[Change] ThreadPoolEventExecutor 对内部类 'ExecuteRunnable' 添加对 Cancelable 事件的更多处理;
[Document] ThreadPoolEventExecutor 修正文档错误;
This commit is contained in:
LamGC 2021-01-01 10:16:09 +08:00
parent 82759225b3
commit f1e248a702
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D

View File

@ -24,25 +24,27 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* 基于线程池, 以事件为单位的异步事件执行器. * 基于线程池, 以事件为单位的异步事件执行器.
* <p> 以事件为单位对于 ContentGrabbingJi 来说是实现难度低, 还很合适的, * <p> 以事件为单位对于 ContentGrabbingJi 来说不仅合适而且实现难度低,
* ContentGrabbingJi 只会注册一个 Handler, 所以依然类似于一个 Handler 一个线程. * ContentGrabbingJi 只会注册一个 Handler, 所以依然类似于一个 Handler 一个线程.
* @author LamGC * @author LamGC
*/ */
public class ThreadPoolEventExecutor implements EventExecutor { public class ThreadPoolEventExecutor implements EventExecutor {
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final EventHandlerRegistry registry; private final HandlerRegistry registry;
/** /**
* 构造线程池事件执行器. * 构造线程池事件执行器.
* @param threadExecutor 执行器所使用的线程池. * @param threadExecutor 执行器所使用的线程池.
* @param registry 事件处理注册器. * @param registry 事件处理注册器.
*/ */
public ThreadPoolEventExecutor(ThreadPoolExecutor threadExecutor, EventHandlerRegistry registry) { public ThreadPoolEventExecutor(ThreadPoolExecutor threadExecutor, HandlerRegistry registry) {
this.threadExecutor = threadExecutor; this.threadExecutor = threadExecutor;
this.registry = registry; this.registry = registry;
} }
@ -60,12 +62,13 @@ public class ThreadPoolEventExecutor implements EventExecutor {
/** /**
* 按事件为单位的事件处理执行类. * 按事件为单位的事件处理执行类.
*/ */
private final static class ExecuteRunnable implements Runnable { private final static class ExecuteRunnable implements Runnable, Observer {
private final static Logger log = LoggerFactory.getLogger(ExecuteRunnable.class); private final static Logger log = LoggerFactory.getLogger(ExecuteRunnable.class);
private final EventObject event; private final EventObject event;
private final Map<Method, Object> handlerMethods; private final Map<Method, Object> handlerMethods;
private Thread currentExecuteThread;
private ExecuteRunnable(EventObject event, Map<Method, Object> handlerMethods) { private ExecuteRunnable(EventObject event, Map<Method, Object> handlerMethods) {
this.event = Objects.requireNonNull(event); this.event = Objects.requireNonNull(event);
@ -75,6 +78,11 @@ public class ThreadPoolEventExecutor implements EventExecutor {
@Override @Override
public void run() { public void run() {
if (event instanceof Cancelable) { if (event instanceof Cancelable) {
if (((Cancelable) event).canceled()) {
log.trace("事件 {} 在处理前被取消.", event);
return;
}
registerCancelHook((Cancelable) event);
runCancelableEvent(); runCancelableEvent();
} else { } else {
runOrdinaryEvent(); runOrdinaryEvent();
@ -132,6 +140,22 @@ public class ThreadPoolEventExecutor implements EventExecutor {
} }
} }
private void registerCancelHook(Cancelable cancelableEvent) {
if (!cancelableEvent.observableCancel()) {
log.trace("事件 {} 不支持注册取消状态观察者.", cancelableEvent);
return;
}
this.currentExecuteThread = Thread.currentThread();
cancelableEvent.registerCancelObserver(this);
}
@Override
public void update(Observable o, Object arg) {
if (arg == event && this.currentExecuteThread != null) {
log.debug("事件 {} 取消状态确认, 已对线程 {} 发起中断.", event, currentExecuteThread);
this.currentExecuteThread.interrupt();
}
}
} }
} }