feat: 还在进行的主从节点重构工作.

This commit is contained in:
2025-01-13 10:35:29 +08:00
parent 68a1c23469
commit c0126c900c
27 changed files with 798 additions and 136 deletions

View File

@ -0,0 +1,80 @@
plugins {
kotlin("jvm") version "2.1.0"
`maven-publish`
alias(libs.plugins.ktor)
}
group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
dependencies {
compileOnly("org.slf4j:slf4j-api:2.0.10")
compileOnly("io.github.microutils:kotlin-logging:3.0.5")
compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
val exposedVersion = "0.45.0"
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation("org.xerial:sqlite-jdbc:3.44.1.0")
implementation("mysql:mysql-connector-java:8.0.33")
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("com.microsoft.graph:microsoft-graph:5.77.0")
implementation("com.azure:azure-identity:1.14.2")
implementation(libs.ktor.server.content.negotiation)
implementation(libs.ktor.server.core)
implementation(libs.ktor.serialization.gson)
implementation(libs.ktor.server.swagger)
implementation(libs.ktor.server.openapi)
implementation(libs.ktor.server.hsts)
implementation(libs.ktor.server.auth)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.server.config.yaml)
testImplementation("org.jetbrains.kotlin:kotlin-test")
implementation(project(":onedrive-transfer-remote-common"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
publishing {
repositories {
maven("https://git.lamgc.me/api/packages/LamGC/maven") {
credentials {
username = project.properties["repo.credentials.self-git.username"].toString()
password = project.properties["repo.credentials.self-git.password"].toString()
}
}
mavenLocal()
}
publications {
create<MavenPublication>("maven") {
from(components["java"])
pom {
name.set("ScalaExt-OneDriveTransfer")
description.set("将 Telegram 中的文件转存至 OneDrive.")
}
}
}
}

View File

@ -0,0 +1,89 @@
package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.ITokenCacheAccessAspect
import com.microsoft.aad.msal4j.ITokenCacheAccessContext
import org.jetbrains.exposed.dao.IntEntity
import org.jetbrains.exposed.dao.IntEntityClass
import org.jetbrains.exposed.dao.LongEntity
import org.jetbrains.exposed.dao.LongEntityClass
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.javatime.datetime
import org.jetbrains.exposed.sql.transactions.transaction
object RegisteredRemoteTransferWorkers : IntIdTable() {
val workerName = varchar("worker_name", 128).uniqueIndex()
val token = varchar("token", 256).uniqueIndex()
val registeredAt = datetime("registered_at")
val lastContactAt = datetime("last_contact_at")
}
class RegisteredRemoteTransferWorker(id: EntityID<Int>) : IntEntity(id) {
var workerName by RegisteredRemoteTransferWorkers.workerName
var token by RegisteredRemoteTransferWorkers.token
var registeredAt by RegisteredRemoteTransferWorkers.registeredAt
var lastContactAt by RegisteredRemoteTransferWorkers.lastContactAt
companion object : IntEntityClass<RegisteredRemoteTransferWorker>(RegisteredRemoteTransferWorkers)
}
object OneDriveTransferSettings : LongIdTable() {
val telegramUserId = long("tg_user_id").uniqueIndex()
val accountId = varchar("account_id", 128)
val userName = varchar("user_name", 96)
val driveId = varchar("drive_id", 256)
val storagePath = varchar("storage_path", 512)
}
class OneDriveTransferSetting(id: EntityID<Long>) : LongEntity(id) {
var telegramUserId by OneDriveTransferSettings.telegramUserId
var accountId by OneDriveTransferSettings.accountId
var userName by OneDriveTransferSettings.userName
var driveId by OneDriveTransferSettings.driveId
var storagePath by OneDriveTransferSettings.storagePath
companion object : LongEntityClass<OneDriveTransferSetting>(OneDriveTransferSettings)
}
object TokenCaches : LongIdTable() {
val clientId = varchar("client_id", 256).uniqueIndex()
val cache = text("cache_data")
}
class TokenCache(id: EntityID<Long>) : LongEntity(id) {
var clientId by TokenCaches.clientId
var cache by TokenCaches.cache
companion object : LongEntityClass<TokenCache>(TokenCaches)
}
class DatabaseTokenCache(private val db: Database) : ITokenCacheAccessAspect {
override fun beforeCacheAccess(context: ITokenCacheAccessContext) {
transaction(db) {
TokenCache.find { TokenCaches.clientId eq context.clientId() }.firstOrNull()?.let {
context.tokenCache().deserialize(it.cache)
}
}
}
override fun afterCacheAccess(context: ITokenCacheAccessContext) {
if (!context.hasCacheChanged()) {
return
}
transaction(db) {
val existCache =
TokenCache.find { TokenCaches.clientId eq context.clientId() }.firstOrNull()
if (existCache == null) {
TokenCache.new {
clientId = context.clientId()
cache = context.tokenCache().serialize()
}
} else {
existCache.cache = context.tokenCache().serialize()
}
}
}
}

View File

@ -0,0 +1,252 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.util.concurrent.RateLimiter
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import java.util.*
@Suppress("UnstableApiUsage")
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
private val logger = KotlinLogging.logger { }
private val progressUpdateRateLimiterMap = WeakHashMap<BaseAbilityBot, RateLimiter>()
private val progressUpdateUserRateLimiterMap = CacheBuilder.newBuilder()
.expireAfterAccess(15, java.util.concurrent.TimeUnit.MINUTES)
.softValues()
.initialCapacity(12)
.concurrencyLevel(4)
.build(object : CacheLoader<BotUserRateLimitToken, RateLimiter>() {
override fun load(key: BotUserRateLimitToken): RateLimiter {
return RateLimiter.create(3.0)
}
})
private fun BaseAbilityBot.rateLimiterPerBot(): RateLimiter {
return progressUpdateRateLimiterMap[this] ?: RateLimiter.create(25.0).also {
progressUpdateRateLimiterMap[this] = it
}
}
private fun BaseAbilityBot.rateLimiterPerUser(userId: Long): RateLimiter {
val token = BotUserRateLimitToken(this, userId)
return progressUpdateUserRateLimiterMap[token]
}
private fun BaseAbilityBot.tryAcquireRateLimit(userId: Long): Boolean {
return if (rateLimiterPerUser(userId).tryAcquire()) {
rateLimiterPerBot().tryAcquire()
} else {
false
}
}
override fun onTransferTaskCreated(task: OneDriveTransferTask) {
if (task.extra["infoMessageId"] != null) {
val deleteMessage = DeleteMessage.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.build()
try {
task.bot.telegramClient.execute(deleteMessage)
} catch (e: Exception) {
logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." }
}
}
val msg = if (task.retryCount == 0) {
"""
OneDrive 中转任务已创建
正在排队中...
文件名: ${task.document.fileName}
-------------------------------------------------
#Queuing
""".trimIndent()
} else {
"""
OneDrive 中转任务已创建
正在排队中...
文件名: ${task.document.fileName}
上次错误信息:${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"}
重试次数:${task.retryCount}
-------------------------------------------------
#Queuing
""".trimIndent()
}
val message = task.bot.telegramClient.execute(
SendMessage.builder()
.text(msg)
.chatId(task.extra["chatId"].toString().toLong())
.replyToMessageId(task.extra["messageId"].toString().toInt())
.build()
)
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名: ${progress.currentTask.document.fileName}
-------------------------------------------------
#Starting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
正在获取文件信息...
文件名: ${progress.currentTask.document.fileName}
重试次数:$retryCount / $maxRetryCount
-------------------------------------------------
重试并不等同于获取文件失败,由于 Telegram Bot API 需要下载文件后
才会获取文件信息,因此需要重试以等待文件下载完成。
#GettingFileInfo
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGotFileInfo(
progress: OneDriveTransferWorkerProgress,
file: org.telegram.telegrambots.meta.api.objects.File
) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
已获取文件信息,正在创建 OneDrive 上传会话...
文件名: ${progress.currentTask.document.fileName}
文件大小:${file.fileSize / 1024} KB
-------------------------------------------------
#UploadStarting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) {
if (!progress.currentTask.bot.tryAcquireRateLimit(progress.currentTask.tgUserId)) {
return
}
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行中
文件名: ${progress.currentTask.document.fileName}
进度:${String.format("%.3f", progress.progress.get() * 100)}%
-------------------------------------------------
#Processing
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行失败
文件名: ${task.document.fileName}
错误信息:${progress.exception?.message}
重试次数:${task.retryCount}
任务将会追加至队列尾部进行重试。
-------------------------------------------------
#Failure #Error
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
progress.currentTask.extra["lastError"] = progress.exception
}
if (task.retryCount < 5) {
runBlocking {
delay(10000)
task.retryCount++
OneDriveTransferCenter.submitUploadTask(task)
}
}
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行成功
文件名: ${task.document.fileName}
OneDrive 文件路径:${progress.driveItem?.webUrl}
------------------------------------------------
#Success
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务已取消
文件名: ${task.document.fileName}
-------------------------------------------------
#Cancelled
""".trimIndent()
)
.build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt())
if (newMessage != null) {
task.extra["infoMessageId"] = newMessage.messageId
}
}
}
private data class BotUserRateLimitToken(
val bot: BaseAbilityBot,
val userId: Long
)

View File

@ -0,0 +1,3 @@
package net.lamgc.scext.onedrive_transfer
class OneDriveNotLoginException : Exception("OneDrive 未登录")

View File

@ -0,0 +1,23 @@
package net.lamgc.scext.onedrive_transfer
data class ExtensionConfig(
val clientId: String = "",
val clientSecret: String = "",
val useCommandPrefix: Boolean = true,
val maxFileSize: Long = 1024L * 1024 * 1024 * 4,
val maxTransferSize: Long = 1024L * 1024 * 1024 * 20,
val centralSetting: CentralSetting = CentralSetting()
)
/**
* @property enable 是否启用主从模式.
* @property port Web 服务器端口.
* @property baseUrl Web 服务器的对外 URL.
* @property registerToken 注册 Worker 所使用的 Token.
*/
data class CentralSetting(
val enable: Boolean = false,
val port: Int = 24860,
val baseUrl: String = "http://localhost:${port}",
val registerToken: String = ""
)

View File

@ -0,0 +1,17 @@
package net.lamgc.scext.onedrive_transfer
import net.lamgc.scalabot.extension.BotExtensionCreateOptions
import net.lamgc.scalabot.extension.BotExtensionFactory
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension
import java.io.File
class ExtensionFactory : BotExtensionFactory {
override fun createExtensionInstance(
bot: BaseAbilityBot,
dataFolder: File,
createOptions: BotExtensionCreateOptions
): AbilityExtension {
return OneDriveTransferExtension(bot, dataFolder)
}
}

View File

@ -0,0 +1,493 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.util.concurrent.AtomicDouble
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.microsoft.graph.http.GraphServiceException
import com.microsoft.graph.models.DriveItem
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import mu.KotlinLogging
import net.lamgc.scext.onedrive_transfer.remote.RemoteOneDriveTransferExecutorController
import okhttp3.Request
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.objects.Document
import org.telegram.telegrambots.meta.exceptions.TelegramApiException
import java.io.File
import java.io.InputStream
import java.net.SocketTimeoutException
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicReference
object OneDriveTransferCenter {
private val executorRef = AtomicReference<OneDriveTransferTaskExecutor>()
private val executor: OneDriveTransferTaskExecutor
get() = executorRef.get() ?: throw IllegalStateException("OneDriveTransferCenter 未初始化.")
fun initial(config: ExtensionConfig) {
val taskExecutor = if (config.centralSetting.enable) {
RemoteOneDriveTransferExecutorController(config.centralSetting, DefaultOneDriveTransferCallback)
} else {
LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback)
}
executorRef.set(taskExecutor)
}
fun submitUploadTask(task: OneDriveTransferTask): Boolean = executor.submitTransferTask(task)
fun cancelUploadTask(task: OneDriveTransferTask): Boolean = executor.cancelTransferTask(task)
fun getQueueingTaskCount(): Int = executor.getQueuedTransferTasks().size
fun getProcessingTasks(): Map<Int, OneDriveTransferWorkerProgress> =
Collections.unmodifiableMap(executor.getWorkingTasks())
fun getWorkerCount(): Int = executor.getWorkerCount()
}
interface OneDriveTransferCallback {
fun onTransferTaskCreated(task: OneDriveTransferTask)
fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress)
fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int)
fun onGotFileInfo(progress: OneDriveTransferWorkerProgress, file: org.telegram.telegrambots.meta.api.objects.File)
fun onUploadProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?)
}
interface OneDriveTransferTaskExecutor {
fun submitTransferTask(task: OneDriveTransferTask): Boolean
fun cancelTransferTask(task: OneDriveTransferTask): Boolean
fun getQueuedTransferTasks(): List<OneDriveTransferTask>
fun getWorkerCount(): Int
fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress>
}
/**
* OneDrive 中转任务执行器.
* @param threadNum 线程数量.
* @param callback OneDrive 中转任务回调.
* @param taskQueue 任务队列.
* @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize.
*/
class LocalOneDriveTransferTaskExecutor(
private val threadNum: Int,
private val callback: OneDriveTransferCallback,
private val taskQueue: BlockingQueue<OneDriveTransferTask> = LinkedBlockingQueue(),
private val chunkSize: Int = 26
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
ArrayBlockingQueue(threadNum * 2),
ThreadFactoryBuilder()
.setNameFormat("Transfer Worker %d")
.build()
), OneDriveTransferTaskExecutor {
private val logger = KotlinLogging.logger { }
private val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
private val threadMap = ConcurrentHashMap<Int, Thread>()
init {
if (chunkSize > MAX_CHUNK_SIZE) {
throw IllegalArgumentException("chunkSize 不能大于 $MAX_CHUNK_SIZE")
}
for (i in 0 until threadNum) {
execute(createWorker(i))
}
}
override fun submitTransferTask(task: OneDriveTransferTask): Boolean {
if (taskQueue.remainingCapacity() > 0) {
callback.onTransferTaskCreated(task)
return taskQueue.offer(task)
}
return false
}
override fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
if (!taskQueue.remove(task)) {
for (i in 0 until threadNum) {
if (threadStatusMap[i]?.currentTask?.id == task.id) {
threadMap[i]?.interrupt()
return true
}
}
return false
}
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" }
}
return true
}
override fun getQueuedTransferTasks(): List<OneDriveTransferTask> = taskQueue.toList()
override fun getWorkerCount(): Int = threadNum
override fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress> = threadStatusMap
private fun createWorker(id: Int): Runnable = Runnable {
threadMap[id] = Thread.currentThread()
logger.info { "下载线程 $id 已启动." }
while (!isTerminating) {
val task = taskQueue.take()
logger.info { "线程 $id 开始执行任务: ${task.document.fileName}" }
val progress = OneDriveTransferWorkerProgress(task)
threadStatusMap[id] = progress
try {
callback.onTransferTaskStart(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务开始回调失败: ${e.message}" }
}
try {
doTransferFile(task, progress)
logger.info { "OneDrive 中转任务执行成功: ${task.document.fileName}" }
try {
callback.onTransferSuccess(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功回调失败: ${e.message}" }
}
} catch (e: Exception) {
if (e is InterruptedException) {
logger.info { "线程 $id 任务被取消: ${task.document.fileName}" }
try {
callback.onTransferCancelled(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务取消回调失败: ${e.message}" }
}
continue
}
logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" }
progress.status = OneDriveTransferStatus.FAILURE
try {
callback.onTransferFailure(task, progress.apply {
this.exception = e
})
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务失败回调失败: ${e.message}" }
}
} finally {
logger.info { "线程 $id 任务执行完毕: ${task.document.fileName}" }
threadStatusMap.remove(id)
}
}
logger.info { "下载线程 $id 已停止." }
}
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
var tempFile: org.telegram.telegrambots.meta.api.objects.File
logger.debug { "开始获取文件信息..." }
progress.status = OneDriveTransferStatus.GETTING_FILE_INFO
var retryCount = 0
val maxRetryCount = 100
while (true) {
try {
callback.onGettingFileInfo(progress, retryCount, maxRetryCount)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务获取文件信息回调失败: ${e.message}" }
}
try {
tempFile = task.bot.telegramClient.execute(
GetFile.builder()
.fileId(task.document.fileId)
.build()
)
break
} catch (e: TelegramApiException) {
if (e.cause !is SocketTimeoutException) {
throw e
}
if (++retryCount > maxRetryCount) {
throw IllegalStateException("GetFile 等待超时", e)
}
logger.debug { "GetFile 接口调用超时, API 端可能正在下载文件, 5 秒后重新调用该接口...($retryCount/$maxRetryCount)" }
Thread.sleep(10000)
}
}
val file = tempFile
logger.debug { "成功获取文件信息:$tempFile" }
try {
callback.onGotFileInfo(progress, file)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" }
}
progress.status = OneDriveTransferStatus.CREATING_UPLOAD_SESSION
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath =
OneDriveUtils.checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes()
progress.status = OneDriveTransferStatus.UPLOADING
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest()
.put(fileBytes)
progress.driveItem = driveItem
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
} else {
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties().apply {
fileSize = file.fileSize
name = task.document.fileName
})
.build()
)
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val progressCallback = IProgressCallback { current, max ->
progress.progress.set(current.toDouble() / max.toDouble())
try {
callback.onUploadProgress(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" }
}
}
val fileStream = getFileStream(task.bot, file.filePath)
val largeFileUploadTask = LargeFileUploadTask(
uploadSession,
graphClient,
fileStream,
file.fileSize,
DriveItem::class.java
)
progress.status = OneDriveTransferStatus.UPLOADING
val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback)
progress.driveItem = uploadResult.responseBody
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
}
}
private fun getFileStream(bot: BaseAbilityBot, filePath: String): InputStream {
try {
val localFile = File(filePath)
if (localFile.exists()) {
logger.debug { "本地存在文件:$filePath" }
return localFile.inputStream()
}
} catch (e: Exception) {
logger.debug(e) { "无法从本地获取文件:$filePath" }
}
return bot.telegramClient.downloadFileAsStream(filePath)
}
companion object {
const val ONCE_CHUNK_SIZE = 320 * 1024
const val MAX_CHUNK_SIZE = 192
}
}
object OneDriveUtils {
private val logger = KotlinLogging.logger { }
fun checkAndGetPath(
graphClient: GraphServiceClient<Request>,
driveId: String,
storagePath: String,
originFileName: String
): String {
val folderPath = checkAndCreateFolder(graphClient, driveId, storagePath)
val fileName = checkFileName(graphClient, driveId, folderPath, originFileName)
return "$folderPath$fileName"
}
private fun checkAndCreateFolder(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String
): String {
if (folderPath.trim() == "/") {
return ""
}
try {
val testPath = if (folderPath.startsWith('/')) {
folderPath.trimStart('/')
} else {
folderPath
}
graphClient.drives(driveId).root().itemWithPath(testPath).buildRequest().get()
logger.debug { "OneDrive 文件夹已存在:$testPath" }
return if (testPath.endsWith('/')) {
testPath
} else {
"$testPath/"
}
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
}
val path = folderPath.trim('/')
val pathComponents = path.split("/")
logger.debug { "PathComponents = $pathComponents" }
var parentPath: String
var currentPath = "/"
for (component in pathComponents) {
if (component.trim().isEmpty()) {
continue
}
parentPath = currentPath
currentPath += "$component/"
logger.debug { "CurrentPath = $currentPath" }
try {
val driveItem = graphClient.drives(driveId).root().itemWithPath(currentPath).buildRequest().get()
if (driveItem!!.folder == null) {
throw IllegalStateException("OneDrive 中已存在同名文件: $currentPath")
}
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
try {
val newFolder = DriveItem()
newFolder.name = component
newFolder.folder = com.microsoft.graph.models.Folder()
graphClient.drives(driveId).root().itemWithPath(parentPath).children()
.buildRequest()
.post(newFolder)
} catch (e: GraphServiceException) {
if (e.error?.error?.code != "nameAlreadyExists") {
throw e
}
}
} else {
throw e
}
}
}
if (!currentPath.endsWith("/")) {
currentPath += "/"
}
return currentPath.trimStart('/')
}
private fun checkFileName(
graphClient: GraphServiceClient<Request>,
driveId: String,
folderPath: String,
fileName: String
): String {
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$fileName").buildRequest().get()
} catch (e: GraphServiceException) {
if (e.responseCode == 404) {
return fileName
} else {
throw e
}
}
val fileNameComponents = fileName.split(".")
val fileNameWithoutExtension = fileNameComponents.subList(0, fileNameComponents.size - 1).joinToString(".")
val fileExtension = fileNameComponents.last()
var i = 1
while (true) {
val newFileName = "$fileNameWithoutExtension ($i).$fileExtension"
try {
graphClient.drives(driveId).root().itemWithPath("$folderPath$newFileName").buildRequest().get()
i++
} catch (e: GraphServiceException) {
if (e.responseCode != 404) {
throw e
}
return newFileName
}
}
}
}
enum class OneDriveTransferStatus {
/**
* 正在排队.
*/
QUEUING,
/**
* 正在从 Telegram 获取文件信息.
*/
GETTING_FILE_INFO,
/**
* 正在下载文件.
*/
DOWNLOADING_FILE,
/**
* 正在创建上传会话.
*/
CREATING_UPLOAD_SESSION,
/**
* 上传中.
*/
UPLOADING,
/**
* 上传成功.
*/
SUCCESS,
/**
* 上传失败.
*/
FAILURE
}
data class OneDriveTransferWorkerProgress(
val currentTask: OneDriveTransferTask,
val progress: AtomicDouble = AtomicDouble(),
var driveItem: DriveItem? = null,
var exception: Exception? = null,
var status: OneDriveTransferStatus = OneDriveTransferStatus.QUEUING
)
data class OneDriveTransferTask(
val tgUserId: Long,
val bot: BaseAbilityBot,
val service: OneDriveTransferService,
val document: Document,
val onedriveId: String,
val storagePath: String,
val extra: MutableMap<String, Any?> = mutableMapOf(),
val createdAt: Date = Date(),
var retryCount: Int = 0,
val id: UUID = UUID.randomUUID()
)

View File

@ -0,0 +1,472 @@
package net.lamgc.scext.onedrive_transfer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import com.google.common.cache.CacheBuilder
import com.microsoft.aad.msal4j.MsalInteractionRequiredException
import mu.KotlinLogging
import org.jetbrains.exposed.sql.Database
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.objects.Ability
import org.telegram.telegrambots.abilitybots.api.objects.Locality
import org.telegram.telegrambots.abilitybots.api.objects.Privacy
import org.telegram.telegrambots.abilitybots.api.objects.Reply
import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.replykeyboard.InlineKeyboardMarkup
import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardMarkup
import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardRemove
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardButton
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardRow
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.KeyboardRow
import java.io.File
import java.net.MalformedURLException
import java.net.URL
import java.util.concurrent.TimeUnit
class OneDriveTransferExtension(val bot: BaseAbilityBot, private val dataFolder: File) : AbilityExtension {
private val logger = KotlinLogging.logger { }
private val config: ExtensionConfig
private val onedriveService: OneDriveTransferService
private val buttonTokenCache = CacheBuilder.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.build<String, String>()
private val actionCache = CacheBuilder.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.build<Long, String>()
init {
config = loadConfiguration()
val db = Database.connect("jdbc:sqlite:${File(dataFolder, "./data.db").canonicalPath}", "org.sqlite.JDBC")
onedriveService = OneDriveTransferService(bot, config, db)
OneDriveTransferCenter.initial(config)
}
private fun loadConfiguration(): ExtensionConfig {
val configFile = File(this.dataFolder, "config.json")
val objectMapper = ObjectMapper().registerKotlinModule()
if (!configFile.exists()) {
configFile.createNewFile()
objectMapper.writeValue(configFile, ExtensionConfig())
return ExtensionConfig()
}
return objectMapper.readValue(configFile, ExtensionConfig::class.java)
}
fun startToUsage(): Ability = Ability.builder()
.named("start")
.info("查看 OneDrive Transfer 的帮助信息.")
.privacy(Privacy.PUBLIC)
.locality(Locality.USER)
.action {
it.bot().silent.send(
"""
OneDrive Transfer 是一个 Telegram 机器人,
可以将 Telegram 中的文件上传到 OneDrive 中转。
/login 登录 OneDrive 账户
/logout 登出 OneDrive 账户。
/my 查看当前 OneDrive 账户信息。
/select_drive 选择 OneDrive 驱动器。
/set_path 设置 OneDrive 中转路径。
/cancel 取消当前操作(仅限 set_path 命令)。
/start 查看帮助信息。
--------------------------------------------
基本用法:
1. 先使用 /login 命令,按照提示登录要存储文件的 OneDrive 账户。
2. 登录完成后执行 /select_drive 选择 OneDrive 驱动器;
3. 选好后直接转发文件给机器人就好,机器人会自动上传到 OneDrive 中转。
文件默认保存在 "Telegram Files/",如果需要修改可以使用 /set_path 命令。
""".trimIndent(), it.chatId()
)
}
.build()
fun loginOneDrive(): Ability = Ability
.builder()
.named("login")
.info("登录 OneDrive 账户.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action { ctx ->
val url = onedriveService.createLoginUrl(ctx.chatId())
actionCache.put(ctx.chatId(), "login")
ctx.bot().silent.send(
"""
请使用以下链接进行登录:
$url
-------------------------------------------
登录成功后会显示无法访问,是正常情况,请将地址栏的链接发回给机器人。
""".trimIndent(), ctx.chatId()
)
}
.enableStats()
.reply(
Reply.of(
{ bot, upd ->
try {
val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim()))
actionCache.invalidate(upd.message.chatId)
bot.silent.send(
"""
登录成功!
Microsoft 账号:${account.userName}
请使用 /select_drive 选择 OneDrive 驱动器以及设置上传路径。
""".trimIndent(), upd.message.chatId
)
} catch (e: MsalInteractionRequiredException) {
if (e.errorCode() == "AADSTS54005") {
bot.silent.send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId)
} else {
bot.silent.send("登录失败,错误代码:${e.errorCode()}", upd.message.chatId)
}
actionCache.invalidate(upd.message.chatId)
} catch (e: MalformedURLException) {
bot.silent.send("链接格式错误,请发送正确的登录链接。", upd.message.chatId)
} catch (e: Exception) {
logger.error(e) { "处理 Oauth2 令牌时发生错误." }
bot.silent.send("处理 Oauth2 令牌时发生错误,请稍后重试。", upd.message.chatId)
actionCache.invalidate(upd.message.chatId)
}
},
{ upd -> upd.hasMessage() && upd.message.hasText() },
{ upd -> actionCache.getIfPresent(upd.message.chatId) == "login" }
))
.build()
fun status(): Ability = Ability.builder()
.named("my")
.info("查看当前 OneDrive 设定信息.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val account = onedriveService.accountManager.getAccountByTgUserId(it.chatId())
if (account == null) {
it.bot().silent.send("当前账户未登录 OneDrive.", it.chatId())
return@action
}
it.bot().silent.send(
"""
当前账户已登录 OneDrive.
Microsoft 账号: ${account.userName}
""".trimIndent(), it.chatId()
)
}
.build()
fun selectDrive(): Ability = Ability.builder()
.named("select_drive")
.info("选择 OneDrive 驱动器.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val currentDrive = try {
onedriveService.getCurrentDrive(it.chatId())
} catch (e: OneDriveNotLoginException) {
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
} catch (e: Exception) {
null
}
val drives = onedriveService.listDriversByUserId(it.chatId())
if (drives.isEmpty()) {
it.bot().silent.send("当前账户没有 OneDrive 驱动器.", it.chatId())
return@action
}
val msgContent = """
当前账户已登录 OneDrive.
Microsoft 账号: ${onedriveService.accountManager.getAccountByTgUserId(it.chatId())?.userName}
当前正在使用的驱动器为:[${currentDrive?.driveType ?: "None"}] ${currentDrive?.name ?: "无"}
""".trimIndent()
SendMessage.builder()
.disableWebPagePreview(true)
.chatId(it.chatId().toString())
.text(msgContent)
.replyMarkup(InlineKeyboardMarkup.builder().apply {
for (drive in drives) {
keyboardRow(
InlineKeyboardRow(
InlineKeyboardButton.builder()
.text("[${drive.driveType}] ${drive.name}")
.callbackData(
buttonTokenCache.putToken(
"${drive.id}",
tokenPrefix = "select_drive:"
)
)
.build()
)
)
}
}.build())
.build().let { msg ->
it.bot().silent.execute(msg)
}
}
.reply({ bot, upd ->
val driveId = buttonTokenCache.getIfPresent(upd.callbackQuery.data.substringAfter(':'))
if (driveId == null) {
bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
return@reply
}
val drive = try {
onedriveService.listDriversByUserId(upd.callbackQuery.from.id).firstOrNull { it.id == driveId }
} catch (e: OneDriveNotLoginException) {
bot.silent.send("当前账户没有登录 OneDrive.", upd.callbackQuery.message.chatId)
return@reply
}
if (drive == null) {
bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
return@reply
}
onedriveService.setDrive(upd.callbackQuery.from.id, driveId)
val editMessageText = EditMessageText.builder()
.chatId(upd.callbackQuery.message.chatId)
.messageId(upd.callbackQuery.message.messageId)
.replyMarkup(
InlineKeyboardMarkup.builder()
.clearKeyboard()
.build()
)
.text(
"""
已选择驱动器:[${drive.driveType}] ${drive.name}
请使用 /set_path 设置上传路径。
""".trimIndent()
)
.build()
bot.silent.execute(editMessageText)
}, { upd ->
upd.hasCallbackQuery() && upd.callbackQuery.data != null && upd.callbackQuery.data.startsWith("select_drive:")
})
.build()
fun setPath(): Ability = Ability.builder()
.named("set_path")
.info("设置 OneDrive 中转路径.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val currentDrive = try {
onedriveService.getCurrentDrive(it.chatId())
} catch (e: OneDriveNotLoginException) {
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
if (currentDrive == null) {
it.bot().silent.send("当前账户没有选择 OneDrive 驱动器.", it.chatId())
return@action
}
val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId())
if (transferSetting == null) {
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
val msgContent = """
当前账户已登录 OneDrive.
Microsoft 账号: ${transferSetting.userName}
当前正在使用的驱动器为:[${currentDrive.driveType}] ${currentDrive.name}
当前上传路径为:${transferSetting.storagePath}
-------------------------------------------------
请发送新的上传路径,或者发送 /cancel 取消设置。
""".trimIndent()
actionCache.put(it.chatId(), "set_path")
SendMessage.builder()
.disableWebPagePreview(true)
.chatId(it.chatId().toString())
.text(msgContent)
.replyMarkup(ReplyKeyboardMarkup.builder().apply {
this.oneTimeKeyboard(true)
this.selective(false)
this.inputFieldPlaceholder("OneDrive 上传路径")
this.keyboardRow(KeyboardRow().apply {
add("Telegram Files/")
})
this.resizeKeyboard(true)
this.isPersistent(false)
}.build())
.build().let { msg ->
it.bot().silent.execute(msg)
}
}
.reply({ bot, upd ->
val path = upd.message.text.trim()
if (path == "/cancel") {
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已取消设置.")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
return@reply
}
val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId)
if (transferSetting == null) {
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("当前账户没有登录 OneDrive.")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
return@reply
}
onedriveService.accountManager.doSomething {
transferSetting.storagePath = path
}
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
DeleteMessage.builder()
.chatId(upd.message.chatId.toString())
.messageId(upd.message.messageId)
.build()
)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已设置上传路径为:$path")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
}, { upd ->
upd.hasMessage() && upd.message.hasText() && actionCache.getIfPresent(upd.message.chatId) == "set_path"
})
.build()
fun logout(): Ability = Ability.builder()
.named("logout")
.info("登出 OneDrive 账户.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId())
if (transferSetting == null) {
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
onedriveService.accountManager.doSomething {
transferSetting.delete()
}
it.bot().silent.send("已登出 OneDrive.", it.chatId())
}
.build()
fun uploadDocument(): Reply = Reply.of(
{ bot, upd ->
val document = upd.message.document
if (document == null) {
bot.silent.send("请发送文件.", upd.message.chatId)
return@of
}
if (document.fileSize > config.maxFileSize) {
bot.silent.send("文件大小超过限制.", upd.message.chatId)
return@of
}
onedriveService.submitUploadDocumentTask(upd.message.chatId, upd.message.messageId, document)
},
{ upd -> upd.hasMessage() && upd.message.hasDocument() },
{ upd ->
try {
val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId)
transferSetting != null
} catch (e: OneDriveNotLoginException) {
false
}
}
)
fun adminStatus(): Ability = Ability.builder()
.named("status")
.info("查看当前中转状态.(仅限管理员)")
.privacy(Privacy.CREATOR)
.locality(Locality.USER)
.action {
val processingTasks = OneDriveTransferCenter.getProcessingTasks()
val msg = StringBuilder(
"""
当前排队任务数:${OneDriveTransferCenter.getQueueingTaskCount()}
当前处理任务数:${processingTasks.size} / ${OneDriveTransferCenter.getWorkerCount()}
--------------------------------------------------------------
""".trimIndent()
).append('\n')
for (workerId in 0..<OneDriveTransferCenter.getWorkerCount()) {
val task = processingTasks[workerId]
if (task == null) {
msg.append("Worker $workerId: 空闲\n")
} else {
msg.append(
"Worker $workerId: [${task.status}] ${
String.format(
"%.3f",
task.progress.get() * 100.0
)
}% ${task.currentTask.document.fileSize} Bytes\n"
)
}
}
it.bot().silent.send(msg.toString(), it.chatId())
}
.build()
fun cancelTask(): Ability = Ability.builder()
.named("cancel_task")
.info("取消指定的任务(回复对应的文件或状态消息)")
.privacy(Privacy.PUBLIC)
.locality(Locality.USER)
.action {
if (!it.update().message.isReply) {
it.bot().silent.send("请回复要取消的消息.", it.chatId())
return@action
}
val replyMessage = it.update().message.replyToMessage
val result = onedriveService.cancelTask(it.chatId(), replyMessage.messageId)
val msg = if (result) {
"已取消任务."
} else {
"任务已完成,无法取消."
}
val sendMessage = SendMessage.builder()
.chatId(it.chatId().toString())
.text(msg)
.replyToMessageId(it.update().message.messageId)
.build()
it.bot().silent.execute(sendMessage)
}
.build()
private fun Ability.AbilityBuilder.named(name: String): Ability.AbilityBuilder {
return if (config.useCommandPrefix) {
name("odt_$name")
} else {
name(name)
}
}
}

View File

@ -0,0 +1,96 @@
package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import java.net.URI
import java.net.URL
import java.sql.Connection
class OneDriveTransferSettingManager(private val authClient: ConfidentialClientApplication, private val db: Database) {
init {
TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE
}
fun getTransferSetting(userId: Long): OneDriveTransferSetting? {
return transaction(db) {
return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }
.firstOrNull()
}
}
fun <R> doSomething(handle: () -> R): R {
return transaction(db) {
handle()
}
}
fun createAuthorizationRequest(userId: Long): URL {
val parameters = AuthorizationRequestUrlParameters
.builder("http://localhost:45678/", OAUTH2_SCOPE)
.responseMode(ResponseMode.QUERY)
.prompt(Prompt.SELECT_ACCOUNT)
.state(userId.toString())
.build()
return authClient.getAuthorizationRequestUrl(parameters)
}
fun updateAccount(userId: Long, redirectUrl: URL, checkUserId: Boolean = true): OneDriveTransferSetting {
val queries = redirectUrl.getQueryMap()
if (checkUserId && userId != queries["state"]?.toLong()) {
throw IllegalArgumentException("State 不等于 userId.")
}
val future = authClient.acquireToken(
AuthorizationCodeParameters
.builder(queries["code"], URI.create("http://localhost:45678/"))
.build()
)
val result = future.get()
return transaction(db) {
val account =
OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
account?.apply {
accountId = result.account().homeAccountId()
userName = result.account().username()
}
?: OneDriveTransferSetting.new {
val drive = try {
OneDriveTransferService.createGraphClient(authClient, result.account())
.me().drive()
.buildRequest().get()
} catch (e: Exception) {
null
}
telegramUserId = userId
accountId = result.account().homeAccountId()
userName = result.account().username()
driveId = drive?.id ?: ""
storagePath = "Telegram Files/"
}
}
}
fun getAccountByTgUserId(userId: Long): OneDriveTransferSetting? {
return transaction(db) {
OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
}
}
companion object {
const val AUTHORITY = "https://login.microsoftonline.com/common"
val OAUTH2_SCOPE = setOf(
"User.Read",
"Files.Read",
"Files.ReadWrite",
"Files.Read.All",
"Files.ReadWrite.All",
"Sites.Read.All",
"Sites.ReadWrite.All",
"offline_access"
)
}
}

View File

@ -0,0 +1,167 @@
package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.ClientCredentialFactory
import com.microsoft.aad.msal4j.ConfidentialClientApplication
import com.microsoft.aad.msal4j.IAccount
import com.microsoft.aad.msal4j.SilentParameters
import com.microsoft.graph.authentication.IAuthenticationProvider
import com.microsoft.graph.httpcore.HttpClients
import com.microsoft.graph.models.Drive
import com.microsoft.graph.requests.GraphServiceClient
import mu.KotlinLogging
import okhttp3.Request
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.objects.Document
import java.net.URL
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
class OneDriveTransferService(
private val bot: BaseAbilityBot,
private val config: ExtensionConfig,
private val db: Database
) {
private val logger = KotlinLogging.logger { }
val accountManager: OneDriveTransferSettingManager
private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder(
config.clientId,
ClientCredentialFactory.createFromSecret(config.clientSecret),
)
.authority(OneDriveTransferSettingManager.AUTHORITY)
.setTokenCacheAccessAspect(DatabaseTokenCache(db))
.build()
private val tasks = CopyOnWriteArrayList<OneDriveTransferTask>()
init {
transaction(db) {
SchemaUtils.create(OneDriveTransferSettings, TokenCaches, RegisteredRemoteTransferWorkers)
}
accountManager = OneDriveTransferSettingManager(authClient, db)
}
fun createGraphClient(userId: Long): GraphServiceClient<Request> {
val cache = THREAD_CURRENT_GRAPH_CLIENT.get()
if (cache?.tgUserId == userId) {
return cache.client
}
val serviceClient = accountManager.getTransferSetting(userId)?.let {
authClient.accounts.get().firstOrNull { account ->
account.homeAccountId() == it.accountId
}
}?.let { Companion.createGraphClient(authClient, it) } ?: throw OneDriveNotLoginException()
THREAD_CURRENT_GRAPH_CLIENT.set(ClientCache(userId, serviceClient))
return serviceClient
}
fun createLoginUrl(userId: Long) =
accountManager.createAuthorizationRequest(userId)
fun updateAccount(userId: Long, redirectUrl: URL) =
accountManager.updateAccount(userId, redirectUrl)
fun listDriversByUserId(userId: Long): List<Drive> {
val sites = try {
val sites = createGraphClient(userId).sites()
.buildRequest().get()?.currentPage ?: emptyList()
sites.map {
it.drive
}.filterNotNull().toList()
} catch (e: Exception) {
logger.debug(e) { "获取 OneDrive 站点失败, 可能是用户没有权限或不是组织账号." }
emptyList()
}
val drive = createGraphClient(userId).me().drive().buildRequest().get()
val drives = createGraphClient(userId).drives()
.buildRequest()
.get()?.currentPage ?: emptyList()
return mutableListOf<Drive>().apply {
if (drive != null) {
add(drive)
}
addAll(drives)
addAll(sites)
}
}
fun setDrive(userId: Long, driveId: String) {
val transferSetting =
accountManager.getTransferSetting(userId) ?: throw OneDriveNotLoginException()
accountManager.doSomething {
transferSetting.driveId = driveId
}
}
fun getCurrentDrive(userId: Long): Drive? {
val transferSetting =
accountManager.getTransferSetting(userId) ?: throw OneDriveNotLoginException()
val graphClient = createGraphClient(userId)
return graphClient.drives(transferSetting.driveId)
.buildRequest()
.get()
}
fun submitUploadDocumentTask(userId: Long, messageId: Int, document: Document) {
val transferSetting =
accountManager.getTransferSetting(userId) ?: throw OneDriveNotLoginException()
val task = OneDriveTransferTask(
userId,
bot,
this,
document,
transferSetting.driveId,
transferSetting.storagePath,
).apply {
extra["chatId"] = userId
extra["messageId"] = messageId
}
val submitted = OneDriveTransferCenter.submitUploadTask(task)
if (submitted) {
tasks.add(task)
}
}
fun cancelTask(userId: Long, replyMessageId: Int): Boolean {
val task = tasks.find {
it.tgUserId == userId &&
(it.extra["infoMessageId"] == replyMessageId || it.extra["messageId"] == replyMessageId)
} ?: throw NoSuchElementException()
return OneDriveTransferCenter.cancelUploadTask(task)
}
companion object {
private val THREAD_CURRENT_GRAPH_CLIENT = ThreadLocal<ClientCache>()
fun createGraphClient(
authClient: ConfidentialClientApplication,
iAccount: IAccount
): GraphServiceClient<Request> {
return GraphServiceClient.builder()
.httpClient(HttpClients.createDefault(MsalAuthorizationProvider(authClient, iAccount)))
.buildClient()
}
}
}
private data class ClientCache(
val tgUserId: Long,
val client: GraphServiceClient<Request>,
)
class MsalAuthorizationProvider(
private val authClientApplication: ConfidentialClientApplication,
private val iAccount: IAccount
) : IAuthenticationProvider {
override fun getAuthorizationTokenAsync(requestUrl: URL): CompletableFuture<String> {
return authClientApplication.acquireTokenSilently(
SilentParameters.builder(OneDriveTransferSettingManager.OAUTH2_SCOPE, iAccount).build()
).thenApply { it.accessToken() }
}
}

View File

@ -0,0 +1,64 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.cache.Cache
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.message.Message
import org.telegram.telegrambots.meta.exceptions.TelegramApiRequestException
import java.net.URL
import java.net.URLDecoder
fun URL.getQueryMap(): Map<String, String> {
val queryMap = mutableMapOf<String, String>()
query.split("&").forEach {
val pair = it.split("=")
queryMap[pair[0]] = URLDecoder.decode(pair[1], "UTF-8")
}
return queryMap
}
// 生成大小写字母加数字的随机字符串
fun randomString(length: Int): String {
val charPool = ('a'..'z') + ('A'..'Z') + ('0'..'9')
return (1..length)
.map { kotlin.random.Random.nextInt(0, charPool.size) }
.map(charPool::get)
.joinToString("")
}
fun Cache<String, String>.putToken(value: String, tokenLength: Int = 16, tokenPrefix: String = ""): String {
val token = randomString(tokenLength)
put(token, value)
return "$tokenPrefix$token"
}
fun EditMessageText.orSendMessage(bot: BaseAbilityBot, replyMessageId: Int, tryRemove: Boolean = true): Message? {
try {
bot.telegramClient.execute(this)
return null
} catch (e: TelegramApiRequestException) {
if (e.errorCode != 400 || e.apiResponse != "message can't be edited") {
throw e
}
return bot.telegramClient.execute(
SendMessage.builder()
.replyToMessageId(replyMessageId)
.chatId(chatId.toString())
.text(text)
.build()
)
} finally {
if (tryRemove) {
try {
bot.telegramClient.execute(
DeleteMessage.builder()
.messageId(messageId)
.build()
)
} catch (_: Exception) {
}
}
}
}

View File

@ -0,0 +1,336 @@
package net.lamgc.scext.onedrive_transfer.remote
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.auth.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.collections.*
import mu.KotlinLogging
import net.lamgc.scext.onedrive_transfer.*
import net.lamgc.scext.onedrive_transfer.remote.request.*
import okhttp3.internal.toImmutableMap
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
class RemoteOneDriveTransferExecutorController(
private val config: CentralSetting,
private val callback: OneDriveTransferCallback
) : OneDriveTransferTaskExecutor {
private val logger = KotlinLogging.logger { }
private val pendingTasks = ConcurrentLinkedDeque<OneDriveTransferTask>()
private val workingTasks = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
private val preAllocatedTasks = ConcurrentHashMap<OneDriveTransferTask, PreAllocateTaskEntry>()
private val cancelledTasks = ConcurrentSet<OneDriveTransferTask>()
init {
embeddedServer(Netty, port = config.port) {
install(Authentication) {
bearer(AUTHENTICATE_CONFIG_ID) {
authenticate {
val worker =
RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.token eq it.token }
.limit(1)
.firstOrNull()
if (worker != null) {
worker.lastContactAt = LocalDateTime.now()
}
return@authenticate worker
}
}
}
registerRemoteWorkerRegisterRouting()
registerTaskAllocationRouting()
registerTaskProgressUpdateRouting()
registerFileUploadSessionRequestRouting()
}.start(wait = false)
}
override fun submitTransferTask(task: OneDriveTransferTask): Boolean {
pendingTasks.addLast(task)
try {
callback.onTransferTaskCreated(task)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferTaskCreated." }
}
return true
}
override fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
if (pendingTasks.remove(task)) {
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return true
}
if (workingTasks.values.any { it.currentTask == task }) {
cancelledTasks.add(task)
return true
} else if (preAllocatedTasks.contains(task)) {
preAllocatedTasks.remove(task)
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return true
}
return false
}
override fun getQueuedTransferTasks(): List<OneDriveTransferTask> {
return this.pendingTasks.toList()
}
override fun getWorkerCount(): Int {
// 在近 1 小时内有联系的 Worker 数量
val minusHours = LocalDateTime.now().minusHours(1)
return RegisteredRemoteTransferWorker
.count(RegisteredRemoteTransferWorkers.lastContactAt greaterEq minusHours).toInt()
}
override fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress> {
return workingTasks.toImmutableMap()
}
private fun Application.registerRemoteWorkerRegisterRouting() {
routing {
post<RegisterWorkerRequest> {
if (config.registerToken != it.token) {
call.respond(HttpStatusCode.Forbidden, "Invalid token.")
return@post
}
val existsWorker =
RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.workerName eq it.workerName }
.firstOrNull()
val workerToken = randomString(128)
if (existsWorker != null) {
val now = LocalDateTime.now()
// 如果 existsWorker 的 lastContactAt 距离现在不超过 12 小时, 则拒绝重新注册
if (existsWorker.lastContactAt > now.minusHours(12)) {
call.respond(HttpStatusCode.Forbidden, "Worker already registered.")
return@post
} else {
existsWorker.token = workerToken
existsWorker.registeredAt = now
existsWorker.lastContactAt = now
}
} else {
RegisteredRemoteTransferWorker.new {
this.workerName = it.workerName
this.registeredAt = LocalDateTime.now()
this.lastContactAt = LocalDateTime.now()
this.token = workerToken
}
}
call.respond(HttpStatusCode.OK, RegisterWorkerResponse(true, "Worker registered.", workerToken))
}
}
}
private fun Application.registerTaskAllocationRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<TaskAllocationRequest> { request ->
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
// 寻找 pendingTasks 中 fileSize 不超过 worker 的限制的任务
val task = pendingTasks.find { task ->
task.document.fileSize <= request.acceptMaxSize && task !in cancelledTasks && preAllocatedTasks.computeIfAbsent(
task
) {
PreAllocateTaskEntry(task, worker)
}.worker == worker
} ?: return@post call.respond(HttpStatusCode.NoContent)
call.respond(
HttpStatusCode.OK, TaskAllocationResponse(
true, TaskAllocationInfo(
taskId = task.id,
ownerBotToken = "",
fileId = task.document.fileId,
fileSize = task.document.fileSize
)
)
)
}
post<TaskAcceptRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val preAllocatedTaskEntry = preAllocatedTasks.entries.find { entry ->
entry.key.id == it.taskId && entry.value.worker.id == worker.id
} ?: return@post call.respond(HttpStatusCode.OK, TaskAcceptResponse(false))
pendingTasks.remove(preAllocatedTaskEntry.key)
val existsTask = workingTasks[worker.id.value]
if (existsTask != null) {
if (cancelledTasks.remove(existsTask.currentTask)) {
try {
callback.onTransferCancelled(existsTask.currentTask, existsTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
} else {
pendingTasks.addFirst(existsTask.currentTask)
}
}
val workerProgress = OneDriveTransferWorkerProgress(preAllocatedTaskEntry.key)
workingTasks[worker.id.value] = workerProgress
preAllocatedTasks.remove(preAllocatedTaskEntry.key)
try {
callback.onTransferTaskStart(workerProgress)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferTaskStart." }
}
call.respond(HttpStatusCode.OK, TaskAcceptResponse(true))
}
}
}
}
private fun Application.registerTaskProgressUpdateRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<TaskProgressUpdateRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val processingTask = workingTasks[worker.id.value]
if (processingTask == null || processingTask.currentTask.id != it.taskId) {
return@post call.respond(HttpStatusCode.BadRequest)
}
processingTask.progress.set(it.progress)
processingTask.status = when (it.status) {
TaskStatus.DOWNLOADING_FILE -> OneDriveTransferStatus.DOWNLOADING_FILE
TaskStatus.DOWNLOADED_FILE -> OneDriveTransferStatus.CREATING_UPLOAD_SESSION
TaskStatus.UPLOADING_FILE -> OneDriveTransferStatus.UPLOADING
TaskStatus.COMPLETED -> OneDriveTransferStatus.SUCCESS
}
if (cancelledTasks.contains(processingTask.currentTask)) {
cancelledTasks.remove(processingTask.currentTask)
workingTasks.remove(worker.id.value)
try {
callback.onTransferCancelled(processingTask.currentTask, processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return@post call.respond(
HttpStatusCode.OK,
TaskProgressUpdateResponse(TaskProgressUpdateStatus.CANCELLED)
)
}
try {
callback.onUploadProgress(processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onUploadProgress." }
}
if (processingTask.status == OneDriveTransferStatus.SUCCESS) {
try {
callback.onTransferSuccess(processingTask.currentTask, processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferSuccess." }
}
}
return@post call.respond(
HttpStatusCode.OK,
TaskProgressUpdateResponse(TaskProgressUpdateStatus.CONTINUE)
)
}
}
}
}
private fun Application.registerFileUploadSessionRequestRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<FileUploadSessionRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val processingTask = workingTasks[worker.id.value]
if (processingTask == null || processingTask.currentTask.id != it.taskId) {
return@post call.respond(HttpStatusCode.BadRequest)
}
if (processingTask.status != OneDriveTransferStatus.CREATING_UPLOAD_SESSION) {
return@post call.respond(HttpStatusCode.BadRequest)
}
val task = processingTask.currentTask
val file = task.document
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath = OneDriveUtils.checkAndGetPath(
graphClient,
task.onedriveId,
task.storagePath,
task.document.fileName
)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties().apply {
fileSize = file.fileSize
name = task.document.fileName
})
.build()
)
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val uploadUrl =
uploadSession.uploadUrl ?: return@post call.respond(HttpStatusCode.InternalServerError)
return@post call.respond(HttpStatusCode.OK, FileUploadSessionResponse(uploadUrl))
}
}
}
}
data class PreAllocateTaskEntry(
val task: OneDriveTransferTask,
val worker: RegisteredRemoteTransferWorker,
val allocatedAt: LocalDateTime = LocalDateTime.now()
)
companion object {
private const val AUTHENTICATE_CONFIG_ID = "worker-token"
}
}

View File

@ -0,0 +1 @@
net.lamgc.scext.onedrive_transfer.ExtensionFactory