refactor: 修正了不少问题.

This commit is contained in:
LamGC 2024-01-11 00:28:41 +08:00
parent 0a86bcbf90
commit e5e0ebe399
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D
3 changed files with 73 additions and 20 deletions

View File

@ -16,17 +16,18 @@ import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.methods.send.SendMessage import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.Document import org.telegram.telegrambots.meta.api.objects.Document
import java.io.File
import java.io.InputStream
import java.net.SocketTimeoutException
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
object OneDriveTransferCenter { object OneDriveTransferCenter {
private val queue = ArrayBlockingQueue<OneDriveTransferTask>(100) private val executor = OneDriveTransferTaskExecutor(5, DefaultOneDriveTransferCallback, ArrayBlockingQueue<OneDriveTransferTask>(100))
private val executor = OneDriveTransferTaskExecutor(5, DefaultOneDriveTransferCallback, queue)
fun submitUploadTask(task: OneDriveTransferTask) { fun submitUploadTask(task: OneDriveTransferTask) {
queue.offer(task) executor.submitTransferTask(task)
} }
} }
@ -38,6 +39,7 @@ object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
.text( .text(
""" """
OneDrive 中转任务开始执行 OneDrive 中转任务开始执行
正在获取文件信息...
文件名 ${progress.currentTask.document.fileName} 文件名 ${progress.currentTask.document.fileName}
""".trimIndent() """.trimIndent()
) )
@ -115,6 +117,10 @@ class OneDriveTransferTaskExecutor(
} }
} }
fun submitTransferTask(task: OneDriveTransferTask) {
taskQueue.offer(task)
}
private fun createWorker(id: Int): Runnable = Runnable { private fun createWorker(id: Int): Runnable = Runnable {
logger.info { "下载线程 $id 已启动." } logger.info { "下载线程 $id 已启动." }
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
@ -140,11 +146,31 @@ class OneDriveTransferTaskExecutor(
} }
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) { private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val file = task.bot.execute( var tempFile: org.telegram.telegrambots.meta.api.objects.File
logger.debug { "开始获取文件信息..." }
var retryCount = 0
val maxRetryCount = 100
while (true) {
try {
tempFile = task.bot.execute(
GetFile.builder() GetFile.builder()
.fileId(task.document.fileId) .fileId(task.document.fileId)
.build() .build()
) )
break
} catch (e: Exception) {
if (e !is SocketTimeoutException) {
throw e
}
if (++retryCount > maxRetryCount) {
throw IllegalStateException("GetFile 等待超时", e)
}
logger.debug { "GetFile 接口调用超时, API 端可能正在下载文件, 5 秒后重新调用该接口...($retryCount/$maxRetryCount)" }
Thread.sleep(10000)
}
}
val file = tempFile
logger.debug { "成功获取文件信息:$tempFile" }
val graphClient = task.service.createGraphClient(task.tgUserId) val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get() ?: throw IllegalStateException("无法获取 OneDrive 驱动器.") val drive = graphClient.drives(task.onedriveId).buildRequest().get() ?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
@ -156,7 +182,7 @@ class OneDriveTransferTaskExecutor(
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" } logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
if (file.fileSize < 4 * 1024 * 1024) { if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = task.bot.downloadFileAsStream(file).readAllBytes() val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes()
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content() val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest() .buildRequest()
.put(fileBytes) .put(fileBytes)
@ -178,7 +204,7 @@ class OneDriveTransferTaskExecutor(
} }
} }
val fileStream = task.bot.downloadFileAsStream(file) val fileStream = getFileStream(task.bot, file.filePath)
val largeFileUploadTask = LargeFileUploadTask( val largeFileUploadTask = LargeFileUploadTask(
uploadSession, uploadSession,
@ -192,6 +218,20 @@ class OneDriveTransferTaskExecutor(
} }
} }
private fun getFileStream(bot: BaseAbilityBot, filePath: String): InputStream {
try {
val localFile = File(filePath)
if (localFile.exists()) {
logger.debug { "本地存在文件:$filePath" }
return localFile.inputStream()
}
} catch (e: Exception) {
logger.debug(e) { "无法从本地获取文件:$filePath" }
}
return bot.downloadFileAsStream(filePath)
}
private fun checkAndGetPath(graphClient: GraphServiceClient<Request>, driveId: String, storagePath: String, originFileName: String): String { private fun checkAndGetPath(graphClient: GraphServiceClient<Request>, driveId: String, storagePath: String, originFileName: String): String {
val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath) val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath)
val fileName = checkFileName(graphClient, driveId, folderPath, originFileName) val fileName = checkFileName(graphClient, driveId, folderPath, originFileName)

View File

@ -134,6 +134,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
} catch (e: OneDriveNotLoginException) { } catch (e: OneDriveNotLoginException) {
it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId()) it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId())
return@action return@action
} catch (e: Exception) {
null
} }
val drives = onedriveService.listDriversByUserId(it.chatId()) val drives = onedriveService.listDriversByUserId(it.chatId())
if (drives.isEmpty()) { if (drives.isEmpty()) {

View File

@ -8,14 +8,13 @@ import com.microsoft.graph.authentication.IAuthenticationProvider
import com.microsoft.graph.httpcore.HttpClients import com.microsoft.graph.httpcore.HttpClients
import com.microsoft.graph.models.Drive import com.microsoft.graph.models.Drive
import com.microsoft.graph.requests.GraphServiceClient import com.microsoft.graph.requests.GraphServiceClient
import mu.KotlinLogging
import okhttp3.Request import okhttp3.Request
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
import org.telegram.abilitybots.api.bot.BaseAbilityBot import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.objects.Document import org.telegram.telegrambots.meta.api.objects.Document
import java.net.InetSocketAddress
import java.net.Proxy
import java.net.URL import java.net.URL
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
@ -25,6 +24,8 @@ class OneDriveTransferService(
private val config: ExtensionConfig, private val config: ExtensionConfig,
private val db: Database private val db: Database
) { ) {
private val logger = KotlinLogging.logger { }
val accountManager: OneDriveTransferSettingManager val accountManager: OneDriveTransferSettingManager
private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder( private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder(
config.clientId, config.clientId,
@ -32,7 +33,6 @@ class OneDriveTransferService(
) )
.authority(OneDriveTransferSettingManager.AUTHORITY) .authority(OneDriveTransferSettingManager.AUTHORITY)
.setTokenCacheAccessAspect(DatabaseTokenCache(db)) .setTokenCacheAccessAspect(DatabaseTokenCache(db))
.proxy(Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved("127.0.0.1", 1089)))
.build() .build()
init { init {
@ -67,6 +67,16 @@ class OneDriveTransferService(
accountManager.updateAccount(userId, redirectUrl) accountManager.updateAccount(userId, redirectUrl)
fun listDriversByUserId(userId: Long): List<Drive> { fun listDriversByUserId(userId: Long): List<Drive> {
val sites = try {
val sites = createGraphClient(userId).sites()
.buildRequest().get()?.currentPage ?: emptyList()
sites.map {
it.drive
}.filterNotNull().toList()
} catch (e: Exception) {
logger.debug(e) { "获取 OneDrive 站点失败, 可能是用户没有权限或不是组织账号." }
emptyList()
}
val drive = createGraphClient(userId).me().drive().buildRequest().get() val drive = createGraphClient(userId).me().drive().buildRequest().get()
val drives = createGraphClient(userId).drives() val drives = createGraphClient(userId).drives()
.buildRequest() .buildRequest()
@ -76,6 +86,7 @@ class OneDriveTransferService(
add(drive) add(drive)
} }
addAll(drives) addAll(drives)
addAll(sites)
} }
} }