From 68a1c23469c4c1a227fc5ee4df76698836acc6bb Mon Sep 17 00:00:00 2001 From: LamGC Date: Fri, 13 Dec 2024 09:37:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=20Scalabot=20v0.8.0,?= =?UTF-8?q?=20=E5=B9=B6=E4=B8=94=E5=87=86=E5=A4=87=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E4=B8=BB=E4=BB=8E=E8=8A=82=E7=82=B9=E5=8A=9F=E8=83=BD.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle.kts | 25 +- src/main/kotlin/AgentMain.kt | 5 + .../kotlin/DefaultOneDriveTransferCallback.kt | 252 +++++++++++++++ src/main/kotlin/ExtensionConfig.kt | 7 + src/main/kotlin/ExtensionFactory.kt | 11 +- src/main/kotlin/OneDriveTransferCenter.kt | 287 ++++-------------- src/main/kotlin/OneDriveTransferExtension.kt | 106 ++++--- src/main/kotlin/Services.kt | 2 +- src/main/kotlin/Utils.kt | 10 +- 9 files changed, 425 insertions(+), 280 deletions(-) create mode 100644 src/main/kotlin/AgentMain.kt create mode 100644 src/main/kotlin/DefaultOneDriveTransferCallback.kt diff --git a/build.gradle.kts b/build.gradle.kts index e2c03a4..ff27e5a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,7 +1,7 @@ import java.net.URI plugins { - kotlin("jvm") version "1.9.21" + kotlin("jvm") version "2.1.0" `maven-publish` } @@ -9,6 +9,7 @@ group = "net.lamgc.scext" version = "0.1.0-SNAPSHOT" repositories { + mavenLocal() mavenCentral() maven { @@ -20,7 +21,7 @@ repositories { dependencies { compileOnly("org.slf4j:slf4j-api:2.0.10") compileOnly("io.github.microutils:kotlin-logging:3.0.5") - compileOnly("net.lamgc:scalabot-extension:0.6.1") + compileOnly("net.lamgc:scalabot-extension:0.8.0-1") implementation("com.fasterxml.jackson.core:jackson-core:2.16.1") implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1") @@ -39,7 +40,7 @@ dependencies { implementation("com.zaxxer:HikariCP:5.1.0") implementation("com.microsoft.graph:microsoft-graph:5.77.0") - implementation("com.azure:azure-identity:1.11.1") + implementation("com.azure:azure-identity:1.14.2") testImplementation("org.jetbrains.kotlin:kotlin-test") } @@ -51,6 +52,24 @@ kotlin { jvmToolchain(17) } +val fatJar = task("fatJar", type = Jar::class) { + archiveBaseName = "${project.name}-agent" + manifest { + attributes["Implementation-Title"] = "OneDrive-Transfer" + attributes["Implementation-Version"] = version + attributes["Main-Class"] = "net.lamgc.scext.onedrive_transfer.AgentMainKt" + } + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) })) + with(tasks.jar.get() as CopySpec) +} + +tasks { + "jar" { + dependsOn(fatJar) + } +} + publishing { repositories { maven("https://git.lamgc.me/api/packages/LamGC/maven") { diff --git a/src/main/kotlin/AgentMain.kt b/src/main/kotlin/AgentMain.kt new file mode 100644 index 0000000..08ada2a --- /dev/null +++ b/src/main/kotlin/AgentMain.kt @@ -0,0 +1,5 @@ +package net.lamgc.scext.onedrive_transfer + +fun main() { + +} diff --git a/src/main/kotlin/DefaultOneDriveTransferCallback.kt b/src/main/kotlin/DefaultOneDriveTransferCallback.kt new file mode 100644 index 0000000..b49d89e --- /dev/null +++ b/src/main/kotlin/DefaultOneDriveTransferCallback.kt @@ -0,0 +1,252 @@ +package net.lamgc.scext.onedrive_transfer + +import com.google.common.cache.CacheBuilder +import com.google.common.cache.CacheLoader +import com.google.common.util.concurrent.RateLimiter +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import mu.KotlinLogging +import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot +import org.telegram.telegrambots.meta.api.methods.send.SendMessage +import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage +import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText +import java.util.* + +@Suppress("UnstableApiUsage") +object DefaultOneDriveTransferCallback : OneDriveTransferCallback { + + private val logger = KotlinLogging.logger { } + + private val progressUpdateRateLimiterMap = WeakHashMap() + private val progressUpdateUserRateLimiterMap = CacheBuilder.newBuilder() + .expireAfterAccess(15, java.util.concurrent.TimeUnit.MINUTES) + .softValues() + .initialCapacity(12) + .concurrencyLevel(4) + .build(object : CacheLoader() { + override fun load(key: BotUserRateLimitToken): RateLimiter { + return RateLimiter.create(3.0) + } + }) + + private fun BaseAbilityBot.rateLimiterPerBot(): RateLimiter { + return progressUpdateRateLimiterMap[this] ?: RateLimiter.create(25.0).also { + progressUpdateRateLimiterMap[this] = it + } + } + + private fun BaseAbilityBot.rateLimiterPerUser(userId: Long): RateLimiter { + val token = BotUserRateLimitToken(this, userId) + return progressUpdateUserRateLimiterMap[token] + } + + private fun BaseAbilityBot.tryAcquireRateLimit(userId: Long): Boolean { + return if (rateLimiterPerUser(userId).tryAcquire()) { + rateLimiterPerBot().tryAcquire() + } else { + false + } + } + + override fun onTransferTaskCreated(task: OneDriveTransferTask) { + if (task.extra["infoMessageId"] != null) { + val deleteMessage = DeleteMessage.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .build() + + try { + task.bot.telegramClient.execute(deleteMessage) + } catch (e: Exception) { + logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." } + } + } + + val msg = if (task.retryCount == 0) { + """ + OneDrive 中转任务已创建 + 正在排队中... + 文件名: ${task.document.fileName} + ------------------------------------------------- + #Queuing + """.trimIndent() + } else { + """ + OneDrive 中转任务已创建 + 正在排队中... + 文件名: ${task.document.fileName} + 上次错误信息:${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"} + 重试次数:${task.retryCount} + ------------------------------------------------- + #Queuing + """.trimIndent() + } + + val message = task.bot.telegramClient.execute( + SendMessage.builder() + .text(msg) + .chatId(task.extra["chatId"].toString().toLong()) + .replyToMessageId(task.extra["messageId"].toString().toInt()) + .build() + ) + task.extra["infoMessageId"] = message.messageId + } + + override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) { + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务开始执行 + 正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件) + 文件名: ${progress.currentTask.document.fileName} + ------------------------------------------------- + #Starting + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) { + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + 正在获取文件信息... + 文件名: ${progress.currentTask.document.fileName} + 重试次数:$retryCount / $maxRetryCount + ------------------------------------------------- + 重试并不等同于获取文件失败,由于 Telegram Bot API 需要下载文件后 + 才会获取文件信息,因此需要重试以等待文件下载完成。 + + #GettingFileInfo + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onGotFileInfo( + progress: OneDriveTransferWorkerProgress, + file: org.telegram.telegrambots.meta.api.objects.File + ) { + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + 已获取文件信息,正在创建 OneDrive 上传会话... + 文件名: ${progress.currentTask.document.fileName} + 文件大小:${file.fileSize / 1024} KB + ------------------------------------------------- + #UploadStarting + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) { + if (!progress.currentTask.bot.tryAcquireRateLimit(progress.currentTask.tgUserId)) { + return + } + val newMessage = EditMessageText.builder() + .chatId(progress.currentTask.extra["chatId"].toString().toLong()) + .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务执行中 + 文件名: ${progress.currentTask.document.fileName} + 进度:${String.format("%.3f", progress.progress.get() * 100)}% + ------------------------------------------------- + #Processing + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { + val newMessage = EditMessageText.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务执行失败 + 文件名: ${task.document.fileName} + 错误信息:${progress.exception?.message} + 重试次数:${task.retryCount} + + 任务将会追加至队列尾部进行重试。 + ------------------------------------------------- + #Failure #Error + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + progress.currentTask.extra["lastError"] = progress.exception + } + if (task.retryCount < 5) { + runBlocking { + delay(10000) + task.retryCount++ + OneDriveTransferCenter.submitUploadTask(task) + } + } + } + + override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { + val newMessage = EditMessageText.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务执行成功 + 文件名: ${task.document.fileName} + OneDrive 文件路径:${progress.driveItem?.webUrl} + ------------------------------------------------ + #Success + """.trimIndent() + ) + .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) + if (newMessage != null) { + progress.currentTask.extra["infoMessageId"] = newMessage.messageId + } + } + + override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) { + val newMessage = EditMessageText.builder() + .chatId(task.extra["chatId"].toString().toLong()) + .messageId(task.extra["infoMessageId"].toString().toInt()) + .text( + """ + OneDrive 中转任务已取消 + 文件名: ${task.document.fileName} + ------------------------------------------------- + #Cancelled + """.trimIndent() + ) + .build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt()) + if (newMessage != null) { + task.extra["infoMessageId"] = newMessage.messageId + } + } + +} + +private data class BotUserRateLimitToken( + val bot: BaseAbilityBot, + val userId: Long +) diff --git a/src/main/kotlin/ExtensionConfig.kt b/src/main/kotlin/ExtensionConfig.kt index 415576f..133e85c 100644 --- a/src/main/kotlin/ExtensionConfig.kt +++ b/src/main/kotlin/ExtensionConfig.kt @@ -6,4 +6,11 @@ data class ExtensionConfig( val useCommandPrefix: Boolean = true, val maxFileSize: Long = 1024L * 1024 * 1024 * 4, val maxTransferSize: Long = 1024L * 1024 * 1024 * 20, + val centralSetting: CentralSetting = CentralSetting() +) + +data class CentralSetting( + val enable: Boolean = false, + val port: Int = 24860, + val secret: String = "", ) diff --git a/src/main/kotlin/ExtensionFactory.kt b/src/main/kotlin/ExtensionFactory.kt index 5d7f6dd..6e21e75 100644 --- a/src/main/kotlin/ExtensionFactory.kt +++ b/src/main/kotlin/ExtensionFactory.kt @@ -1,12 +1,17 @@ package net.lamgc.scext.onedrive_transfer +import net.lamgc.scalabot.extension.BotExtensionCreateOptions import net.lamgc.scalabot.extension.BotExtensionFactory -import org.telegram.abilitybots.api.bot.BaseAbilityBot -import org.telegram.abilitybots.api.util.AbilityExtension +import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot +import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension import java.io.File class ExtensionFactory : BotExtensionFactory { - override fun createExtensionInstance(bot: BaseAbilityBot, dataFolder: File): AbilityExtension { + override fun createExtensionInstance( + bot: BaseAbilityBot, + dataFolder: File, + createOptions: BotExtensionCreateOptions + ): AbilityExtension { return OneDriveTransferExtension(bot, dataFolder) } } diff --git a/src/main/kotlin/OneDriveTransferCenter.kt b/src/main/kotlin/OneDriveTransferCenter.kt index 1fc0b80..a680eef 100644 --- a/src/main/kotlin/OneDriveTransferCenter.kt +++ b/src/main/kotlin/OneDriveTransferCenter.kt @@ -9,15 +9,10 @@ import com.microsoft.graph.models.DriveItemUploadableProperties import com.microsoft.graph.requests.GraphServiceClient import com.microsoft.graph.tasks.IProgressCallback import com.microsoft.graph.tasks.LargeFileUploadTask -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking import mu.KotlinLogging import okhttp3.Request -import org.telegram.abilitybots.api.bot.BaseAbilityBot +import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot import org.telegram.telegrambots.meta.api.methods.GetFile -import org.telegram.telegrambots.meta.api.methods.send.SendMessage -import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage -import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText import org.telegram.telegrambots.meta.api.objects.Document import org.telegram.telegrambots.meta.exceptions.TelegramApiException import java.io.File @@ -28,218 +23,19 @@ import java.util.concurrent.* object OneDriveTransferCenter { - val executor = - OneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue()) + val executor: OneDriveTransferTaskExecutor = + LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue()) fun submitUploadTask(task: OneDriveTransferTask): Boolean = executor.submitTransferTask(task) fun cancelUploadTask(task: OneDriveTransferTask): Boolean = executor.cancelTransferTask(task) - fun getQueueingTaskCount(): Int = executor.taskQueue.size + fun getQueueingTaskCount(): Int = executor.getQueuedTransferTasks().size fun getProcessingTasks(): Map = - Collections.unmodifiableMap(executor.threadStatusMap) + Collections.unmodifiableMap(executor.getWorkingTasks()) - fun getProcessThreadNum(): Int = executor.threadNum - -} - -object DefaultOneDriveTransferCallback : OneDriveTransferCallback { - - private val logger = KotlinLogging.logger { } - - override fun onTransferTaskCreated(task: OneDriveTransferTask) { - if (task.extra["infoMessageId"] != null) { - val deleteMessage = DeleteMessage.builder() - .chatId(task.extra["chatId"].toString().toLong()) - .messageId(task.extra["infoMessageId"].toString().toInt()) - .build() - - try { - task.bot.execute(deleteMessage) - } catch (e: Exception) { - logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." } - } - } - - val msg = if (task.retryCount == 0) { - """ - OneDrive 中转任务已创建 - 正在排队中... - 文件名: ${task.document.fileName} - ------------------------------------------------- - #Queuing - """.trimIndent() - } else { - """ - OneDrive 中转任务已创建 - 正在排队中... - 文件名: ${task.document.fileName} - 上次错误信息:${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"} - 重试次数:${task.retryCount} - ------------------------------------------------- - #Queuing - """.trimIndent() - } - - val message = task.bot.execute( - SendMessage.builder() - .text(msg) - .chatId(task.extra["chatId"].toString().toLong()) - .replyToMessageId(task.extra["messageId"].toString().toInt()) - .build() - ) - task.extra["infoMessageId"] = message.messageId - } - - override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) { - val newMessage = EditMessageText.builder() - .chatId(progress.currentTask.extra["chatId"].toString().toLong()) - .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) - .text( - """ - OneDrive 中转任务开始执行 - 正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件) - 文件名: ${progress.currentTask.document.fileName} - ------------------------------------------------- - #Starting - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - } - } - - override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) { - val newMessage = EditMessageText.builder() - .chatId(progress.currentTask.extra["chatId"].toString().toLong()) - .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) - .text( - """ - 正在获取文件信息... - 文件名: ${progress.currentTask.document.fileName} - 重试次数:$retryCount / $maxRetryCount - ------------------------------------------------- - 重试并不等同于获取文件失败,由于 Telegram Bot API 需要下载文件后 - 才会获取文件信息,因此需要重试以等待文件下载完成。 - - #GettingFileInfo - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - } - } - - override fun onGotFileInfo( - progress: OneDriveTransferWorkerProgress, - file: org.telegram.telegrambots.meta.api.objects.File - ) { - val newMessage = EditMessageText.builder() - .chatId(progress.currentTask.extra["chatId"].toString().toLong()) - .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) - .text( - """ - 已获取文件信息,正在创建 OneDrive 上传会话... - 文件名: ${progress.currentTask.document.fileName} - 文件大小:${file.fileSize / 1024} KB - ------------------------------------------------- - #UploadStarting - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - } - } - - override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) { - val newMessage = EditMessageText.builder() - .chatId(progress.currentTask.extra["chatId"].toString().toLong()) - .messageId(progress.currentTask.extra["infoMessageId"].toString().toInt()) - .text( - """ - OneDrive 中转任务执行中 - 文件名: ${progress.currentTask.document.fileName} - 进度:${String.format("%.3f", progress.progress.get() * 100)}% - ------------------------------------------------- - #Processing - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - } - } - - override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { - val newMessage = EditMessageText.builder() - .chatId(task.extra["chatId"].toString().toLong()) - .messageId(task.extra["infoMessageId"].toString().toInt()) - .text( - """ - OneDrive 中转任务执行失败 - 文件名: ${task.document.fileName} - 错误信息:${progress.exception?.message} - 重试次数:${task.retryCount} - - 任务将会追加至队列尾部进行重试。 - ------------------------------------------------- - #Failure #Error - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - progress.currentTask.extra["lastError"] = progress.exception - } - if (task.retryCount < 5) { - runBlocking { - delay(10000) - task.retryCount++ - OneDriveTransferCenter.submitUploadTask(task) - } - } - } - - override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { - val newMessage = EditMessageText.builder() - .chatId(task.extra["chatId"].toString().toLong()) - .messageId(task.extra["infoMessageId"].toString().toInt()) - .text( - """ - OneDrive 中转任务执行成功 - 文件名: ${task.document.fileName} - OneDrive 文件路径:${progress.driveItem?.webUrl} - ------------------------------------------------ - #Success - """.trimIndent() - ) - .build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt()) - if (newMessage != null) { - progress.currentTask.extra["infoMessageId"] = newMessage.messageId - } - } - - override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) { - val newMessage = EditMessageText.builder() - .chatId(task.extra["chatId"].toString().toLong()) - .messageId(task.extra["infoMessageId"].toString().toInt()) - .text( - """ - OneDrive 中转任务已取消 - 文件名: ${task.document.fileName} - ------------------------------------------------- - #Cancelled - """.trimIndent() - ) - .build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt()) - if (newMessage != null) { - task.extra["infoMessageId"] = newMessage.messageId - } - } + fun getWorkerCount(): Int = executor.getWorkerCount() } @@ -254,6 +50,20 @@ interface OneDriveTransferCallback { fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) } +interface OneDriveTransferTaskExecutor { + + fun submitTransferTask(task: OneDriveTransferTask): Boolean + + fun cancelTransferTask(task: OneDriveTransferTask): Boolean + + fun getQueuedTransferTasks(): List + + fun getWorkerCount(): Int + + fun getWorkingTasks(): Map + +} + /** * OneDrive 中转任务执行器. * @param threadNum 线程数量. @@ -261,10 +71,10 @@ interface OneDriveTransferCallback { * @param taskQueue 任务队列. * @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize. */ -class OneDriveTransferTaskExecutor( - val threadNum: Int, +class LocalOneDriveTransferTaskExecutor( + private val threadNum: Int, private val callback: OneDriveTransferCallback, - val taskQueue: BlockingQueue, + private val taskQueue: BlockingQueue, private val chunkSize: Int = 26 ) : ThreadPoolExecutor( threadNum, threadNum, 0, TimeUnit.SECONDS, @@ -272,12 +82,12 @@ class OneDriveTransferTaskExecutor( ThreadFactoryBuilder() .setNameFormat("Transfer Worker %d") .build() -) { +), OneDriveTransferTaskExecutor { private val logger = KotlinLogging.logger { } - val threadStatusMap = ConcurrentHashMap() - val threadMap = ConcurrentHashMap() + private val threadStatusMap = ConcurrentHashMap() + private val threadMap = ConcurrentHashMap() init { if (chunkSize > MAX_CHUNK_SIZE) { @@ -289,7 +99,7 @@ class OneDriveTransferTaskExecutor( } } - fun submitTransferTask(task: OneDriveTransferTask): Boolean { + override fun submitTransferTask(task: OneDriveTransferTask): Boolean { if (taskQueue.remainingCapacity() > 0) { callback.onTransferTaskCreated(task) return taskQueue.offer(task) @@ -297,7 +107,7 @@ class OneDriveTransferTaskExecutor( return false } - fun cancelTransferTask(task: OneDriveTransferTask): Boolean { + override fun cancelTransferTask(task: OneDriveTransferTask): Boolean { if (!taskQueue.remove(task)) { for (i in 0 until threadNum) { if (threadStatusMap[i]?.currentTask?.id == task.id) { @@ -315,6 +125,12 @@ class OneDriveTransferTaskExecutor( return true } + override fun getQueuedTransferTasks(): List = taskQueue.toList() + + override fun getWorkerCount(): Int = threadNum + + override fun getWorkingTasks(): Map = threadStatusMap + private fun createWorker(id: Int): Runnable = Runnable { threadMap[id] = Thread.currentThread() logger.info { "下载线程 $id 已启动." } @@ -378,7 +194,7 @@ class OneDriveTransferTaskExecutor( } try { - tempFile = task.bot.execute( + tempFile = task.bot.telegramClient.execute( GetFile.builder() .fileId(task.document.fileId) .build() @@ -428,7 +244,9 @@ class OneDriveTransferTaskExecutor( val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath) .createUploadSession( DriveItemCreateUploadSessionParameterSet.newBuilder() - .withItem(DriveItemUploadableProperties()) + .withItem(DriveItemUploadableProperties().apply { + fileSize = file.fileSize + }) .build() ) .buildRequest() @@ -470,7 +288,7 @@ class OneDriveTransferTaskExecutor( logger.debug(e) { "无法从本地获取文件:$filePath" } } - return bot.downloadFileAsStream(filePath) + return bot.telegramClient.downloadFileAsStream(filePath) } private fun checkAndGetPath( @@ -588,18 +406,41 @@ class OneDriveTransferTaskExecutor( } companion object { - val ONCE_CHUNK_SIZE = 320 * 1024 - val MAX_CHUNK_SIZE = 192 + const val ONCE_CHUNK_SIZE = 320 * 1024 + const val MAX_CHUNK_SIZE = 192 } } enum class OneDriveTransferStatus { + /** + * 正在排队. + */ QUEUING, + + /** + * 正在从 Telegram 获取文件信息. + */ GETTING_FILE_INFO, + + /** + * 正在创建上传会话. + */ CREATING_UPLOAD_SESSION, + + /** + * 上传中. + */ UPLOADING, + + /** + * 上传成功. + */ SUCCESS, + + /** + * 上传失败. + */ FAILURE } diff --git a/src/main/kotlin/OneDriveTransferExtension.kt b/src/main/kotlin/OneDriveTransferExtension.kt index f992d1b..3adf377 100644 --- a/src/main/kotlin/OneDriveTransferExtension.kt +++ b/src/main/kotlin/OneDriveTransferExtension.kt @@ -6,13 +6,12 @@ import com.google.common.cache.CacheBuilder import com.microsoft.aad.msal4j.MsalInteractionRequiredException import mu.KotlinLogging import org.jetbrains.exposed.sql.Database -import org.telegram.abilitybots.api.bot.BaseAbilityBot -import org.telegram.abilitybots.api.objects.Ability -import org.telegram.abilitybots.api.objects.Ability.AbilityBuilder -import org.telegram.abilitybots.api.objects.Locality -import org.telegram.abilitybots.api.objects.Privacy -import org.telegram.abilitybots.api.objects.Reply -import org.telegram.abilitybots.api.util.AbilityExtension +import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot +import org.telegram.telegrambots.abilitybots.api.objects.Ability +import org.telegram.telegrambots.abilitybots.api.objects.Locality +import org.telegram.telegrambots.abilitybots.api.objects.Privacy +import org.telegram.telegrambots.abilitybots.api.objects.Reply +import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension import org.telegram.telegrambots.meta.api.methods.send.SendMessage import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText @@ -20,6 +19,7 @@ import org.telegram.telegrambots.meta.api.objects.replykeyboard.InlineKeyboardMa import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardMarkup import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardRemove import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardButton +import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardRow import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.KeyboardRow import java.io.File import java.net.MalformedURLException @@ -63,7 +63,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : .privacy(Privacy.PUBLIC) .locality(Locality.USER) .action { - it.bot().silent().send( + it.bot().silent.send( """ OneDrive Transfer 是一个 Telegram 机器人, 可以将 Telegram 中的文件上传到 OneDrive 中转。 @@ -97,7 +97,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : .action { ctx -> val url = onedriveService.createLoginUrl(ctx.chatId()) actionCache.put(ctx.chatId(), "login") - ctx.bot().silent().send(""" + ctx.bot().silent.send( + """ 请使用以下链接进行登录: $url @@ -111,23 +112,24 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : try { val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim())) actionCache.invalidate(upd.message.chatId) - bot.silent().send(""" + bot.silent.send( + """ 登录成功! Microsoft 账号:${account.userName} 请使用 /select_drive 选择 OneDrive 驱动器以及设置上传路径。 """.trimIndent(), upd.message.chatId) } catch (e: MsalInteractionRequiredException) { if (e.errorCode() == "AADSTS54005") { - bot.silent().send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId) + bot.silent.send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId) } else { - bot.silent().send("登录失败,错误代码:${e.errorCode()}", upd.message.chatId) + bot.silent.send("登录失败,错误代码:${e.errorCode()}", upd.message.chatId) } actionCache.invalidate(upd.message.chatId) } catch (e: MalformedURLException) { - bot.silent().send("链接格式错误,请发送正确的登录链接。", upd.message.chatId) + bot.silent.send("链接格式错误,请发送正确的登录链接。", upd.message.chatId) } catch (e: Exception) { logger.error(e) { "处理 Oauth2 令牌时发生错误." } - bot.silent().send("处理 Oauth2 令牌时发生错误,请稍后重试。", upd.message.chatId) + bot.silent.send("处理 Oauth2 令牌时发生错误,请稍后重试。", upd.message.chatId) actionCache.invalidate(upd.message.chatId) } }, @@ -144,10 +146,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : .action { val account = onedriveService.accountManager.getAccountByTgUserId(it.chatId()) if (account == null) { - it.bot().silent().send("当前账户未登录 OneDrive.", it.chatId()) + it.bot().silent.send("当前账户未登录 OneDrive.", it.chatId()) return@action } - it.bot().silent().send(""" + it.bot().silent.send( + """ 当前账户已登录 OneDrive. Microsoft 账号: ${account.userName} """.trimIndent(), it.chatId()) @@ -163,14 +166,14 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : val currentDrive = try { onedriveService.getCurrentDrive(it.chatId()) } catch (e: OneDriveNotLoginException) { - it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) + it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId()) return@action } catch (e: Exception) { null } val drives = onedriveService.listDriversByUserId(it.chatId()) if (drives.isEmpty()) { - it.bot().silent().send("当前账户没有 OneDrive 驱动器.", it.chatId()) + it.bot().silent.send("当前账户没有 OneDrive 驱动器.", it.chatId()) return@action } @@ -186,30 +189,39 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : .text(msgContent) .replyMarkup(InlineKeyboardMarkup.builder().apply { for (drive in drives) { - keyboardRow(listOf(InlineKeyboardButton.builder() - .text("[${drive.driveType}] ${drive.name}") - .callbackData(buttonTokenCache.putToken("${drive.id}", tokenPrefix = "select_drive:")) - .build())) + keyboardRow( + InlineKeyboardRow( + InlineKeyboardButton.builder() + .text("[${drive.driveType}] ${drive.name}") + .callbackData( + buttonTokenCache.putToken( + "${drive.id}", + tokenPrefix = "select_drive:" + ) + ) + .build() + ) + ) } }.build()) .build().let { msg -> - it.bot().execute(msg) + it.bot().silent.execute(msg) } } .reply({ bot, upd -> val driveId = buttonTokenCache.getIfPresent(upd.callbackQuery.data.substringAfter(':')) if (driveId == null) { - bot.silent().send("无效的驱动器 ID.", upd.callbackQuery.message.chatId) + bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId) return@reply } val drive = try { onedriveService.listDriversByUserId(upd.callbackQuery.from.id).firstOrNull { it.id == driveId } } catch (e: OneDriveNotLoginException) { - bot.silent().send("当前账户没有登录 OneDrive.", upd.callbackQuery.message.chatId) + bot.silent.send("当前账户没有登录 OneDrive.", upd.callbackQuery.message.chatId) return@reply } if (drive == null) { - bot.silent().send("无效的驱动器 ID.", upd.callbackQuery.message.chatId) + bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId) return@reply } onedriveService.setDrive(upd.callbackQuery.from.id, driveId) @@ -228,7 +240,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : """.trimIndent() ) .build() - bot.execute(editMessageText) + bot.silent.execute(editMessageText) }, { upd -> upd.hasCallbackQuery() && upd.callbackQuery.data != null && upd.callbackQuery.data.startsWith("select_drive:") }) @@ -243,16 +255,16 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : val currentDrive = try { onedriveService.getCurrentDrive(it.chatId()) } catch (e: OneDriveNotLoginException) { - it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) + it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId()) return@action } if (currentDrive == null) { - it.bot().silent().send("当前账户没有选择 OneDrive 驱动器.", it.chatId()) + it.bot().silent.send("当前账户没有选择 OneDrive 驱动器.", it.chatId()) return@action } val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId()) if (transferSetting == null) { - it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) + it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId()) return@action } val msgContent = """ @@ -279,14 +291,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : this.isPersistent(false) }.build()) .build().let { msg -> - it.bot().execute(msg) + it.bot().silent.execute(msg) } } .reply({ bot, upd -> val path = upd.message.text.trim() if (path == "/cancel") { actionCache.invalidate(upd.message.chatId) - bot.execute(SendMessage.builder() + bot.silent.execute( + SendMessage.builder() .chatId(upd.message.chatId.toString()) .text("已取消设置.") .replyMarkup(ReplyKeyboardRemove.builder() @@ -298,7 +311,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId) if (transferSetting == null) { actionCache.invalidate(upd.message.chatId) - bot.execute(SendMessage.builder() + bot.silent.execute( + SendMessage.builder() .chatId(upd.message.chatId.toString()) .text("当前账户没有登录 OneDrive.") .replyMarkup(ReplyKeyboardRemove.builder() @@ -311,11 +325,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : transferSetting.storagePath = path } actionCache.invalidate(upd.message.chatId) - bot.execute(DeleteMessage.builder() + bot.silent.execute( + DeleteMessage.builder() .chatId(upd.message.chatId.toString()) .messageId(upd.message.messageId) .build()) - bot.execute(SendMessage.builder() + bot.silent.execute( + SendMessage.builder() .chatId(upd.message.chatId.toString()) .text("已设置上传路径为:$path") .replyMarkup(ReplyKeyboardRemove.builder() @@ -335,13 +351,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : .action { val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId()) if (transferSetting == null) { - it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) + it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId()) return@action } onedriveService.accountManager.doSomething { transferSetting.delete() } - it.bot().silent().send("已登出 OneDrive.", it.chatId()) + it.bot().silent.send("已登出 OneDrive.", it.chatId()) } .build() @@ -349,11 +365,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : { bot, upd -> val document = upd.message.document if (document == null) { - bot.silent().send("请发送文件.", upd.message.chatId) + bot.silent.send("请发送文件.", upd.message.chatId) return@of } if (document.fileSize > config.maxFileSize) { - bot.silent().send("文件大小超过限制.", upd.message.chatId) + bot.silent.send("文件大小超过限制.", upd.message.chatId) return@of } onedriveService.submitUploadDocumentTask(upd.message.chatId, upd.message.messageId, document) @@ -379,11 +395,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : val msg = StringBuilder( """ 当前排队任务数:${OneDriveTransferCenter.getQueueingTaskCount()} - 当前处理任务数:${processingTasks.size} / ${OneDriveTransferCenter.executor.threadNum} + 当前处理任务数:${processingTasks.size} / ${OneDriveTransferCenter.getWorkerCount()} -------------------------------------------------------------- """.trimIndent() ).append('\n') - for (workerId in 0...putToken(value: String, tokenLength: Int = 16, tokenPr fun EditMessageText.orSendMessage(bot: BaseAbilityBot, replyMessageId: Int, tryRemove: Boolean = true): Message? { try { - bot.execute(this) + bot.telegramClient.execute(this) return null } catch (e: TelegramApiRequestException) { if (e.errorCode != 400 || e.apiResponse != "message can't be edited") { throw e } - return bot.execute( + return bot.telegramClient.execute( SendMessage.builder() .replyToMessageId(replyMessageId) .chatId(chatId.toString()) @@ -52,7 +52,7 @@ fun EditMessageText.orSendMessage(bot: BaseAbilityBot, replyMessageId: Int, tryR } finally { if (tryRemove) { try { - bot.execute( + bot.telegramClient.execute( DeleteMessage.builder() .messageId(messageId) .build()