From 44299979891725d8d0cf9fe29f8c0abcf27100f5 Mon Sep 17 00:00:00 2001 From: LamGC Date: Fri, 12 Jan 2024 23:56:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=91=98=E7=8A=B6=E6=80=81=E6=8C=87=E4=BB=A4.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/kotlin/OneDriveTransferCenter.kt | 39 +++++++++++++++++--- src/main/kotlin/OneDriveTransferExtension.kt | 33 +++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/OneDriveTransferCenter.kt b/src/main/kotlin/OneDriveTransferCenter.kt index 9037509..2689a89 100644 --- a/src/main/kotlin/OneDriveTransferCenter.kt +++ b/src/main/kotlin/OneDriveTransferCenter.kt @@ -25,13 +25,20 @@ import java.util.concurrent.* object OneDriveTransferCenter { - private val executor = + val executor = OneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, ArrayBlockingQueue(100)) fun submitUploadTask(task: OneDriveTransferTask) { executor.submitTransferTask(task) } + fun getQueueingTaskCount(): Int = executor.taskQueue.size + + fun getProcessingTasks(): Map = + Collections.unmodifiableMap(executor.threadStatusMap) + + fun getProcessThreadNum(): Int = executor.threadNum + } object DefaultOneDriveTransferCallback : OneDriveTransferCallback { @@ -194,10 +201,10 @@ interface OneDriveTransferCallback { * @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize. */ class OneDriveTransferTaskExecutor( - threadNum: Int, + val threadNum: Int, private val callback: OneDriveTransferCallback, - private val taskQueue: BlockingQueue, - private val chunkSize: Int = 96 + val taskQueue: BlockingQueue, + private val chunkSize: Int = 26 ) : ThreadPoolExecutor( threadNum, threadNum, 0, TimeUnit.SECONDS, ArrayBlockingQueue(50), @@ -248,6 +255,7 @@ class OneDriveTransferTaskExecutor( } } catch (e: Exception) { logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" } + progress.status = OneDriveTransferStatus.FAILURE callback.onTransferFailure(task, progress.apply { this.exception = e }) @@ -261,6 +269,7 @@ class OneDriveTransferTaskExecutor( private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { var tempFile: org.telegram.telegrambots.meta.api.objects.File logger.debug { "开始获取文件信息..." } + progress.status = OneDriveTransferStatus.GETTING_FILE_INFO var retryCount = 0 val maxRetryCount = 100 while (true) { @@ -296,6 +305,8 @@ class OneDriveTransferTaskExecutor( logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" } } + progress.status = OneDriveTransferStatus.CREATING_UPLOAD_SESSION + val graphClient = task.service.createGraphClient(task.tgUserId) val drive = graphClient.drives(task.onedriveId).buildRequest().get() ?: throw IllegalStateException("无法获取 OneDrive 驱动器.") @@ -308,11 +319,13 @@ class OneDriveTransferTaskExecutor( if (file.fileSize < 4 * 1024 * 1024) { val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes() + progress.status = OneDriveTransferStatus.UPLOADING val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content() .buildRequest() .put(fileBytes) progress.driveItem = driveItem progress.progress.set(1.0) + progress.status = OneDriveTransferStatus.SUCCESS } else { val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath) .createUploadSession( @@ -340,8 +353,11 @@ class OneDriveTransferTaskExecutor( file.fileSize, DriveItem::class.java ) + progress.status = OneDriveTransferStatus.UPLOADING val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback) progress.driveItem = uploadResult.responseBody + progress.progress.set(1.0) + progress.status = OneDriveTransferStatus.SUCCESS } } @@ -480,11 +496,21 @@ class OneDriveTransferTaskExecutor( } +enum class OneDriveTransferStatus { + QUEUING, + GETTING_FILE_INFO, + CREATING_UPLOAD_SESSION, + UPLOADING, + SUCCESS, + FAILURE +} + data class OneDriveTransferWorkerProgress( val currentTask: OneDriveTransferTask, val progress: AtomicDouble = AtomicDouble(), var driveItem: DriveItem? = null, - var exception: Exception? = null + var exception: Exception? = null, + var status: OneDriveTransferStatus = OneDriveTransferStatus.QUEUING ) data class OneDriveTransferTask( @@ -495,5 +521,6 @@ data class OneDriveTransferTask( val onedriveId: String, val storagePath: String, val extra: MutableMap = mutableMapOf(), - val createdAt: Date = Date() + val createdAt: Date = Date(), + var retryCount: Int = 0 ) diff --git a/src/main/kotlin/OneDriveTransferExtension.kt b/src/main/kotlin/OneDriveTransferExtension.kt index 99e5b7b..ffe1b22 100644 --- a/src/main/kotlin/OneDriveTransferExtension.kt +++ b/src/main/kotlin/OneDriveTransferExtension.kt @@ -369,6 +369,39 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : } ) + fun adminStatus(): Ability = Ability.builder() + .named("status") + .info("查看当前中转状态.(仅限管理员)") + .privacy(Privacy.CREATOR) + .locality(Locality.USER) + .action { + val processingTasks = OneDriveTransferCenter.getProcessingTasks() + val msg = StringBuilder( + """ + 当前排队任务数:${OneDriveTransferCenter.getQueueingTaskCount()} + 当前处理任务数:${processingTasks.size} / ${OneDriveTransferCenter.executor.threadNum} + -------------------------------------------------------------- + """.trimIndent() + ).append('\n') + for (workerId in 0..