diff --git a/src/main/kotlin/OneDriveTransferCenter.kt b/src/main/kotlin/OneDriveTransferCenter.kt index 2689a89..f9e7d03 100644 --- a/src/main/kotlin/OneDriveTransferCenter.kt +++ b/src/main/kotlin/OneDriveTransferCenter.kt @@ -9,11 +9,14 @@ import com.microsoft.graph.models.DriveItemUploadableProperties import com.microsoft.graph.requests.GraphServiceClient import com.microsoft.graph.tasks.IProgressCallback import com.microsoft.graph.tasks.LargeFileUploadTask +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import mu.KotlinLogging import okhttp3.Request import org.telegram.abilitybots.api.bot.BaseAbilityBot import org.telegram.telegrambots.meta.api.methods.GetFile import org.telegram.telegrambots.meta.api.methods.send.SendMessage +import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText import org.telegram.telegrambots.meta.api.objects.Document import org.telegram.telegrambots.meta.exceptions.TelegramApiException @@ -32,6 +35,8 @@ object OneDriveTransferCenter { executor.submitTransferTask(task) } + fun cancelUploadTask(task: OneDriveTransferTask): Boolean = executor.cancelTransferTask(task) + fun getQueueingTaskCount(): Int = executor.taskQueue.size fun getProcessingTasks(): Map = @@ -42,18 +47,46 @@ object OneDriveTransferCenter { } object DefaultOneDriveTransferCallback : OneDriveTransferCallback { + + private val logger = KotlinLogging.logger { } + override fun onTransferTaskCreated(task: OneDriveTransferTask) { + if (task.extra["infoMessageId"] != null) { + val deleteMessage = DeleteMessage.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .build() + + try { + task.bot.execute(deleteMessage) + } catch (e: Exception) { + logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." } + } + } + + val msg = if (task.retryCount == 0) { + """ + OneDrive 中转任务已创建 + 正在排队中... + 文件名: ${task.document.fileName} + ------------------------------------------------- + #Queuing + """.trimIndent() + } else { + """ + OneDrive 中转任务已创建 + 正在排队中... + 文件名: ${task.document.fileName} + 上次错误信息:${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"} + 重试次数:${task.retryCount} + ------------------------------------------------- + #Queuing + """.trimIndent() + } + val message = task.bot.execute( SendMessage.builder() - .text( - """ - OneDrive 中转任务已创建 - 正在排队中... - 文件名: ${task.document.fileName} - ------------------------------------------------- - #Queuing - """.trimIndent() - ) + .text(msg) .chatId(task.extra["chatId"].toString().toLong()) .replyToMessageId(task.extra["messageId"].toString().toInt()) .build() @@ -152,6 +185,9 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { OneDrive 中转任务执行失败 文件名: ${task.document.fileName} 错误信息:${progress.exception?.message} + 重试次数:${task.retryCount} + + 任务将会追加至队列尾部进行重试。 ------------------------------------------------- #Failure #Error """.trimIndent() @@ -159,6 +195,14 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) if (newMessage != null) { progress.currentTask.extra["infoMessageId"] = newMessage.messageId + progress.currentTask.extra["lastError"] = progress.exception + } + if (task.retryCount < 5) { + runBlocking { + delay(10000) + task.retryCount++ + OneDriveTransferCenter.submitUploadTask(task) + } } } @@ -181,6 +225,24 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { } } + override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) { + val newMessage = EditMessageText.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务已取消 + 文件名: ${task.document.fileName} + ------------------------------------------------- + #Cancelled + """.trimIndent() + ) + .build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt()) + if (newMessage != null) { + task.extra["infoMessageId"] = newMessage.messageId + } + } + } interface OneDriveTransferCallback { @@ -191,6 +253,7 @@ interface OneDriveTransferCallback { fun onUploadProgress(progress: OneDriveTransferWorkerProgress) fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) + fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) } /** @@ -209,13 +272,14 @@ class OneDriveTransferTaskExecutor( threadNum, threadNum, 0, TimeUnit.SECONDS, ArrayBlockingQueue(50), ThreadFactoryBuilder() - .setNameFormat("onedrive-transfer-worker-%d") + .setNameFormat("Transfer Worker %d") .build() ) { private val logger = KotlinLogging.logger { } val threadStatusMap = ConcurrentHashMap() + val threadMap = ConcurrentHashMap() init { if (chunkSize > MAX_CHUNK_SIZE) { @@ -232,9 +296,28 @@ class OneDriveTransferTaskExecutor( taskQueue.offer(task) } + fun cancelTransferTask(task: OneDriveTransferTask): Boolean { + if (!taskQueue.remove(task)) { + for (i in 0 until threadNum) { + if (threadStatusMap[i]?.currentTask?.id == task.id) { + threadMap[i]?.interrupt() + return true + } + } + return false + } + try { + callback.onTransferCancelled(task, null) + } catch (e: Exception) { + logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" } + } + return true + } + private fun createWorker(id: Int): Runnable = Runnable { + threadMap[id] = Thread.currentThread() logger.info { "下载线程 $id 已启动." } - while (!Thread.interrupted()) { + while (!isTerminating) { val task = taskQueue.take() logger.info { "线程 $id 开始执行任务: ${task.document.fileName}" } val progress = OneDriveTransferWorkerProgress(task) @@ -254,6 +337,15 @@ class OneDriveTransferTaskExecutor( logger.warn(e) { "OneDrive 中转任务成功回调失败: ${e.message}" } } } catch (e: Exception) { + if (e is InterruptedException) { + logger.info { "线程 $id 任务被取消: ${task.document.fileName}" } + try { + callback.onTransferCancelled(task, progress) + } catch (e: Exception) { + logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" } + } + continue + } logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" } progress.status = OneDriveTransferStatus.FAILURE callback.onTransferFailure(task, progress.apply { @@ -264,6 +356,7 @@ class OneDriveTransferTaskExecutor( threadStatusMap.remove(id) } } + logger.info { "下载线程 $id 已停止." } } private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { @@ -520,7 +613,8 @@ data class OneDriveTransferTask( val document: Document, val onedriveId: String, val storagePath: String, - val extra: MutableMap = mutableMapOf(), + val extra: MutableMap = mutableMapOf(), val createdAt: Date = Date(), - var retryCount: Int = 0 + var retryCount: Int = 0, + val id: UUID = UUID.randomUUID() ) diff --git a/src/main/kotlin/OneDriveTransferExtension.kt b/src/main/kotlin/OneDriveTransferExtension.kt index ffe1b22..f992d1b 100644 --- a/src/main/kotlin/OneDriveTransferExtension.kt +++ b/src/main/kotlin/OneDriveTransferExtension.kt @@ -392,7 +392,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : "Worker $workerId: [${task.status}] ${ String.format( "%.3f", - task.progress.get() + task.progress.get() * 100.0 ) }% ${task.currentTask.document.fileSize} Bytes\n" ) @@ -402,6 +402,33 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : } .build() + fun cancelTask(): Ability = Ability.builder() + .named("cancel_task") + .info("取消指定的任务(回复对应的文件或状态消息)") + .privacy(Privacy.PUBLIC) + .locality(Locality.USER) + .action { + if (!it.update().message.isReply) { + it.bot().silent().send("请回复要取消的消息.", it.chatId()) + return@action + } + + val replyMessage = it.update().message.replyToMessage + val result = onedriveService.cancelTask(it.chatId(), replyMessage.messageId) + val msg = if (result) { + "已取消任务." + } else { + "任务已完成,无法取消." + } + val sendMessage = SendMessage.builder() + .chatId(it.chatId().toString()) + .text(msg) + .replyToMessageId(it.update().message.messageId) + .build() + it.bot().execute(sendMessage) + } + .build() + private fun AbilityBuilder.named(name: String): AbilityBuilder { return if (config.useCommandPrefix) { name("odt_$name") diff --git a/src/main/kotlin/Services.kt b/src/main/kotlin/Services.kt index 23c26b9..3a52fbc 100644 --- a/src/main/kotlin/Services.kt +++ b/src/main/kotlin/Services.kt @@ -17,6 +17,7 @@ import org.telegram.abilitybots.api.bot.BaseAbilityBot import org.telegram.telegrambots.meta.api.objects.Document import java.net.URL import java.util.concurrent.CompletableFuture +import java.util.concurrent.CopyOnWriteArrayList class OneDriveTransferService( @@ -35,6 +36,8 @@ class OneDriveTransferService( .setTokenCacheAccessAspect(DatabaseTokenCache(db)) .build() + private val tasks = CopyOnWriteArrayList() + init { transaction(db) { SchemaUtils.create(OneDriveTransferSettings, TokenCaches) @@ -117,10 +120,19 @@ class OneDriveTransferService( ).apply { extra["chatId"] = userId extra["messageId"] = messageId - } + }.also { tasks.add(it) } ) } + fun cancelTask(userId: Long, replyMessageId: Int): Boolean { + val task = tasks.find { + it.tgUserId == userId && + (it.extra["infoMessageId"] == replyMessageId || it.extra["messageId"] == replyMessageId) + } ?: throw NoSuchElementException() + + return OneDriveTransferCenter.cancelUploadTask(task) + } + companion object { private val THREAD_CURRENT_GRAPH_CLIENT = ThreadLocal()