diff --git a/src/main/kotlin/OneDriveTransferCenter.kt b/src/main/kotlin/OneDriveTransferCenter.kt index 37014b5..905cc5c 100644 --- a/src/main/kotlin/OneDriveTransferCenter.kt +++ b/src/main/kotlin/OneDriveTransferCenter.kt @@ -43,6 +43,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务已创建 正在排队中... 文件名: ${task.document.fileName} + ------------------------------------------------- + #Queuing """.trimIndent() ) .chatId(task.extra["chatId"].toString().toLong()) @@ -52,7 +54,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { task.extra["infoMessageId"] = message.messageId } - override fun onTransferStart(progress: OneDriveTransferWorkerProgress) { + override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) { val newMessage = EditMessageText.builder() .chatId(progress.currentTask.extra["chatId"].toString().toLong()) .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) @@ -61,6 +63,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务开始执行 正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件) 文件名: ${progress.currentTask.document.fileName} + ------------------------------------------------- + #Starting """.trimIndent() ) .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) @@ -69,7 +73,51 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { } } - override fun onProgress(progress: OneDriveTransferWorkerProgress) { + override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) { + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + 正在获取文件信息... + 文件名: ${progress.currentTask.document.fileName} + 重试次数:$retryCount / $maxRetryCount + ------------------------------------------------- + 重试并不等同于获取文件失败,由于 Telegram Bot API 需要下载文件后 + 才会获取文件信息,因此需要重试以等待文件下载完成。 + + #GettingFileInfo + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onGotFileInfo( + progress: OneDriveTransferWorkerProgress, + file: org.telegram.telegrambots.meta.api.objects.File + ) { + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + 已获取文件信息,正在创建 OneDrive 上传会话... + 文件名: ${progress.currentTask.document.fileName} + 文件大小:${file.fileSize / 1024} KB + ------------------------------------------------- + #UploadStarting + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) { val newMessage = EditMessageText.builder() .chatId(progress.currentTask.extra["chatId"].toString().toLong()) .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) @@ -78,6 +126,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务执行中 文件名: ${progress.currentTask.document.fileName} 进度:${String.format("%.3f", progress.progress.get() * 100)}% + ------------------------------------------------- + #Processing """.trimIndent() ) .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) @@ -95,6 +145,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务执行失败 文件名: ${task.document.fileName} 错误信息:${progress.exception?.message} + ------------------------------------------------- + #Failure #Error """.trimIndent() ) .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) @@ -112,6 +164,8 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务执行成功 文件名: ${task.document.fileName} OneDrive 文件路径:${progress.driveItem?.webUrl} + ------------------------------------------------ + #Success """.trimIndent() ) .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) @@ -124,16 +178,26 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { interface OneDriveTransferCallback { fun onTransferTaskCreated(task: OneDriveTransferTask) - fun onTransferStart(progress: OneDriveTransferWorkerProgress) - fun onProgress(progress: OneDriveTransferWorkerProgress) + fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) + fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) + fun onGotFileInfo(progress: OneDriveTransferWorkerProgress, file: org.telegram.telegrambots.meta.api.objects.File) + fun onUploadProgress(progress: OneDriveTransferWorkerProgress) fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) } +/** + * OneDrive 中转任务执行器. + * @param threadNum 线程数量. + * @param callback OneDrive 中转任务回调. + * @param taskQueue 任务队列. + * @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize. + */ class OneDriveTransferTaskExecutor( threadNum: Int, private val callback: OneDriveTransferCallback, - private val taskQueue: BlockingQueue + private val taskQueue: BlockingQueue, + private val chunkSize: Int = MAX_CHUNK_SIZE ) : ThreadPoolExecutor( threadNum, threadNum, 0, TimeUnit.SECONDS, ArrayBlockingQueue(50), @@ -147,14 +211,18 @@ class OneDriveTransferTaskExecutor( val threadStatusMap = ConcurrentHashMap() init { + if (chunkSize > MAX_CHUNK_SIZE) { + throw IllegalArgumentException("chunkSize 不能大于 $MAX_CHUNK_SIZE") + } + for (i in 0 until threadNum) { execute(createWorker(i)) } } fun submitTransferTask(task: OneDriveTransferTask) { - taskQueue.offer(task) callback.onTransferTaskCreated(task) + taskQueue.offer(task) } private fun createWorker(id: Int): Runnable = Runnable { @@ -165,7 +233,7 @@ class OneDriveTransferTaskExecutor( val progress = OneDriveTransferWorkerProgress(task) threadStatusMap[id] = progress try { - callback.onTransferStart(progress) + callback.onTransferTaskStart(progress) } catch (e: Exception) { logger.warn(e) { "OneDrive 中转任务开始回调失败: ${e.message}" } } @@ -196,6 +264,12 @@ class OneDriveTransferTaskExecutor( var retryCount = 0 val maxRetryCount = 100 while (true) { + try { + callback.onGettingFileInfo(progress, retryCount, maxRetryCount) + } catch (e: Exception) { + logger.warn(e) { "OneDrive 中转任务获取文件信息回调失败: ${e.message}" } + } + try { tempFile = task.bot.execute( GetFile.builder() @@ -216,6 +290,11 @@ class OneDriveTransferTaskExecutor( } val file = tempFile logger.debug { "成功获取文件信息:$tempFile" } + try { + callback.onGotFileInfo(progress, file) + } catch (e: Exception) { + logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" } + } val graphClient = task.service.createGraphClient(task.tgUserId) val drive = graphClient.drives(task.onedriveId).buildRequest().get() @@ -246,7 +325,7 @@ class OneDriveTransferTaskExecutor( val progressCallback = IProgressCallback { current, max -> progress.progress.set(current.toDouble() / max.toDouble()) try { - callback.onProgress(progress) + callback.onUploadProgress(progress) } catch (e: Exception) { logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" } } @@ -261,7 +340,7 @@ class OneDriveTransferTaskExecutor( file.fileSize, DriveItem::class.java ) - val uploadResult = largeFileUploadTask.upload(327680 * 26, null, progressCallback) + val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback) progress.driveItem = uploadResult.responseBody } } @@ -394,6 +473,11 @@ class OneDriveTransferTaskExecutor( } } + companion object { + val ONCE_CHUNK_SIZE = 320 * 1024 + val MAX_CHUNK_SIZE = 192 + } + } data class OneDriveTransferWorkerProgress(