scext-onedrive-transfer/src/main/kotlin/OneDriveTransferCenter.kt

621 lines
25 KiB
Kotlin

package net.lamgc.scext.onedrive_transfer
import com.google.common.util.concurrent.AtomicDouble
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.microsoft.graph.http.GraphServiceException
import com.microsoft.graph.models.DriveItem
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import okhttp3.Request
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.Document
import org.telegram.telegrambots.meta.exceptions.TelegramApiException
import java.io.File
import java.io.InputStream
import java.net.SocketTimeoutException
import java.util.*
import java.util.concurrent.*
object OneDriveTransferCenter {
val executor =
OneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, ArrayBlockingQueue(100))
fun submitUploadTask(task: OneDriveTransferTask) {
executor.submitTransferTask(task)
}
fun cancelUploadTask(task: OneDriveTransferTask): Boolean = executor.cancelTransferTask(task)
fun getQueueingTaskCount(): Int = executor.taskQueue.size
fun getProcessingTasks(): Map<Int, OneDriveTransferWorkerProgress> =
Collections.unmodifiableMap(executor.threadStatusMap)
fun getProcessThreadNum(): Int = executor.threadNum
}
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
private val logger = KotlinLogging.logger { }
override fun onTransferTaskCreated(task: OneDriveTransferTask) {
if (task.extra["infoMessageId"] != null) {
val deleteMessage = DeleteMessage.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.build()
try {
task.bot.execute(deleteMessage)
} catch (e: Exception) {
logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." }
}
}
val msg = if (task.retryCount == 0) {
"""
OneDrive 中转任务已创建
正在排队中...
文件名: ${task.document.fileName}
-------------------------------------------------
#Queuing
""".trimIndent()
} else {
"""
OneDrive 中转任务已创建
正在排队中...
文件名: ${task.document.fileName}
上次错误信息:${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"}
重试次数:${task.retryCount}
-------------------------------------------------
#Queuing
""".trimIndent()
}
val message = task.bot.execute(
SendMessage.builder()
.text(msg)
.chatId(task.extra["chatId"].toString().toLong())
.replyToMessageId(task.extra["messageId"].toString().toInt())
.build()
)
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名: ${progress.currentTask.document.fileName}
-------------------------------------------------
#Starting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
正在获取文件信息...
文件名: ${progress.currentTask.document.fileName}
重试次数:$retryCount / $maxRetryCount
-------------------------------------------------
重试并不等同于获取文件失败,由于 Telegram Bot API 需要下载文件后
才会获取文件信息,因此需要重试以等待文件下载完成。
#GettingFileInfo
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGotFileInfo(
progress: OneDriveTransferWorkerProgress,
file: org.telegram.telegrambots.meta.api.objects.File
) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
已获取文件信息,正在创建 OneDrive 上传会话...
文件名: ${progress.currentTask.document.fileName}
文件大小:${file.fileSize / 1024} KB
-------------------------------------------------
#UploadStarting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行中
文件名: ${progress.currentTask.document.fileName}
进度:${String.format("%.3f", progress.progress.get() * 100)}%
-------------------------------------------------
#Processing
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行失败
文件名: ${task.document.fileName}
错误信息:${progress.exception?.message}
重试次数:${task.retryCount}
任务将会追加至队列尾部进行重试。
-------------------------------------------------
#Failure #Error
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
progress.currentTask.extra["lastError"] = progress.exception
}
if (task.retryCount < 5) {
runBlocking {
delay(10000)
task.retryCount++
OneDriveTransferCenter.submitUploadTask(task)
}
}
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行成功
文件名: ${task.document.fileName}
OneDrive 文件路径:${progress.driveItem?.webUrl}
------------------------------------------------
#Success
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务已取消
文件名: ${task.document.fileName}
-------------------------------------------------
#Cancelled
""".trimIndent()
)
.build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt())
if (newMessage != null) {
task.extra["infoMessageId"] = newMessage.messageId
}
}
}
interface OneDriveTransferCallback {
fun onTransferTaskCreated(task: OneDriveTransferTask)
fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress)
fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int)
fun onGotFileInfo(progress: OneDriveTransferWorkerProgress, file: org.telegram.telegrambots.meta.api.objects.File)
fun onUploadProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?)
}
/**
* OneDrive 中转任务执行器.
* @param threadNum 线程数量.
* @param callback OneDrive 中转任务回调.
* @param taskQueue 任务队列.
* @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize.
*/
class OneDriveTransferTaskExecutor(
val threadNum: Int,
private val callback: OneDriveTransferCallback,
val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val chunkSize: Int = 26
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
ArrayBlockingQueue(50),
ThreadFactoryBuilder()
.setNameFormat("Transfer Worker %d")
.build()
) {
private val logger = KotlinLogging.logger { }
val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
val threadMap = ConcurrentHashMap<Int, Thread>()
init {
if (chunkSize > MAX_CHUNK_SIZE) {
throw IllegalArgumentException("chunkSize 不能大于 $MAX_CHUNK_SIZE")
}
for (i in 0 until threadNum) {
execute(createWorker(i))
}
}
fun submitTransferTask(task: OneDriveTransferTask) {
callback.onTransferTaskCreated(task)
taskQueue.offer(task)
}
fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
if (!taskQueue.remove(task)) {
for (i in 0 until threadNum) {
if (threadStatusMap[i]?.currentTask?.id == task.id) {
threadMap[i]?.interrupt()
return true
}
}
return false
}
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" }
}
return true
}
private fun createWorker(id: Int): Runnable = Runnable {
threadMap[id] = Thread.currentThread()
logger.info { "下载线程 $id 已启动." }
while (!isTerminating) {
val task = taskQueue.take()
logger.info { "线程 $id 开始执行任务: ${task.document.fileName}" }
val progress = OneDriveTransferWorkerProgress(task)
threadStatusMap[id] = progress
try {
callback.onTransferTaskStart(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务开始回调失败: ${e.message}" }
}
try {
doTransferFile(task, progress)
logger.info { "OneDrive 中转任务执行成功: ${task.document.fileName}" }
try {
callback.onTransferSuccess(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功回调失败: ${e.message}" }
}
} catch (e: Exception) {
if (e is InterruptedException) {
logger.info { "线程 $id 任务被取消: ${task.document.fileName}" }
try {
callback.onTransferCancelled(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" }
}
continue
}
logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" }
progress.status = OneDriveTransferStatus.FAILURE
callback.onTransferFailure(task, progress.apply {
this.exception = e
})
} finally {
logger.info { "线程 $id 任务执行完毕: ${task.document.fileName}" }
threadStatusMap.remove(id)
}
}
logger.info { "下载线程 $id 已停止." }
}
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
var tempFile: org.telegram.telegrambots.meta.api.objects.File
logger.debug { "开始获取文件信息..." }
progress.status = OneDriveTransferStatus.GETTING_FILE_INFO
var retryCount = 0
val maxRetryCount = 100
while (true) {
try {
callback.onGettingFileInfo(progress, retryCount, maxRetryCount)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务获取文件信息回调失败: ${e.message}" }
}
try {
tempFile = task.bot.execute(
GetFile.builder()
.fileId(task.document.fileId)
.build()
)
break
} catch (e: TelegramApiException) {
if (e.cause !is SocketTimeoutException) {
throw e
}
if (++retryCount > maxRetryCount) {
throw IllegalStateException("GetFile 等待超时", e)
}
logger.debug { "GetFile 接口调用超时, API 端可能正在下载文件, 5 秒后重新调用该接口...($retryCount/$maxRetryCount)" }
Thread.sleep(10000)
}
}
val file = tempFile
logger.debug { "成功获取文件信息:$tempFile" }
try {
callback.onGotFileInfo(progress, file)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" }
}
progress.status = OneDriveTransferStatus.CREATING_UPLOAD_SESSION
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath = checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes()
progress.status = OneDriveTransferStatus.UPLOADING
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest()
.put(fileBytes)
progress.driveItem = driveItem
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
} else {
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties())
.build()
)
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val progressCallback = IProgressCallback { current, max ->
progress.progress.set(current.toDouble() / max.toDouble())
try {
callback.onUploadProgress(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" }
}
}
val fileStream = getFileStream(task.bot, file.filePath)
val largeFileUploadTask = LargeFileUploadTask(
uploadSession,
graphClient,
fileStream,
file.fileSize,
DriveItem::class.java
)
progress.status = OneDriveTransferStatus.UPLOADING
val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback)
progress.driveItem = uploadResult.responseBody
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
}
}
private fun getFileStream(bot: BaseAbilityBot, filePath: String): InputStream {
try {
val localFile = File(filePath)
if (localFile.exists()) {
logger.debug { "本地存在文件:$filePath" }
return localFile.inputStream()
}
} catch (e: Exception) {
logger.debug(e) { "无法从本地获取文件:$filePath" }
}
return bot.downloadFileAsStream(filePath)
}
private fun checkAndGetPath(
graphClient: GraphServiceClient<Request>,
driveId: String,
storagePath: String,
originFileName: String
): String {
val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath)
val fileName = checkFileName(graphClient, driveId, folderPath, originFileName)
return "$folderPath$fileName"
}
private fun checkAndCreateFolder(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String
): String {
if (folderPath.trim() == "/") {
return ""
}
try {
val testPath = if (folderPath.startsWith('/')) {
folderPath.trimStart('/')
} else {
folderPath
}
graphClient.drives(driveId).root().itemWithPath(testPath).buildRequest().get()
logger.debug { "OneDrive 文件夹已存在:$testPath" }
return if (testPath.endsWith('/')) {
testPath
} else {
"$testPath/"
}
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
}
val path = folderPath.trim('/')
val pathComponents = path.split("/")
logger.debug { "PathComponents = $pathComponents" }
var parentPath: String
var currentPath = "/"
for (component in pathComponents) {
if (component.trim().isEmpty()) {
continue
}
parentPath = currentPath
currentPath += "$component/"
logger.debug { "CurrentPath = $currentPath" }
try {
val driveItem = graphClient.drives(driveId).root().itemWithPath(currentPath).buildRequest().get()
if (driveItem!!.folder == null) {
throw IllegalStateException("OneDrive 中已存在同名文件: $currentPath")
}
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
try {
val newFolder = DriveItem()
newFolder.name = component
newFolder.folder = com.microsoft.graph.models.Folder()
graphClient.drives(driveId).root().itemWithPath(parentPath).children()
.buildRequest()
.post(newFolder)
} catch (e: GraphServiceException) {
if (e.error?.error?.code != "nameAlreadyExists") {
throw e
}
}
} else {
throw e
}
}
}
if (!currentPath.endsWith("/")) {
currentPath += "/"
}
return currentPath.trimStart('/')
}
private fun checkFileName(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String,
fileName: String
): String {
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$fileName").buildRequest().get()
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
return fileName
} else {
throw e
}
}
val fileNameComponents = fileName.split(".")
val fileNameWithoutExtension = fileNameComponents.subList(0, fileNameComponents.size - 1).joinToString(".")
val fileExtension = fileNameComponents.last()
var i = 1
while (true) {
val newFileName = "$fileNameWithoutExtension ($i).$fileExtension"
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$newFileName").buildRequest().get()
i++
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
return newFileName
}
}
}
companion object {
val ONCE_CHUNK_SIZE = 320 * 1024
val MAX_CHUNK_SIZE = 192
}
}
enum class OneDriveTransferStatus {
QUEUING,
GETTING_FILE_INFO,
CREATING_UPLOAD_SESSION,
UPLOADING,
SUCCESS,
FAILURE
}
data class OneDriveTransferWorkerProgress(
val currentTask: OneDriveTransferTask,
val progress: AtomicDouble = AtomicDouble(),
var driveItem: DriveItem? = null,
var exception: Exception? = null,
var status: OneDriveTransferStatus = OneDriveTransferStatus.QUEUING
)
data class OneDriveTransferTask(
val tgUserId: Long,
val bot: BaseAbilityBot,
val service: OneDriveTransferService,
val document: Document,
val onedriveId: String,
val storagePath: String,
val extra: MutableMap<String, Any?> = mutableMapOf(),
val createdAt: Date = Date(),
var retryCount: Int = 0,
val id: UUID = UUID.randomUUID()
)