diff --git a/src/main/java/net/lamgc/cgj/bot/BotCommandProcess.java b/src/main/java/net/lamgc/cgj/bot/BotCommandProcess.java index 13ae9a5..f2fa171 100644 --- a/src/main/java/net/lamgc/cgj/bot/BotCommandProcess.java +++ b/src/main/java/net/lamgc/cgj/bot/BotCommandProcess.java @@ -2,7 +2,6 @@ package net.lamgc.cgj.bot; import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.*; import io.netty.handler.codec.http.HttpHeaderNames; import net.lamgc.cgj.Main; @@ -15,7 +14,6 @@ import net.lamgc.cgj.pixiv.PixivURL; import net.lamgc.cgj.util.URLs; import net.lamgc.utils.base.runner.Argument; import net.lamgc.utils.base.runner.Command; -import net.lamgc.utils.event.EventExecutor; import net.lz1998.cq.utils.CQCode; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; @@ -29,9 +27,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "SameParameterValue"}) @@ -56,20 +51,6 @@ public class BotCommandProcess { private final static CacheStore> pagesCache = new StringListRedisCacheStore(BotEventHandler.redisServer, "imagePages"); public final static CacheStore reportStore = new JsonRedisCacheStore(BotEventHandler.redisServer, "report", gson); - /** - * 图片异步缓存执行器 - */ - private final static EventExecutor imageCacheExecutor = new EventExecutor(new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() >= 2 ? 2 : 1, - (int) Math.ceil(Runtime.getRuntime().availableProcessors() / 2F), - 5L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(128), - new ThreadFactoryBuilder() - .setNameFormat("imageCacheThread-%d") - .build(), - new ThreadPoolExecutor.DiscardOldestPolicy() - )); - private final static RankingUpdateTimer updateTimer = new RankingUpdateTimer(); public static void initialize() { @@ -77,12 +58,6 @@ public class BotCommandProcess { SettingProperties.loadProperties(); - try { - imageCacheExecutor.addHandler(new ImageCacheHandler()); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } - updateTimer.schedule(null); log.info("初始化完成."); } @@ -544,6 +519,20 @@ public class BotCommandProcess { @Argument(name = "quality", force = false) PixivDownload.PageQuality quality, @Argument(name = "page", force = false, defaultValue = "1") int pageIndex) { log.debug("IllustId: {}, Quality: {}, PageIndex: {}", illustId, quality.name(), pageIndex); + + try { + if (isNoSafe(illustId, SettingProperties.getProperties(fromGroup), false)) { + log.warn("作品 {} 存在R-18内容且设置\"image.allowR18\"为false,将屏蔽该作品不发送.", illustId); + return "(根据设置,该作品已被屏蔽!)"; + } else if(isReported(illustId)) { + log.warn("作品Id {} 被报告, 正在等待审核, 跳过该作品.", illustId); + return "(该作品已被封印)"; + } + } catch (IOException e) { + log.warn("作品信息无法获取!", e); + return "发生网络异常,无法获取图片!"; + } + List pagesList; try { pagesList = getIllustPages(illustId, quality, false); @@ -564,19 +553,6 @@ public class BotCommandProcess { return "指定的页数超出了范围(总共 " + pagesList.size() + " 页)"; } - try { - if (isNoSafe(illustId, SettingProperties.getProperties(fromGroup), false)) { - log.warn("作品 {} 存在R-18内容且设置\"image.allowR18\"为false,将屏蔽该作品不发送.", illustId); - return "(根据设置,该作品已被屏蔽!)"; - } else if(isReported(illustId)) { - log.warn("作品Id {} 被报告, 正在等待审核, 跳过该作品.", illustId); - return "(该作品已被封印)"; - } - } catch (IOException e) { - log.warn("作品信息无法获取!", e); - return "发生网络异常,无法获取图片!"; - } - String downloadLink = pagesList.get(pageIndex - 1); String fileName = URLs.getResourceName(Strings.nullToEmpty(downloadLink)); File imageFile = new File(getImageStoreDir(), downloadLink.substring(downloadLink.lastIndexOf("/") + 1)); @@ -601,12 +577,11 @@ public class BotCommandProcess { } } - ImageCacheObject taskObject = new ImageCacheObject(imageCache, illustId, downloadLink, imageFile); try { - imageCacheExecutor.executorSync(taskObject); + ImageCacheStore.executeCacheRequest(new ImageCacheObject(imageCache, illustId, downloadLink, imageFile)); } catch (InterruptedException e) { - log.error("等待图片下载时发生中断", e); - return "图片获取失败!"; + log.warn("图片缓存被中断", e); + return "(错误:图片获取超时)"; } } else { log.debug("图片 {} 缓存命中.", fileName); diff --git a/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheHandler.java b/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheHandler.java index e76e6e2..20b6eae 100644 --- a/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheHandler.java +++ b/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheHandler.java @@ -28,9 +28,9 @@ public class ImageCacheHandler implements EventHandler { private final static Set cacheQueue = Collections.synchronizedSet(new HashSet<>()); @SuppressWarnings("unused") - public void getImageToCache(ImageCacheObject event) { + public void getImageToCache(ImageCacheObject event) throws Exception { if(cacheQueue.contains(event)) { - log.debug("图片 {} 已存在相同缓存任务, 跳过.", event.getStoreFile().getName()); + log.warn("图片 {} 已存在相同缓存任务, 跳过.", event.getStoreFile().getName()); return; } else { cacheQueue.add(event); @@ -43,11 +43,11 @@ public class ImageCacheHandler implements EventHandler { try { if(!storeFile.exists() && !storeFile.createNewFile()) { log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath()); - return; + throw new IOException("Failed to create file"); } } catch (IOException e) { log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath()); - e.printStackTrace(); + throw e; } HttpGet request = new HttpGet(event.getDownloadLink()); @@ -57,11 +57,11 @@ public class ImageCacheHandler implements EventHandler { response = httpClient.execute(request); } catch (IOException e) { log.error("Http请求时发生异常", e); - return; + throw e; } if(response.getStatusLine().getStatusCode() != 200) { log.warn("Http请求异常:{}", response.getStatusLine()); - return; + throw new IOException("Http Response Error: " + response.getStatusLine()); } log.debug("正在下载...(Content-Length: {}KB)", response.getEntity().getContentLength() / 1024); @@ -69,7 +69,7 @@ public class ImageCacheHandler implements EventHandler { IOUtils.copy(response.getEntity().getContent(), fos); } catch (IOException e) { log.error("下载图片时发生异常", e); - return; + throw e; } event.getImageCache().put(URLs.getResourceName(event.getDownloadLink()), storeFile); } finally { diff --git a/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheStore.java b/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheStore.java new file mode 100644 index 0000000..977369b --- /dev/null +++ b/src/main/java/net/lamgc/cgj/bot/cache/ImageCacheStore.java @@ -0,0 +1,109 @@ +package net.lamgc.cgj.bot.cache; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public final class ImageCacheStore { + + private final static Logger log = LoggerFactory.getLogger(ImageCacheStore.class.getName()); + + private final static Map cacheMap = new Hashtable<>(); + + private final static ThreadPoolExecutor imageCacheExecutor = new ThreadPoolExecutor( + 4, 6, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + new ThreadFactoryBuilder() + .setNameFormat("ImageCacheThread-%d") + .build() + ); + + private final static ImageCacheHandler handler = new ImageCacheHandler(); + + private ImageCacheStore() {} + + /** + * 传递图片缓存任务, 并等待缓存完成. + * @param cacheObject 缓存任务组 + */ + public static void executeCacheRequest(ImageCacheObject cacheObject) throws InterruptedException { + Task task = getTaskState(cacheObject); + if(task.taskState.get() == TaskState.COMPLETE) { + return; + } + + boolean locked = false; + try { + if(task.taskState.get() == TaskState.COMPLETE) { + return; + } + task.lock.lock(); + locked = true; + // 双重检查 + if(task.taskState.get() == TaskState.COMPLETE) { + return; + } + + // 置任务状态 + task.taskState.set(TaskState.RUNNING); + + try { + Throwable throwable = imageCacheExecutor.submit(() -> { + try { + handler.getImageToCache(cacheObject); + } catch (Throwable e) { + return e; + } + return null; + }).get(); + + if(throwable == null) { + task.taskState.set(TaskState.COMPLETE); + } else { + task.taskState.set(TaskState.ERROR); + } + } catch (ExecutionException e) { + log.error("执行图片缓存任务时发生异常", e); + } + } finally { + if(locked) { + task.lock.unlock(); + } + } + + } + + private static Task getTaskState(ImageCacheObject cacheObject) { + if(!cacheMap.containsKey(cacheObject)) { + cacheMap.put(cacheObject, new Task()); + } + return cacheMap.get(cacheObject); + } + + /** + * 任务状态 + */ + private enum TaskState { + READY, RUNNING, COMPLETE, ERROR + } + + private static class Task { + + public final ReentrantLock lock = new ReentrantLock(true); + + public final AtomicReference taskState = new AtomicReference<>(TaskState.READY); + + public final Condition condition = lock.newCondition(); + + } + +}