scext-onedrive-transfer/src/main/kotlin/OneDriveTransferCenter.kt

416 lines
16 KiB
Kotlin

package net.lamgc.scext.onedrive_transfer
import com.google.common.util.concurrent.AtomicDouble
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.microsoft.graph.http.GraphServiceException
import com.microsoft.graph.models.DriveItem
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import mu.KotlinLogging
import okhttp3.Request
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.Document
import org.telegram.telegrambots.meta.exceptions.TelegramApiException
import java.io.File
import java.io.InputStream
import java.net.SocketTimeoutException
import java.util.*
import java.util.concurrent.*
object OneDriveTransferCenter {
private val executor =
OneDriveTransferTaskExecutor(2, DefaultOneDriveTransferCallback, ArrayBlockingQueue<OneDriveTransferTask>(100))
fun submitUploadTask(task: OneDriveTransferTask) {
executor.submitTransferTask(task)
}
}
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
override fun onTransferTaskCreated(task: OneDriveTransferTask) {
val message = task.bot.execute(
SendMessage.builder()
.text(
"""
OneDrive 中转任务已创建
正在排队中...
文件名: ${task.document.fileName}
""".trimIndent()
)
.chatId(task.extra["chatId"].toString().toLong())
.replyToMessageId(task.extra["messageId"].toString().toInt())
.build()
)
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名: ${progress.currentTask.document.fileName}
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onProgress(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行中
文件名: ${progress.currentTask.document.fileName}
进度:${String.format("%.3f", progress.progress.get() * 100)}%
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行失败
文件名: ${task.document.fileName}
错误信息:${progress.exception?.message}
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行成功
文件名: ${task.document.fileName}
OneDrive 文件路径:${progress.driveItem?.webUrl}
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
}
interface OneDriveTransferCallback {
fun onTransferTaskCreated(task: OneDriveTransferTask)
fun onTransferStart(progress: OneDriveTransferWorkerProgress)
fun onProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
}
class OneDriveTransferTaskExecutor(
threadNum: Int,
private val callback: OneDriveTransferCallback,
private val taskQueue: BlockingQueue<OneDriveTransferTask>
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
ArrayBlockingQueue(50),
ThreadFactoryBuilder()
.setNameFormat("onedrive-transfer-worker-%d")
.build()
) {
private val logger = KotlinLogging.logger { }
val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
init {
for (i in 0 until threadNum) {
execute(createWorker(i))
}
}
fun submitTransferTask(task: OneDriveTransferTask) {
taskQueue.offer(task)
callback.onTransferTaskCreated(task)
}
private fun createWorker(id: Int): Runnable = Runnable {
logger.info { "下载线程 $id 已启动." }
while (!Thread.interrupted()) {
val task = taskQueue.take()
logger.info { "线程 $id 开始执行任务: ${task.document.fileName}" }
val progress = OneDriveTransferWorkerProgress(task)
threadStatusMap[id] = progress
try {
callback.onTransferStart(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务开始回调失败: ${e.message}" }
}
try {
doTransferFile(task, progress)
logger.info { "OneDrive 中转任务执行成功: ${task.document.fileName}" }
try {
callback.onTransferSuccess(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功回调失败: ${e.message}" }
}
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" }
callback.onTransferFailure(task, progress.apply {
this.exception = e
})
} finally {
logger.info { "线程 $id 任务执行完毕: ${task.document.fileName}" }
threadStatusMap.remove(id)
}
}
}
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
var tempFile: org.telegram.telegrambots.meta.api.objects.File
logger.debug { "开始获取文件信息..." }
var retryCount = 0
val maxRetryCount = 100
while (true) {
try {
tempFile = task.bot.execute(
GetFile.builder()
.fileId(task.document.fileId)
.build()
)
break
} catch (e: TelegramApiException) {
if (e.cause !is SocketTimeoutException) {
throw e
}
if (++retryCount > maxRetryCount) {
throw IllegalStateException("GetFile 等待超时", e)
}
logger.debug { "GetFile 接口调用超时, API 端可能正在下载文件, 5 秒后重新调用该接口...($retryCount/$maxRetryCount)" }
Thread.sleep(10000)
}
}
val file = tempFile
logger.debug { "成功获取文件信息:$tempFile" }
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath = checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes()
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest()
.put(fileBytes)
progress.driveItem = driveItem
progress.progress.set(1.0)
} else {
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties())
.build()
)
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val progressCallback = IProgressCallback { current, max ->
progress.progress.set(current.toDouble() / max.toDouble())
try {
callback.onProgress(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" }
}
}
val fileStream = getFileStream(task.bot, file.filePath)
val largeFileUploadTask = LargeFileUploadTask(
uploadSession,
graphClient,
fileStream,
file.fileSize,
DriveItem::class.java
)
val uploadResult = largeFileUploadTask.upload(327680 * 26, null, progressCallback)
progress.driveItem = uploadResult.responseBody
}
}
private fun getFileStream(bot: BaseAbilityBot, filePath: String): InputStream {
try {
val localFile = File(filePath)
if (localFile.exists()) {
logger.debug { "本地存在文件:$filePath" }
return localFile.inputStream()
}
} catch (e: Exception) {
logger.debug(e) { "无法从本地获取文件:$filePath" }
}
return bot.downloadFileAsStream(filePath)
}
private fun checkAndGetPath(
graphClient: GraphServiceClient<Request>,
driveId: String,
storagePath: String,
originFileName: String
): String {
val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath)
val fileName = checkFileName(graphClient, driveId, folderPath, originFileName)
return "$folderPath$fileName"
}
private fun checkAndCreateFolder(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String
): String {
if (folderPath.trim() == "/") {
return ""
}
try {
val testPath = if (folderPath.startsWith('/')) {
folderPath.trimStart('/')
} else {
folderPath
}
graphClient.drives(driveId).root().itemWithPath(testPath).buildRequest().get()
logger.debug { "OneDrive 文件夹已存在:$testPath" }
return if (testPath.endsWith('/')) {
testPath
} else {
"$testPath/"
}
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
}
val path = folderPath.trim('/')
val pathComponents = path.split("/")
logger.debug { "PathComponents = $pathComponents" }
var parentPath: String
var currentPath = "/"
for (component in pathComponents) {
if (component.trim().isEmpty()) {
continue
}
parentPath = currentPath
currentPath += "$component/"
logger.debug { "CurrentPath = $currentPath" }
try {
val driveItem = graphClient.drives(driveId).root().itemWithPath(currentPath).buildRequest().get()
if (driveItem!!.folder == null) {
throw IllegalStateException("OneDrive 中已存在同名文件: $currentPath")
}
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
try {
val newFolder = DriveItem()
newFolder.name = component
newFolder.folder = com.microsoft.graph.models.Folder()
graphClient.drives(driveId).root().itemWithPath(parentPath).children()
.buildRequest()
.post(newFolder)
} catch (e: GraphServiceException) {
if (e.error?.error?.code != "nameAlreadyExists") {
throw e
}
}
} else {
throw e
}
}
}
if (!currentPath.endsWith("/")) {
currentPath += "/"
}
return currentPath.trimStart('/')
}
private fun checkFileName(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String,
fileName: String
): String {
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$fileName").buildRequest().get()
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
return fileName
} else {
throw e
}
}
val fileNameComponents = fileName.split(".")
val fileNameWithoutExtension = fileNameComponents.subList(0, fileNameComponents.size - 1).joinToString(".")
val fileExtension = fileNameComponents.last()
var i = 1
while (true) {
val newFileName = "$fileNameWithoutExtension ($i).$fileExtension"
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$newFileName").buildRequest().get()
i++
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
return newFileName
}
}
}
}
data class OneDriveTransferWorkerProgress(
val currentTask: OneDriveTransferTask,
val progress: AtomicDouble = AtomicDouble(),
var driveItem: DriveItem? = null,
var exception: Exception? = null
)
data class OneDriveTransferTask(
val tgUserId: Long,
val bot: BaseAbilityBot,
val service: OneDriveTransferService,
val document: Document,
val onedriveId: String,
val storagePath: String,
val extra: MutableMap<String, Any> = mutableMapOf(),
val createdAt: Date = Date()
)