From e5e0ebe399ffe7f32c5ac1abcb2e32fb9204cb23 Mon Sep 17 00:00:00 2001 From: LamGC Date: Thu, 11 Jan 2024 00:28:41 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BF=AE=E6=AD=A3=E4=BA=86?= =?UTF-8?q?=E4=B8=8D=E5=B0=91=E9=97=AE=E9=A2=98.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/kotlin/OneDriveTransferCenter.kt | 74 +++++++++++++++----- src/main/kotlin/OneDriveTransferExtension.kt | 2 + src/main/kotlin/Services.kt | 17 ++++- 3 files changed, 73 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/OneDriveTransferCenter.kt b/src/main/kotlin/OneDriveTransferCenter.kt index 6f53c78..c5588c3 100644 --- a/src/main/kotlin/OneDriveTransferCenter.kt +++ b/src/main/kotlin/OneDriveTransferCenter.kt @@ -16,17 +16,18 @@ import org.telegram.telegrambots.meta.api.methods.GetFile import org.telegram.telegrambots.meta.api.methods.send.SendMessage import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText import org.telegram.telegrambots.meta.api.objects.Document +import java.io.File +import java.io.InputStream +import java.net.SocketTimeoutException import java.util.* import java.util.concurrent.* object OneDriveTransferCenter { - private val queue = ArrayBlockingQueue(100) - - private val executor = OneDriveTransferTaskExecutor(5, DefaultOneDriveTransferCallback, queue) + private val executor = OneDriveTransferTaskExecutor(5, DefaultOneDriveTransferCallback, ArrayBlockingQueue(100)) fun submitUploadTask(task: OneDriveTransferTask) { - queue.offer(task) + executor.submitTransferTask(task) } } @@ -37,9 +38,10 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { SendMessage.builder() .text( """ - OneDrive 中转任务开始执行 - 文件名:${progress.currentTask.document.fileName} - """.trimIndent() + OneDrive 中转任务开始执行 + 正在获取文件信息... + 文件名: ${progress.currentTask.document.fileName} + """.trimIndent() ) .chatId(progress.currentTask.extra["chatId"].toString().toLong()) .replyToMessageId(progress.currentTask.extra["messageId"].toString().toInt()) @@ -54,7 +56,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { .messageId(progress.currentTask.extra["messageId"].toString().toInt()) .text(""" OneDrive 中转任务执行中 - 文件名:${progress.currentTask.document.fileName} + 文件名: ${progress.currentTask.document.fileName} 进度:${progress.progress.get() * 100}% """.trimIndent()) .build()) @@ -66,7 +68,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { .messageId(task.extra["messageId"].toString().toInt()) .text(""" OneDrive 中转任务执行失败 - 文件名:${task.document.fileName} + 文件名: ${task.document.fileName} 错误信息:${progress.exception?.message} """.trimIndent()) .build()) @@ -78,7 +80,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback { .messageId(task.extra["messageId"].toString().toInt()) .text(""" OneDrive 中转任务执行成功 - 文件名:${task.document.fileName} + 文件名: ${task.document.fileName} OneDrive 文件路径:${progress.driveItem?.webUrl} """.trimIndent()) .build()) @@ -115,6 +117,10 @@ class OneDriveTransferTaskExecutor( } } + fun submitTransferTask(task: OneDriveTransferTask) { + taskQueue.offer(task) + } + private fun createWorker(id: Int): Runnable = Runnable { logger.info { "下载线程 $id 已启动." } while (!Thread.interrupted()) { @@ -140,11 +146,31 @@ class OneDriveTransferTaskExecutor( } private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { - val file = task.bot.execute( - GetFile.builder() - .fileId(task.document.fileId) - .build() - ) + var tempFile: org.telegram.telegrambots.meta.api.objects.File + logger.debug { "开始获取文件信息..." } + var retryCount = 0 + val maxRetryCount = 100 + while (true) { + try { + tempFile = task.bot.execute( + GetFile.builder() + .fileId(task.document.fileId) + .build() + ) + break + } catch (e: Exception) { + if (e !is SocketTimeoutException) { + throw e + } + if (++retryCount > maxRetryCount) { + throw IllegalStateException("GetFile 等待超时", e) + } + logger.debug { "GetFile 接口调用超时, API 端可能正在下载文件, 5 秒后重新调用该接口...($retryCount/$maxRetryCount)" } + Thread.sleep(10000) + } + } + val file = tempFile + logger.debug { "成功获取文件信息:$tempFile" } val graphClient = task.service.createGraphClient(task.tgUserId) val drive = graphClient.drives(task.onedriveId).buildRequest().get() ?: throw IllegalStateException("无法获取 OneDrive 驱动器.") @@ -156,7 +182,7 @@ class OneDriveTransferTaskExecutor( logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" } if (file.fileSize < 4 * 1024 * 1024) { - val fileBytes = task.bot.downloadFileAsStream(file).readAllBytes() + val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes() val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content() .buildRequest() .put(fileBytes) @@ -178,7 +204,7 @@ class OneDriveTransferTaskExecutor( } } - val fileStream = task.bot.downloadFileAsStream(file) + val fileStream = getFileStream(task.bot, file.filePath) val largeFileUploadTask = LargeFileUploadTask( uploadSession, @@ -192,6 +218,20 @@ class OneDriveTransferTaskExecutor( } } + private fun getFileStream(bot: BaseAbilityBot, filePath: String): InputStream { + try { + val localFile = File(filePath) + if (localFile.exists()) { + logger.debug { "本地存在文件:$filePath" } + return localFile.inputStream() + } + } catch (e: Exception) { + logger.debug(e) { "无法从本地获取文件:$filePath" } + } + + return bot.downloadFileAsStream(filePath) + } + private fun checkAndGetPath(graphClient: GraphServiceClient, driveId: String, storagePath: String, originFileName: String): String { val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath) val fileName = checkFileName(graphClient, driveId, folderPath, originFileName) diff --git a/src/main/kotlin/OneDriveTransferExtension.kt b/src/main/kotlin/OneDriveTransferExtension.kt index a5f16e8..98ca8a5 100644 --- a/src/main/kotlin/OneDriveTransferExtension.kt +++ b/src/main/kotlin/OneDriveTransferExtension.kt @@ -134,6 +134,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : } catch (e: OneDriveNotLoginException) { it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) return@action + } catch (e: Exception) { + null } val drives = onedriveService.listDriversByUserId(it.chatId()) if (drives.isEmpty()) { diff --git a/src/main/kotlin/Services.kt b/src/main/kotlin/Services.kt index 211f925..8b4e340 100644 --- a/src/main/kotlin/Services.kt +++ b/src/main/kotlin/Services.kt @@ -8,14 +8,13 @@ import com.microsoft.graph.authentication.IAuthenticationProvider import com.microsoft.graph.httpcore.HttpClients import com.microsoft.graph.models.Drive import com.microsoft.graph.requests.GraphServiceClient +import mu.KotlinLogging import okhttp3.Request import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.transactions.transaction import org.telegram.abilitybots.api.bot.BaseAbilityBot import org.telegram.telegrambots.meta.api.objects.Document -import java.net.InetSocketAddress -import java.net.Proxy import java.net.URL import java.util.concurrent.CompletableFuture @@ -25,6 +24,8 @@ class OneDriveTransferService( private val config: ExtensionConfig, private val db: Database ) { + private val logger = KotlinLogging.logger { } + val accountManager: OneDriveTransferSettingManager private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder( config.clientId, @@ -32,7 +33,6 @@ class OneDriveTransferService( ) .authority(OneDriveTransferSettingManager.AUTHORITY) .setTokenCacheAccessAspect(DatabaseTokenCache(db)) - .proxy(Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved("127.0.0.1", 1089))) .build() init { @@ -67,6 +67,16 @@ class OneDriveTransferService( accountManager.updateAccount(userId, redirectUrl) fun listDriversByUserId(userId: Long): List { + val sites = try { + val sites = createGraphClient(userId).sites() + .buildRequest().get()?.currentPage ?: emptyList() + sites.map { + it.drive + }.filterNotNull().toList() + } catch (e: Exception) { + logger.debug(e) { "获取 OneDrive 站点失败, 可能是用户没有权限或不是组织账号." } + emptyList() + } val drive = createGraphClient(userId).me().drive().buildRequest().get() val drives = createGraphClient(userId).drives() .buildRequest() @@ -76,6 +86,7 @@ class OneDriveTransferService( add(drive) } addAll(drives) + addAll(sites) } }