refactor: 划分了更多的回调函数.

This commit is contained in:
LamGC 2024-01-11 22:10:58 +08:00
parent 431c8a34d8
commit fefd537981
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D

View File

@ -43,6 +43,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
OneDrive 中转任务已创建
正在排队中...
文件名 ${task.document.fileName}
-------------------------------------------------
#Queuing
""".trimIndent()
)
.chatId(task.extra["chatId"].toString().toLong())
@ -52,7 +54,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferStart(progress: OneDriveTransferWorkerProgress) {
override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
@ -61,6 +63,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名 ${progress.currentTask.document.fileName}
-------------------------------------------------
#Starting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
@ -69,7 +73,51 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
}
}
override fun onProgress(progress: OneDriveTransferWorkerProgress) {
override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
正在获取文件信息...
文件名 ${progress.currentTask.document.fileName}
重试次数$retryCount / $maxRetryCount
-------------------------------------------------
重试并不等同于获取文件失败由于 Telegram Bot API 需要下载文件后
才会获取文件信息因此需要重试以等待文件下载完成
#GettingFileInfo
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGotFileInfo(
progress: OneDriveTransferWorkerProgress,
file: org.telegram.telegrambots.meta.api.objects.File
) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
已获取文件信息正在创建 OneDrive 上传会话...
文件名 ${progress.currentTask.document.fileName}
文件大小${file.fileSize / 1024} KB
-------------------------------------------------
#UploadStarting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
@ -78,6 +126,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
OneDrive 中转任务执行中
文件名 ${progress.currentTask.document.fileName}
进度${String.format("%.3f", progress.progress.get() * 100)}%
-------------------------------------------------
#Processing
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
@ -95,6 +145,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
OneDrive 中转任务执行失败
文件名 ${task.document.fileName}
错误信息${progress.exception?.message}
-------------------------------------------------
#Failure #Error
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
@ -112,6 +164,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
OneDrive 中转任务执行成功
文件名 ${task.document.fileName}
OneDrive 文件路径${progress.driveItem?.webUrl}
------------------------------------------------
#Success
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
@ -124,16 +178,26 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
interface OneDriveTransferCallback {
fun onTransferTaskCreated(task: OneDriveTransferTask)
fun onTransferStart(progress: OneDriveTransferWorkerProgress)
fun onProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress)
fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int)
fun onGotFileInfo(progress: OneDriveTransferWorkerProgress, file: org.telegram.telegrambots.meta.api.objects.File)
fun onUploadProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
}
/**
* OneDrive 中转任务执行器.
* @param threadNum 线程数量.
* @param callback OneDrive 中转任务回调.
* @param taskQueue 任务队列.
* @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize.
*/
class OneDriveTransferTaskExecutor(
threadNum: Int,
private val callback: OneDriveTransferCallback,
private val taskQueue: BlockingQueue<OneDriveTransferTask>
private val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val chunkSize: Int = MAX_CHUNK_SIZE
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
ArrayBlockingQueue(50),
@ -147,14 +211,18 @@ class OneDriveTransferTaskExecutor(
val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
init {
if (chunkSize > MAX_CHUNK_SIZE) {
throw IllegalArgumentException("chunkSize 不能大于 $MAX_CHUNK_SIZE")
}
for (i in 0 until threadNum) {
execute(createWorker(i))
}
}
fun submitTransferTask(task: OneDriveTransferTask) {
taskQueue.offer(task)
callback.onTransferTaskCreated(task)
taskQueue.offer(task)
}
private fun createWorker(id: Int): Runnable = Runnable {
@ -165,7 +233,7 @@ class OneDriveTransferTaskExecutor(
val progress = OneDriveTransferWorkerProgress(task)
threadStatusMap[id] = progress
try {
callback.onTransferStart(progress)
callback.onTransferTaskStart(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务开始回调失败: ${e.message}" }
}
@ -196,6 +264,12 @@ class OneDriveTransferTaskExecutor(
var retryCount = 0
val maxRetryCount = 100
while (true) {
try {
callback.onGettingFileInfo(progress, retryCount, maxRetryCount)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务获取文件信息回调失败: ${e.message}" }
}
try {
tempFile = task.bot.execute(
GetFile.builder()
@ -216,6 +290,11 @@ class OneDriveTransferTaskExecutor(
}
val file = tempFile
logger.debug { "成功获取文件信息:$tempFile" }
try {
callback.onGotFileInfo(progress, file)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" }
}
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
@ -246,7 +325,7 @@ class OneDriveTransferTaskExecutor(
val progressCallback = IProgressCallback { current, max ->
progress.progress.set(current.toDouble() / max.toDouble())
try {
callback.onProgress(progress)
callback.onUploadProgress(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" }
}
@ -261,7 +340,7 @@ class OneDriveTransferTaskExecutor(
file.fileSize,
DriveItem::class.java
)
val uploadResult = largeFileUploadTask.upload(327680 * 26, null, progressCallback)
val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback)
progress.driveItem = uploadResult.responseBody
}
}
@ -394,6 +473,11 @@ class OneDriveTransferTaskExecutor(
}
}
companion object {
val ONCE_CHUNK_SIZE = 320 * 1024
val MAX_CHUNK_SIZE = 192
}
}
data class OneDriveTransferWorkerProgress(