[Fix] Issue #1 修复了ImageCache对重复缓存请求的策略(忽略并返回)导致的后续处理发生NPE的问题;

[Update] ImageCache 更新了ImageCache, 将该部分单独分离到 ImageCacheStore;
[Update] BotCommandProcess 适配新的ImageCache, 调整内容检查的时机以减少不必要的API访问;
This commit is contained in:
LamGC 2020-05-09 18:16:16 +08:00
parent 4beb4d78fb
commit 40057c3683
3 changed files with 133 additions and 49 deletions

View File

@ -2,7 +2,6 @@ package net.lamgc.cgj.bot;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.*; import com.google.gson.*;
import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderNames;
import net.lamgc.cgj.Main; import net.lamgc.cgj.Main;
@ -15,7 +14,6 @@ import net.lamgc.cgj.pixiv.PixivURL;
import net.lamgc.cgj.util.URLs; import net.lamgc.cgj.util.URLs;
import net.lamgc.utils.base.runner.Argument; import net.lamgc.utils.base.runner.Argument;
import net.lamgc.utils.base.runner.Command; import net.lamgc.utils.base.runner.Command;
import net.lamgc.utils.event.EventExecutor;
import net.lz1998.cq.utils.CQCode; import net.lz1998.cq.utils.CQCode;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
@ -29,9 +27,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "SameParameterValue"}) @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "SameParameterValue"})
@ -56,20 +51,6 @@ public class BotCommandProcess {
private final static CacheStore<List<String>> pagesCache = new StringListRedisCacheStore(BotEventHandler.redisServer, "imagePages"); private final static CacheStore<List<String>> pagesCache = new StringListRedisCacheStore(BotEventHandler.redisServer, "imagePages");
public final static CacheStore<JsonElement> reportStore = new JsonRedisCacheStore(BotEventHandler.redisServer, "report", gson); public final static CacheStore<JsonElement> reportStore = new JsonRedisCacheStore(BotEventHandler.redisServer, "report", gson);
/**
* 图片异步缓存执行器
*/
private final static EventExecutor imageCacheExecutor = new EventExecutor(new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() >= 2 ? 2 : 1,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() / 2F),
5L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(128),
new ThreadFactoryBuilder()
.setNameFormat("imageCacheThread-%d")
.build(),
new ThreadPoolExecutor.DiscardOldestPolicy()
));
private final static RankingUpdateTimer updateTimer = new RankingUpdateTimer(); private final static RankingUpdateTimer updateTimer = new RankingUpdateTimer();
public static void initialize() { public static void initialize() {
@ -77,12 +58,6 @@ public class BotCommandProcess {
SettingProperties.loadProperties(); SettingProperties.loadProperties();
try {
imageCacheExecutor.addHandler(new ImageCacheHandler());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
updateTimer.schedule(null); updateTimer.schedule(null);
log.info("初始化完成."); log.info("初始化完成.");
} }
@ -544,6 +519,20 @@ public class BotCommandProcess {
@Argument(name = "quality", force = false) PixivDownload.PageQuality quality, @Argument(name = "quality", force = false) PixivDownload.PageQuality quality,
@Argument(name = "page", force = false, defaultValue = "1") int pageIndex) { @Argument(name = "page", force = false, defaultValue = "1") int pageIndex) {
log.debug("IllustId: {}, Quality: {}, PageIndex: {}", illustId, quality.name(), pageIndex); log.debug("IllustId: {}, Quality: {}, PageIndex: {}", illustId, quality.name(), pageIndex);
try {
if (isNoSafe(illustId, SettingProperties.getProperties(fromGroup), false)) {
log.warn("作品 {} 存在R-18内容且设置\"image.allowR18\"为false将屏蔽该作品不发送.", illustId);
return "(根据设置,该作品已被屏蔽!)";
} else if(isReported(illustId)) {
log.warn("作品Id {} 被报告, 正在等待审核, 跳过该作品.", illustId);
return "(该作品已被封印)";
}
} catch (IOException e) {
log.warn("作品信息无法获取!", e);
return "发生网络异常,无法获取图片!";
}
List<String> pagesList; List<String> pagesList;
try { try {
pagesList = getIllustPages(illustId, quality, false); pagesList = getIllustPages(illustId, quality, false);
@ -564,19 +553,6 @@ public class BotCommandProcess {
return "指定的页数超出了范围(总共 " + pagesList.size() + " 页)"; return "指定的页数超出了范围(总共 " + pagesList.size() + " 页)";
} }
try {
if (isNoSafe(illustId, SettingProperties.getProperties(fromGroup), false)) {
log.warn("作品 {} 存在R-18内容且设置\"image.allowR18\"为false将屏蔽该作品不发送.", illustId);
return "(根据设置,该作品已被屏蔽!)";
} else if(isReported(illustId)) {
log.warn("作品Id {} 被报告, 正在等待审核, 跳过该作品.", illustId);
return "(该作品已被封印)";
}
} catch (IOException e) {
log.warn("作品信息无法获取!", e);
return "发生网络异常,无法获取图片!";
}
String downloadLink = pagesList.get(pageIndex - 1); String downloadLink = pagesList.get(pageIndex - 1);
String fileName = URLs.getResourceName(Strings.nullToEmpty(downloadLink)); String fileName = URLs.getResourceName(Strings.nullToEmpty(downloadLink));
File imageFile = new File(getImageStoreDir(), downloadLink.substring(downloadLink.lastIndexOf("/") + 1)); File imageFile = new File(getImageStoreDir(), downloadLink.substring(downloadLink.lastIndexOf("/") + 1));
@ -601,12 +577,11 @@ public class BotCommandProcess {
} }
} }
ImageCacheObject taskObject = new ImageCacheObject(imageCache, illustId, downloadLink, imageFile);
try { try {
imageCacheExecutor.executorSync(taskObject); ImageCacheStore.executeCacheRequest(new ImageCacheObject(imageCache, illustId, downloadLink, imageFile));
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("等待图片下载时发生中断", e); log.warn("图片缓存被中断", e);
return "图片获取失败!"; return "(错误:图片获取超时)";
} }
} else { } else {
log.debug("图片 {} 缓存命中.", fileName); log.debug("图片 {} 缓存命中.", fileName);

View File

@ -28,9 +28,9 @@ public class ImageCacheHandler implements EventHandler {
private final static Set<ImageCacheObject> cacheQueue = Collections.synchronizedSet(new HashSet<>()); private final static Set<ImageCacheObject> cacheQueue = Collections.synchronizedSet(new HashSet<>());
@SuppressWarnings("unused") @SuppressWarnings("unused")
public void getImageToCache(ImageCacheObject event) { public void getImageToCache(ImageCacheObject event) throws Exception {
if(cacheQueue.contains(event)) { if(cacheQueue.contains(event)) {
log.debug("图片 {} 已存在相同缓存任务, 跳过.", event.getStoreFile().getName()); log.warn("图片 {} 已存在相同缓存任务, 跳过.", event.getStoreFile().getName());
return; return;
} else { } else {
cacheQueue.add(event); cacheQueue.add(event);
@ -43,11 +43,11 @@ public class ImageCacheHandler implements EventHandler {
try { try {
if(!storeFile.exists() && !storeFile.createNewFile()) { if(!storeFile.exists() && !storeFile.createNewFile()) {
log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath()); log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath());
return; throw new IOException("Failed to create file");
} }
} catch (IOException e) { } catch (IOException e) {
log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath()); log.error("无法创建文件(Path: {})", storeFile.getAbsolutePath());
e.printStackTrace(); throw e;
} }
HttpGet request = new HttpGet(event.getDownloadLink()); HttpGet request = new HttpGet(event.getDownloadLink());
@ -57,11 +57,11 @@ public class ImageCacheHandler implements EventHandler {
response = httpClient.execute(request); response = httpClient.execute(request);
} catch (IOException e) { } catch (IOException e) {
log.error("Http请求时发生异常", e); log.error("Http请求时发生异常", e);
return; throw e;
} }
if(response.getStatusLine().getStatusCode() != 200) { if(response.getStatusLine().getStatusCode() != 200) {
log.warn("Http请求异常{}", response.getStatusLine()); log.warn("Http请求异常{}", response.getStatusLine());
return; throw new IOException("Http Response Error: " + response.getStatusLine());
} }
log.debug("正在下载...(Content-Length: {}KB)", response.getEntity().getContentLength() / 1024); log.debug("正在下载...(Content-Length: {}KB)", response.getEntity().getContentLength() / 1024);
@ -69,7 +69,7 @@ public class ImageCacheHandler implements EventHandler {
IOUtils.copy(response.getEntity().getContent(), fos); IOUtils.copy(response.getEntity().getContent(), fos);
} catch (IOException e) { } catch (IOException e) {
log.error("下载图片时发生异常", e); log.error("下载图片时发生异常", e);
return; throw e;
} }
event.getImageCache().put(URLs.getResourceName(event.getDownloadLink()), storeFile); event.getImageCache().put(URLs.getResourceName(event.getDownloadLink()), storeFile);
} finally { } finally {

View File

@ -0,0 +1,109 @@
package net.lamgc.cgj.bot.cache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public final class ImageCacheStore {
private final static Logger log = LoggerFactory.getLogger(ImageCacheStore.class.getName());
private final static Map<ImageCacheObject, Task> cacheMap = new Hashtable<>();
private final static ThreadPoolExecutor imageCacheExecutor = new ThreadPoolExecutor(
4, 6,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder()
.setNameFormat("ImageCacheThread-%d")
.build()
);
private final static ImageCacheHandler handler = new ImageCacheHandler();
private ImageCacheStore() {}
/**
* 传递图片缓存任务, 并等待缓存完成.
* @param cacheObject 缓存任务组
*/
public static void executeCacheRequest(ImageCacheObject cacheObject) throws InterruptedException {
Task task = getTaskState(cacheObject);
if(task.taskState.get() == TaskState.COMPLETE) {
return;
}
boolean locked = false;
try {
if(task.taskState.get() == TaskState.COMPLETE) {
return;
}
task.lock.lock();
locked = true;
// 双重检查
if(task.taskState.get() == TaskState.COMPLETE) {
return;
}
// 置任务状态
task.taskState.set(TaskState.RUNNING);
try {
Throwable throwable = imageCacheExecutor.submit(() -> {
try {
handler.getImageToCache(cacheObject);
} catch (Throwable e) {
return e;
}
return null;
}).get();
if(throwable == null) {
task.taskState.set(TaskState.COMPLETE);
} else {
task.taskState.set(TaskState.ERROR);
}
} catch (ExecutionException e) {
log.error("执行图片缓存任务时发生异常", e);
}
} finally {
if(locked) {
task.lock.unlock();
}
}
}
private static Task getTaskState(ImageCacheObject cacheObject) {
if(!cacheMap.containsKey(cacheObject)) {
cacheMap.put(cacheObject, new Task());
}
return cacheMap.get(cacheObject);
}
/**
* 任务状态
*/
private enum TaskState {
READY, RUNNING, COMPLETE, ERROR
}
private static class Task {
public final ReentrantLock lock = new ReentrantLock(true);
public final AtomicReference<TaskState> taskState = new AtomicReference<>(TaskState.READY);
public final Condition condition = lock.newCondition();
}
}