Compare commits

...

1 Commits
main ... wip

Author SHA1 Message Date
c0126c900c
feat: 还在进行的主从节点重构工作. 2025-01-13 10:35:29 +08:00
27 changed files with 798 additions and 136 deletions

8
README.md Normal file
View File

@ -0,0 +1,8 @@
# OneDrive Transfer Bot
这是一个将 Telegram 文件中转至 OneDrive 的 Scalabot 机器人扩展。
## Install
### Telegram Client API Key

View File

@ -1,94 +1,21 @@
import java.net.URI
plugins {
kotlin("jvm") version "2.1.0"
`maven-publish`
kotlin("jvm") version "2.1.0" apply false
}
group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
repositories {
mavenLocal()
mavenCentral()
allprojects {
maven {
url = URI.create("https://git.lamgc.me/api/packages/LamGC/maven")
name = "lam-gitea"
}
}
dependencies {
compileOnly("org.slf4j:slf4j-api:2.0.10")
compileOnly("io.github.microutils:kotlin-logging:3.0.5")
compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
val exposedVersion = "0.45.0"
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion")
implementation("org.xerial:sqlite-jdbc:3.44.1.0")
implementation("mysql:mysql-connector-java:8.0.33")
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("com.microsoft.graph:microsoft-graph:5.77.0")
implementation("com.azure:azure-identity:1.14.2")
testImplementation("org.jetbrains.kotlin:kotlin-test")
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
val fatJar = task("fatJar", type = Jar::class) {
archiveBaseName = "${project.name}-agent"
manifest {
attributes["Implementation-Title"] = "OneDrive-Transfer"
attributes["Implementation-Version"] = version
attributes["Main-Class"] = "net.lamgc.scext.onedrive_transfer.AgentMainKt"
}
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
with(tasks.jar.get() as CopySpec)
}
tasks {
"jar" {
dependsOn(fatJar)
}
}
publishing {
repositories {
maven("https://git.lamgc.me/api/packages/LamGC/maven") {
credentials {
username = project.properties["repo.credentials.self-git.username"].toString()
password = project.properties["repo.credentials.self-git.password"].toString()
}
}
mavenLocal()
}
mavenCentral()
publications {
create<MavenPublication>("maven") {
from(components["java"])
pom {
name.set("ScalaExt-OneDriveTransfer")
description.set("将 Telegram 中的文件转存至 OneDrive.")
}
maven {
url = URI.create("https://git.lamgc.me/api/packages/LamGC/maven")
name = "lam-gitea"
}
}

22
gradle/libs.versions.toml Normal file
View File

@ -0,0 +1,22 @@
[versions]
kotlin-version = "2.1.0"
ktor-version = "3.0.2"
logback-version = "1.4.14"
[libraries]
ktor-server-content-negotiation = { module = "io.ktor:ktor-server-content-negotiation-jvm", version.ref = "ktor-version" }
ktor-server-core = { module = "io.ktor:ktor-server-core-jvm", version.ref = "ktor-version" }
ktor-serialization-gson = { module = "io.ktor:ktor-serialization-gson-jvm", version.ref = "ktor-version" }
ktor-server-swagger = { module = "io.ktor:ktor-server-swagger-jvm", version.ref = "ktor-version" }
ktor-server-openapi = { module = "io.ktor:ktor-server-openapi", version.ref = "ktor-version" }
ktor-server-hsts = { module = "io.ktor:ktor-server-hsts-jvm", version.ref = "ktor-version" }
ktor-server-auth = { module = "io.ktor:ktor-server-auth-jvm", version.ref = "ktor-version" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty-jvm", version.ref = "ktor-version" }
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback-version" }
ktor-server-config-yaml = { module = "io.ktor:ktor-server-config-yaml-jvm", version.ref = "ktor-version" }
ktor-server-test-host = { module = "io.ktor:ktor-server-test-host-jvm", version.ref = "ktor-version" }
kotlin-test-junit = { module = "org.jetbrains.kotlin:kotlin-test-junit", version.ref = "kotlin-version" }
[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin-version" }
ktor = { id = "io.ktor.plugin", version.ref = "ktor-version" }

View File

@ -0,0 +1,80 @@
plugins {
kotlin("jvm") version "2.1.0"
`maven-publish`
alias(libs.plugins.ktor)
}
group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
dependencies {
compileOnly("org.slf4j:slf4j-api:2.0.10")
compileOnly("io.github.microutils:kotlin-logging:3.0.5")
compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
val exposedVersion = "0.45.0"
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation("org.xerial:sqlite-jdbc:3.44.1.0")
implementation("mysql:mysql-connector-java:8.0.33")
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("com.microsoft.graph:microsoft-graph:5.77.0")
implementation("com.azure:azure-identity:1.14.2")
implementation(libs.ktor.server.content.negotiation)
implementation(libs.ktor.server.core)
implementation(libs.ktor.serialization.gson)
implementation(libs.ktor.server.swagger)
implementation(libs.ktor.server.openapi)
implementation(libs.ktor.server.hsts)
implementation(libs.ktor.server.auth)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.server.config.yaml)
testImplementation("org.jetbrains.kotlin:kotlin-test")
implementation(project(":onedrive-transfer-remote-common"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
publishing {
repositories {
maven("https://git.lamgc.me/api/packages/LamGC/maven") {
credentials {
username = project.properties["repo.credentials.self-git.username"].toString()
password = project.properties["repo.credentials.self-git.password"].toString()
}
}
mavenLocal()
}
publications {
create<MavenPublication>("maven") {
from(components["java"])
pom {
name.set("ScalaExt-OneDriveTransfer")
description.set("将 Telegram 中的文件转存至 OneDrive.")
}
}
}
}

View File

@ -2,14 +2,33 @@ package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.ITokenCacheAccessAspect
import com.microsoft.aad.msal4j.ITokenCacheAccessContext
import mu.KotlinLogging
import org.jetbrains.exposed.dao.IntEntity
import org.jetbrains.exposed.dao.IntEntityClass
import org.jetbrains.exposed.dao.LongEntity
import org.jetbrains.exposed.dao.LongEntityClass
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.javatime.datetime
import org.jetbrains.exposed.sql.transactions.transaction
object RegisteredRemoteTransferWorkers : IntIdTable() {
val workerName = varchar("worker_name", 128).uniqueIndex()
val token = varchar("token", 256).uniqueIndex()
val registeredAt = datetime("registered_at")
val lastContactAt = datetime("last_contact_at")
}
class RegisteredRemoteTransferWorker(id: EntityID<Int>) : IntEntity(id) {
var workerName by RegisteredRemoteTransferWorkers.workerName
var token by RegisteredRemoteTransferWorkers.token
var registeredAt by RegisteredRemoteTransferWorkers.registeredAt
var lastContactAt by RegisteredRemoteTransferWorkers.lastContactAt
companion object : IntEntityClass<RegisteredRemoteTransferWorker>(RegisteredRemoteTransferWorkers)
}
object OneDriveTransferSettings : LongIdTable() {
val telegramUserId = long("tg_user_id").uniqueIndex()
val accountId = varchar("account_id", 128)

View File

@ -9,8 +9,15 @@ data class ExtensionConfig(
val centralSetting: CentralSetting = CentralSetting()
)
/**
* @property enable 是否启用主从模式.
* @property port Web 服务器端口.
* @property baseUrl Web 服务器的对外 URL.
* @property registerToken 注册 Worker 所使用的 Token.
*/
data class CentralSetting(
val enable: Boolean = false,
val port: Int = 24860,
val secret: String = "",
val baseUrl: String = "http://localhost:${port}",
val registerToken: String = ""
)

View File

@ -10,6 +10,7 @@ import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import mu.KotlinLogging
import net.lamgc.scext.onedrive_transfer.remote.RemoteOneDriveTransferExecutorController
import okhttp3.Request
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
@ -20,11 +21,24 @@ import java.io.InputStream
import java.net.SocketTimeoutException
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicReference
object OneDriveTransferCenter {
val executor: OneDriveTransferTaskExecutor =
LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue())
private val executorRef = AtomicReference<OneDriveTransferTaskExecutor>()
private val executor: OneDriveTransferTaskExecutor
get() = executorRef.get() ?: throw IllegalStateException("OneDriveTransferCenter 未初始化.")
fun initial(config: ExtensionConfig) {
val taskExecutor = if (config.centralSetting.enable) {
RemoteOneDriveTransferExecutorController(config.centralSetting, DefaultOneDriveTransferCallback)
} else {
LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback)
}
executorRef.set(taskExecutor)
}
fun submitUploadTask(task: OneDriveTransferTask): Boolean = executor.submitTransferTask(task)
@ -74,7 +88,7 @@ interface OneDriveTransferTaskExecutor {
class LocalOneDriveTransferTaskExecutor(
private val threadNum: Int,
private val callback: OneDriveTransferCallback,
private val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val taskQueue: BlockingQueue<OneDriveTransferTask> = LinkedBlockingQueue(),
private val chunkSize: Int = 26
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
@ -228,7 +242,8 @@ class LocalOneDriveTransferTaskExecutor(
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath = checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
val filePath =
OneDriveUtils.checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
if (file.fileSize < 4 * 1024 * 1024) {
@ -246,6 +261,7 @@ class LocalOneDriveTransferTaskExecutor(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties().apply {
fileSize = file.fileSize
name = task.document.fileName
})
.build()
)
@ -261,7 +277,6 @@ class LocalOneDriveTransferTaskExecutor(
}
val fileStream = getFileStream(task.bot, file.filePath)
val largeFileUploadTask = LargeFileUploadTask(
uploadSession,
graphClient,
@ -291,7 +306,18 @@ class LocalOneDriveTransferTaskExecutor(
return bot.telegramClient.downloadFileAsStream(filePath)
}
private fun checkAndGetPath(
companion object {
const val ONCE_CHUNK_SIZE = 320 * 1024
const val MAX_CHUNK_SIZE = 192
}
}
object OneDriveUtils {
private val logger = KotlinLogging.logger { }
fun checkAndGetPath(
graphClient: GraphServiceClient<Request>,
driveId: String,
storagePath: String,
@ -405,10 +431,6 @@ class LocalOneDriveTransferTaskExecutor(
}
}
companion object {
const val ONCE_CHUNK_SIZE = 320 * 1024
const val MAX_CHUNK_SIZE = 192
}
}
@ -423,6 +445,11 @@ enum class OneDriveTransferStatus {
*/
GETTING_FILE_INFO,
/**
* 正在下载文件.
*/
DOWNLOADING_FILE,
/**
* 正在创建上传会话.
*/

View File

@ -26,9 +26,9 @@ import java.net.MalformedURLException
import java.net.URL
import java.util.concurrent.TimeUnit
class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : AbilityExtension {
class OneDriveTransferExtension(val bot: BaseAbilityBot, private val dataFolder: File) : AbilityExtension {
private val logger = KotlinLogging.logger { }
private val logger = KotlinLogging.logger { }
private val config: ExtensionConfig
private val onedriveService: OneDriveTransferService
@ -44,9 +44,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
config = loadConfiguration()
val db = Database.connect("jdbc:sqlite:${File(dataFolder, "./data.db").canonicalPath}", "org.sqlite.JDBC")
onedriveService = OneDriveTransferService(bot, config, db)
OneDriveTransferCenter.initial(config)
}
fun loadConfiguration(): ExtensionConfig {
private fun loadConfiguration(): ExtensionConfig {
val configFile = File(this.dataFolder, "config.json")
val objectMapper = ObjectMapper().registerKotlinModule()
if (!configFile.exists()) {
@ -104,11 +106,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
-------------------------------------------
登录成功后会显示无法访问是正常情况请将地址栏的链接发回给机器人
""".trimIndent(), ctx.chatId())
""".trimIndent(), ctx.chatId()
)
}
.enableStats()
.reply(Reply.of(
{bot, upd ->
.reply(
Reply.of(
{ bot, upd ->
try {
val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim()))
actionCache.invalidate(upd.message.chatId)
@ -117,7 +121,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
登录成功
Microsoft 账号${account.userName}
请使用 /select_drive 选择 OneDrive 驱动器以及设置上传路径
""".trimIndent(), upd.message.chatId)
""".trimIndent(), upd.message.chatId
)
} catch (e: MsalInteractionRequiredException) {
if (e.errorCode() == "AADSTS54005") {
bot.silent.send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId)
@ -153,7 +158,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
"""
当前账户已登录 OneDrive.
Microsoft 账号 ${account.userName}
""".trimIndent(), it.chatId())
""".trimIndent(), it.chatId()
)
}
.build()
@ -180,7 +186,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
val msgContent = """
当前账户已登录 OneDrive.
Microsoft 账号 ${onedriveService.accountManager.getAccountByTgUserId(it.chatId())?.userName}
当前正在使用的驱动器为[${currentDrive?.driveType ?: "None" }] ${currentDrive?.name ?: ""}
当前正在使用的驱动器为[${currentDrive?.driveType ?: "None"}] ${currentDrive?.name ?: ""}
""".trimIndent()
SendMessage.builder()
@ -300,12 +306,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已取消设置.")
.replyMarkup(ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build())
.build())
.chatId(upd.message.chatId.toString())
.text("已取消设置.")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
return@reply
}
val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId)
@ -313,12 +322,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("当前账户没有登录 OneDrive.")
.replyMarkup(ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build())
.build())
.chatId(upd.message.chatId.toString())
.text("当前账户没有登录 OneDrive.")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
return@reply
}
onedriveService.accountManager.doSomething {
@ -327,17 +339,21 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
actionCache.invalidate(upd.message.chatId)
bot.silent.execute(
DeleteMessage.builder()
.chatId(upd.message.chatId.toString())
.messageId(upd.message.messageId)
.build())
.chatId(upd.message.chatId.toString())
.messageId(upd.message.messageId)
.build()
)
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已设置上传路径为:$path")
.replyMarkup(ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build())
.build())
.chatId(upd.message.chatId.toString())
.text("已设置上传路径为:$path")
.replyMarkup(
ReplyKeyboardRemove.builder()
.removeKeyboard(true)
.build()
)
.build()
)
}, { upd ->
upd.hasMessage() && upd.message.hasText() && actionCache.getIfPresent(upd.message.chatId) == "set_path"
})

View File

@ -1,12 +1,12 @@
package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import com.microsoft.aad.msal4j.*
import java.sql.Connection
import java.net.URL
import java.net.URI
import java.net.URL
import java.sql.Connection
class OneDriveTransferSettingManager(private val authClient: ConfidentialClientApplication, private val db: Database) {
@ -17,7 +17,8 @@ class OneDriveTransferSettingManager(private val authClient: ConfidentialClientA
fun getTransferSetting(userId: Long): OneDriveTransferSetting? {
return transaction(db) {
return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }
.firstOrNull()
}
}
@ -49,7 +50,8 @@ class OneDriveTransferSettingManager(private val authClient: ConfidentialClientA
)
val result = future.get()
return transaction(db) {
val account = OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
val account =
OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
account?.apply {
accountId = result.account().homeAccountId()
userName = result.account().username()

View File

@ -25,7 +25,7 @@ class OneDriveTransferService(
private val config: ExtensionConfig,
private val db: Database
) {
private val logger = KotlinLogging.logger { }
private val logger = KotlinLogging.logger { }
val accountManager: OneDriveTransferSettingManager
private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder(
@ -40,7 +40,7 @@ class OneDriveTransferService(
init {
transaction(db) {
SchemaUtils.create(OneDriveTransferSettings, TokenCaches)
SchemaUtils.create(OneDriveTransferSettings, TokenCaches, RegisteredRemoteTransferWorkers)
}
accountManager = OneDriveTransferSettingManager(authClient, db)
}
@ -138,7 +138,10 @@ class OneDriveTransferService(
companion object {
private val THREAD_CURRENT_GRAPH_CLIENT = ThreadLocal<ClientCache>()
fun createGraphClient(authClient: ConfidentialClientApplication, iAccount: IAccount): GraphServiceClient<Request> {
fun createGraphClient(
authClient: ConfidentialClientApplication,
iAccount: IAccount
): GraphServiceClient<Request> {
return GraphServiceClient.builder()
.httpClient(HttpClients.createDefault(MsalAuthorizationProvider(authClient, iAccount)))
.buildClient()
@ -151,7 +154,10 @@ private data class ClientCache(
val client: GraphServiceClient<Request>,
)
class MsalAuthorizationProvider(private val authClientApplication: ConfidentialClientApplication, private val iAccount: IAccount) : IAuthenticationProvider {
class MsalAuthorizationProvider(
private val authClientApplication: ConfidentialClientApplication,
private val iAccount: IAccount
) : IAuthenticationProvider {
override fun getAuthorizationTokenAsync(requestUrl: URL): CompletableFuture<String> {
return authClientApplication.acquireTokenSilently(
SilentParameters.builder(OneDriveTransferSettingManager.OAUTH2_SCOPE, iAccount).build()

View File

@ -0,0 +1,336 @@
package net.lamgc.scext.onedrive_transfer.remote
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.auth.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.collections.*
import mu.KotlinLogging
import net.lamgc.scext.onedrive_transfer.*
import net.lamgc.scext.onedrive_transfer.remote.request.*
import okhttp3.internal.toImmutableMap
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
class RemoteOneDriveTransferExecutorController(
private val config: CentralSetting,
private val callback: OneDriveTransferCallback
) : OneDriveTransferTaskExecutor {
private val logger = KotlinLogging.logger { }
private val pendingTasks = ConcurrentLinkedDeque<OneDriveTransferTask>()
private val workingTasks = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
private val preAllocatedTasks = ConcurrentHashMap<OneDriveTransferTask, PreAllocateTaskEntry>()
private val cancelledTasks = ConcurrentSet<OneDriveTransferTask>()
init {
embeddedServer(Netty, port = config.port) {
install(Authentication) {
bearer(AUTHENTICATE_CONFIG_ID) {
authenticate {
val worker =
RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.token eq it.token }
.limit(1)
.firstOrNull()
if (worker != null) {
worker.lastContactAt = LocalDateTime.now()
}
return@authenticate worker
}
}
}
registerRemoteWorkerRegisterRouting()
registerTaskAllocationRouting()
registerTaskProgressUpdateRouting()
registerFileUploadSessionRequestRouting()
}.start(wait = false)
}
override fun submitTransferTask(task: OneDriveTransferTask): Boolean {
pendingTasks.addLast(task)
try {
callback.onTransferTaskCreated(task)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferTaskCreated." }
}
return true
}
override fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
if (pendingTasks.remove(task)) {
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return true
}
if (workingTasks.values.any { it.currentTask == task }) {
cancelledTasks.add(task)
return true
} else if (preAllocatedTasks.contains(task)) {
preAllocatedTasks.remove(task)
try {
callback.onTransferCancelled(task, null)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return true
}
return false
}
override fun getQueuedTransferTasks(): List<OneDriveTransferTask> {
return this.pendingTasks.toList()
}
override fun getWorkerCount(): Int {
// 在近 1 小时内有联系的 Worker 数量
val minusHours = LocalDateTime.now().minusHours(1)
return RegisteredRemoteTransferWorker
.count(RegisteredRemoteTransferWorkers.lastContactAt greaterEq minusHours).toInt()
}
override fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress> {
return workingTasks.toImmutableMap()
}
private fun Application.registerRemoteWorkerRegisterRouting() {
routing {
post<RegisterWorkerRequest> {
if (config.registerToken != it.token) {
call.respond(HttpStatusCode.Forbidden, "Invalid token.")
return@post
}
val existsWorker =
RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.workerName eq it.workerName }
.firstOrNull()
val workerToken = randomString(128)
if (existsWorker != null) {
val now = LocalDateTime.now()
// 如果 existsWorker 的 lastContactAt 距离现在不超过 12 小时, 则拒绝重新注册
if (existsWorker.lastContactAt > now.minusHours(12)) {
call.respond(HttpStatusCode.Forbidden, "Worker already registered.")
return@post
} else {
existsWorker.token = workerToken
existsWorker.registeredAt = now
existsWorker.lastContactAt = now
}
} else {
RegisteredRemoteTransferWorker.new {
this.workerName = it.workerName
this.registeredAt = LocalDateTime.now()
this.lastContactAt = LocalDateTime.now()
this.token = workerToken
}
}
call.respond(HttpStatusCode.OK, RegisterWorkerResponse(true, "Worker registered.", workerToken))
}
}
}
private fun Application.registerTaskAllocationRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<TaskAllocationRequest> { request ->
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
// 寻找 pendingTasks 中 fileSize 不超过 worker 的限制的任务
val task = pendingTasks.find { task ->
task.document.fileSize <= request.acceptMaxSize && task !in cancelledTasks && preAllocatedTasks.computeIfAbsent(
task
) {
PreAllocateTaskEntry(task, worker)
}.worker == worker
} ?: return@post call.respond(HttpStatusCode.NoContent)
call.respond(
HttpStatusCode.OK, TaskAllocationResponse(
true, TaskAllocationInfo(
taskId = task.id,
ownerBotToken = "",
fileId = task.document.fileId,
fileSize = task.document.fileSize
)
)
)
}
post<TaskAcceptRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val preAllocatedTaskEntry = preAllocatedTasks.entries.find { entry ->
entry.key.id == it.taskId && entry.value.worker.id == worker.id
} ?: return@post call.respond(HttpStatusCode.OK, TaskAcceptResponse(false))
pendingTasks.remove(preAllocatedTaskEntry.key)
val existsTask = workingTasks[worker.id.value]
if (existsTask != null) {
if (cancelledTasks.remove(existsTask.currentTask)) {
try {
callback.onTransferCancelled(existsTask.currentTask, existsTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
} else {
pendingTasks.addFirst(existsTask.currentTask)
}
}
val workerProgress = OneDriveTransferWorkerProgress(preAllocatedTaskEntry.key)
workingTasks[worker.id.value] = workerProgress
preAllocatedTasks.remove(preAllocatedTaskEntry.key)
try {
callback.onTransferTaskStart(workerProgress)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferTaskStart." }
}
call.respond(HttpStatusCode.OK, TaskAcceptResponse(true))
}
}
}
}
private fun Application.registerTaskProgressUpdateRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<TaskProgressUpdateRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val processingTask = workingTasks[worker.id.value]
if (processingTask == null || processingTask.currentTask.id != it.taskId) {
return@post call.respond(HttpStatusCode.BadRequest)
}
processingTask.progress.set(it.progress)
processingTask.status = when (it.status) {
TaskStatus.DOWNLOADING_FILE -> OneDriveTransferStatus.DOWNLOADING_FILE
TaskStatus.DOWNLOADED_FILE -> OneDriveTransferStatus.CREATING_UPLOAD_SESSION
TaskStatus.UPLOADING_FILE -> OneDriveTransferStatus.UPLOADING
TaskStatus.COMPLETED -> OneDriveTransferStatus.SUCCESS
}
if (cancelledTasks.contains(processingTask.currentTask)) {
cancelledTasks.remove(processingTask.currentTask)
workingTasks.remove(worker.id.value)
try {
callback.onTransferCancelled(processingTask.currentTask, processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferCancelled." }
}
return@post call.respond(
HttpStatusCode.OK,
TaskProgressUpdateResponse(TaskProgressUpdateStatus.CANCELLED)
)
}
try {
callback.onUploadProgress(processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onUploadProgress." }
}
if (processingTask.status == OneDriveTransferStatus.SUCCESS) {
try {
callback.onTransferSuccess(processingTask.currentTask, processingTask)
} catch (e: Exception) {
logger.error(e) { "Error occurred while calling onTransferSuccess." }
}
}
return@post call.respond(
HttpStatusCode.OK,
TaskProgressUpdateResponse(TaskProgressUpdateStatus.CONTINUE)
)
}
}
}
}
private fun Application.registerFileUploadSessionRequestRouting() {
routing {
authenticate(AUTHENTICATE_CONFIG_ID) {
post<FileUploadSessionRequest> {
val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
?: return@post call.respond(HttpStatusCode.Unauthorized)
val processingTask = workingTasks[worker.id.value]
if (processingTask == null || processingTask.currentTask.id != it.taskId) {
return@post call.respond(HttpStatusCode.BadRequest)
}
if (processingTask.status != OneDriveTransferStatus.CREATING_UPLOAD_SESSION) {
return@post call.respond(HttpStatusCode.BadRequest)
}
val task = processingTask.currentTask
val file = task.document
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
val filePath = OneDriveUtils.checkAndGetPath(
graphClient,
task.onedriveId,
task.storagePath,
task.document.fileName
)
logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties().apply {
fileSize = file.fileSize
name = task.document.fileName
})
.build()
)
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val uploadUrl =
uploadSession.uploadUrl ?: return@post call.respond(HttpStatusCode.InternalServerError)
return@post call.respond(HttpStatusCode.OK, FileUploadSessionResponse(uploadUrl))
}
}
}
}
data class PreAllocateTaskEntry(
val task: OneDriveTransferTask,
val worker: RegisteredRemoteTransferWorker,
val allocatedAt: LocalDateTime = LocalDateTime.now()
)
companion object {
private const val AUTHENTICATE_CONFIG_ID = "worker-token"
}
}

View File

@ -0,0 +1,18 @@
plugins {
kotlin("jvm")
}
group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
dependencies {
testImplementation(kotlin("test"))
}
kotlin {
}
tasks.test {
useJUnitPlatform()
}

View File

@ -0,0 +1,2 @@
package net.lamgc.scext.onedrive_transfer.remote.request

View File

@ -0,0 +1,11 @@
package net.lamgc.scext.onedrive_transfer.remote.request
import java.util.*
data class FileUploadSessionRequest(
val taskId: UUID
)
data class FileUploadSessionResponse(
val uploadUrl: String
)

View File

@ -0,0 +1,12 @@
package net.lamgc.scext.onedrive_transfer.remote.request
data class RegisterWorkerRequest(
val workerName: String,
val token: String,
)
data class RegisterWorkerResponse(
val success: Boolean,
val message: String,
val workerToken: String
)

View File

@ -0,0 +1,21 @@
package net.lamgc.scext.onedrive_transfer.remote.request
import java.util.*
/**
* 从节点接受任务请求.
*/
data class TaskAcceptRequest(
val taskId: UUID
)
/**
* 如果任务被该节点预分配, 则返回确认响应.
*
* 如果任务并非为该从节点预分配, 或者任务预分配过期, 被其他节点接受, 则返回拒绝响应.
*
* @property confirmed 是否确认接受任务.
*/
data class TaskAcceptResponse(
val confirmed: Boolean = true
)

View File

@ -0,0 +1,42 @@
package net.lamgc.scext.onedrive_transfer.remote.request
import java.util.*
/**
* 从节点发送任务分配请求.
*
* 主节点将根据请求的 acceptMaxSize 进行预分配任务.
*
* @property acceptMaxSize 从节点可接受的最大文件大小.
*/
data class TaskAllocationRequest(
val acceptMaxSize: Long
)
/**
* 主节点返回的任务分配结果.
*
* @property allocated 是否成功分配了任务.
* @property info 预分配的任务信息.
*/
data class TaskAllocationResponse(
val allocated: Boolean,
val info: TaskAllocationInfo? = null
)
/**
* 任务分配信息.
*
* 主节点将为从节点保留任务一段时间, 直至从节点接受任务, 或者预分配任务超时.
*
* @property taskId 任务 ID.
* @property ownerBotToken 任务所属的 Bot Token.
* @property fileId 文件 ID.
* @property fileSize 文件大小.
*/
data class TaskAllocationInfo(
val taskId: UUID,
val ownerBotToken: String,
val fileId: String,
val fileSize: Long,
)

View File

@ -0,0 +1,38 @@
package net.lamgc.scext.onedrive_transfer.remote.request
import java.util.*
data class TaskProgressUpdateRequest(
val taskId: UUID,
val status: TaskStatus,
val progress: Double
)
data class TaskProgressUpdateResponse(
val status: TaskProgressUpdateStatus
)
enum class TaskStatus {
DOWNLOADING_FILE,
DOWNLOADED_FILE,
UPLOADING_FILE,
COMPLETED
}
enum class TaskProgressUpdateStatus {
/**
* 任务可以继续进行.
*/
CONTINUE,
/**
* 任务已取消, Worker 应该放弃该任务.
*/
CANCELLED,
}

View File

@ -0,0 +1,36 @@
plugins {
kotlin("jvm") version "2.1.0"
application
}
group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
repositories {
mavenCentral()
}
application {
mainClass = ""
}
dependencies {
implementation(project(":onedrive-transfer-remote-common"))
implementation("org.slf4j:slf4j-api:2.0.11")
implementation("io.github.microutils:kotlin-logging:3.0.5")
implementation("ch.qos.logback:logback-classic:1.5.15")
implementation("com.github.ajalt.clikt:clikt:5.0.2")
implementation("com.github.ajalt.clikt:clikt-markdown:5.0.2")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
implementation("org.jetbrains.kotlin:kotlin-reflect:2.1.0")
implementation("com.google.code.gson:gson:2.10.1")
testImplementation(kotlin("test"))
}
tasks.test {
useJUnitPlatform()
}

View File

@ -0,0 +1,35 @@
package net.lamgc.scext.onedrive_transfer.remote.worker
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.main
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.ulong
import java.io.File
import kotlin.math.min
class Main : CliktCommand("overdrive-transfer-worker") {
private val host: String by option()
.required()
.help("Controller URL.")
private val tempDir: String by option()
.help("Temp directory.")
.default("./work-temp/")
private val maxAllowedFileSize by option()
.ulong()
.help("Max allowed file size.")
.defaultLazy {
// 获取所在硬盘的大小,最大不超过 4 G
val disk = File(tempDir).toPath().root.toFile()
return@defaultLazy min(disk.usableSpace, 536870912).toULong()
}
override fun run() {
}
}
fun main(args: Array<String>) = Main().main(args)

View File

@ -2,4 +2,6 @@ plugins {
id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0"
}
rootProject.name = "onedrive-transfer"
include("onedrive-transfer-bot-extension")
include("onedrive-transfer-remote-common")
include("onedrive-transfer-remote-worker")

View File

@ -1,5 +0,0 @@
package net.lamgc.scext.onedrive_transfer
fun main() {
}