Compare commits
	
		
			1 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						
						
							
						
						c0126c900c
	
				 | 
					
					
						
							
								
								
									
										8
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,8 @@
 | 
			
		||||
# OneDrive Transfer Bot
 | 
			
		||||
 | 
			
		||||
这是一个将 Telegram 文件中转至 OneDrive 的 Scalabot 机器人扩展。
 | 
			
		||||
 | 
			
		||||
## Install
 | 
			
		||||
 | 
			
		||||
### Telegram Client API Key
 | 
			
		||||
 | 
			
		||||
@ -1,94 +1,21 @@
 | 
			
		||||
import java.net.URI
 | 
			
		||||
 | 
			
		||||
plugins {
 | 
			
		||||
    kotlin("jvm") version "2.1.0"
 | 
			
		||||
    `maven-publish`
 | 
			
		||||
    kotlin("jvm") version "2.1.0" apply false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
group = "net.lamgc.scext"
 | 
			
		||||
version = "0.1.0-SNAPSHOT"
 | 
			
		||||
 | 
			
		||||
repositories {
 | 
			
		||||
    mavenLocal()
 | 
			
		||||
    mavenCentral()
 | 
			
		||||
allprojects {
 | 
			
		||||
 | 
			
		||||
    maven {
 | 
			
		||||
        url = URI.create("https://git.lamgc.me/api/packages/LamGC/maven")
 | 
			
		||||
        name = "lam-gitea"
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dependencies {
 | 
			
		||||
    compileOnly("org.slf4j:slf4j-api:2.0.10")
 | 
			
		||||
    compileOnly("io.github.microutils:kotlin-logging:3.0.5")
 | 
			
		||||
    compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
 | 
			
		||||
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
 | 
			
		||||
 | 
			
		||||
    val exposedVersion = "0.45.0"
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion")
 | 
			
		||||
 | 
			
		||||
    implementation("org.xerial:sqlite-jdbc:3.44.1.0")
 | 
			
		||||
    implementation("mysql:mysql-connector-java:8.0.33")
 | 
			
		||||
    implementation("com.zaxxer:HikariCP:5.1.0")
 | 
			
		||||
 | 
			
		||||
    implementation("com.microsoft.graph:microsoft-graph:5.77.0")
 | 
			
		||||
    implementation("com.azure:azure-identity:1.14.2")
 | 
			
		||||
 | 
			
		||||
    testImplementation("org.jetbrains.kotlin:kotlin-test")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
tasks.test {
 | 
			
		||||
    useJUnitPlatform()
 | 
			
		||||
}
 | 
			
		||||
kotlin {
 | 
			
		||||
    jvmToolchain(17)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
val fatJar = task("fatJar", type = Jar::class) {
 | 
			
		||||
    archiveBaseName = "${project.name}-agent"
 | 
			
		||||
    manifest {
 | 
			
		||||
        attributes["Implementation-Title"] = "OneDrive-Transfer"
 | 
			
		||||
        attributes["Implementation-Version"] = version
 | 
			
		||||
        attributes["Main-Class"] = "net.lamgc.scext.onedrive_transfer.AgentMainKt"
 | 
			
		||||
    }
 | 
			
		||||
    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
 | 
			
		||||
    from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
 | 
			
		||||
    with(tasks.jar.get() as CopySpec)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
tasks {
 | 
			
		||||
    "jar" {
 | 
			
		||||
        dependsOn(fatJar)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
publishing {
 | 
			
		||||
    repositories {
 | 
			
		||||
        maven("https://git.lamgc.me/api/packages/LamGC/maven") {
 | 
			
		||||
            credentials {
 | 
			
		||||
                username = project.properties["repo.credentials.self-git.username"].toString()
 | 
			
		||||
                password = project.properties["repo.credentials.self-git.password"].toString()
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        mavenLocal()
 | 
			
		||||
    }
 | 
			
		||||
        mavenCentral()
 | 
			
		||||
 | 
			
		||||
    publications {
 | 
			
		||||
        create<MavenPublication>("maven") {
 | 
			
		||||
            from(components["java"])
 | 
			
		||||
 | 
			
		||||
            pom {
 | 
			
		||||
                name.set("ScalaExt-OneDriveTransfer")
 | 
			
		||||
                description.set("将 Telegram 中的文件转存至 OneDrive.")
 | 
			
		||||
            }
 | 
			
		||||
        maven {
 | 
			
		||||
            url = URI.create("https://git.lamgc.me/api/packages/LamGC/maven")
 | 
			
		||||
            name = "lam-gitea"
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										22
									
								
								gradle/libs.versions.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								gradle/libs.versions.toml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,22 @@
 | 
			
		||||
[versions]
 | 
			
		||||
kotlin-version = "2.1.0"
 | 
			
		||||
ktor-version = "3.0.2"
 | 
			
		||||
logback-version = "1.4.14"
 | 
			
		||||
 | 
			
		||||
[libraries]
 | 
			
		||||
ktor-server-content-negotiation = { module = "io.ktor:ktor-server-content-negotiation-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-core = { module = "io.ktor:ktor-server-core-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-serialization-gson = { module = "io.ktor:ktor-serialization-gson-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-swagger = { module = "io.ktor:ktor-server-swagger-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-openapi = { module = "io.ktor:ktor-server-openapi", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-hsts = { module = "io.ktor:ktor-server-hsts-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-auth = { module = "io.ktor:ktor-server-auth-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-netty = { module = "io.ktor:ktor-server-netty-jvm", version.ref = "ktor-version" }
 | 
			
		||||
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback-version" }
 | 
			
		||||
ktor-server-config-yaml = { module = "io.ktor:ktor-server-config-yaml-jvm", version.ref = "ktor-version" }
 | 
			
		||||
ktor-server-test-host = { module = "io.ktor:ktor-server-test-host-jvm", version.ref = "ktor-version" }
 | 
			
		||||
kotlin-test-junit = { module = "org.jetbrains.kotlin:kotlin-test-junit", version.ref = "kotlin-version" }
 | 
			
		||||
 | 
			
		||||
[plugins]
 | 
			
		||||
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin-version" }
 | 
			
		||||
ktor = { id = "io.ktor.plugin", version.ref = "ktor-version" }
 | 
			
		||||
							
								
								
									
										80
									
								
								onedrive-transfer-bot-extension/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										80
									
								
								onedrive-transfer-bot-extension/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,80 @@
 | 
			
		||||
plugins {
 | 
			
		||||
    kotlin("jvm") version "2.1.0"
 | 
			
		||||
    `maven-publish`
 | 
			
		||||
    alias(libs.plugins.ktor)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
group = "net.lamgc.scext"
 | 
			
		||||
version = "0.1.0-SNAPSHOT"
 | 
			
		||||
 | 
			
		||||
dependencies {
 | 
			
		||||
    compileOnly("org.slf4j:slf4j-api:2.0.10")
 | 
			
		||||
    compileOnly("io.github.microutils:kotlin-logging:3.0.5")
 | 
			
		||||
    compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
 | 
			
		||||
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
 | 
			
		||||
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
 | 
			
		||||
 | 
			
		||||
    val exposedVersion = "0.45.0"
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-kotlin-datetime:$exposedVersion")
 | 
			
		||||
    implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
 | 
			
		||||
 | 
			
		||||
    implementation("org.xerial:sqlite-jdbc:3.44.1.0")
 | 
			
		||||
    implementation("mysql:mysql-connector-java:8.0.33")
 | 
			
		||||
    implementation("com.zaxxer:HikariCP:5.1.0")
 | 
			
		||||
 | 
			
		||||
    implementation("com.microsoft.graph:microsoft-graph:5.77.0")
 | 
			
		||||
    implementation("com.azure:azure-identity:1.14.2")
 | 
			
		||||
 | 
			
		||||
    implementation(libs.ktor.server.content.negotiation)
 | 
			
		||||
    implementation(libs.ktor.server.core)
 | 
			
		||||
    implementation(libs.ktor.serialization.gson)
 | 
			
		||||
    implementation(libs.ktor.server.swagger)
 | 
			
		||||
    implementation(libs.ktor.server.openapi)
 | 
			
		||||
    implementation(libs.ktor.server.hsts)
 | 
			
		||||
    implementation(libs.ktor.server.auth)
 | 
			
		||||
    implementation(libs.ktor.server.netty)
 | 
			
		||||
    implementation(libs.ktor.server.config.yaml)
 | 
			
		||||
 | 
			
		||||
    testImplementation("org.jetbrains.kotlin:kotlin-test")
 | 
			
		||||
 | 
			
		||||
    implementation(project(":onedrive-transfer-remote-common"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
tasks.test {
 | 
			
		||||
    useJUnitPlatform()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
kotlin {
 | 
			
		||||
    jvmToolchain(17)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
publishing {
 | 
			
		||||
    repositories {
 | 
			
		||||
        maven("https://git.lamgc.me/api/packages/LamGC/maven") {
 | 
			
		||||
            credentials {
 | 
			
		||||
                username = project.properties["repo.credentials.self-git.username"].toString()
 | 
			
		||||
                password = project.properties["repo.credentials.self-git.password"].toString()
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        mavenLocal()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    publications {
 | 
			
		||||
        create<MavenPublication>("maven") {
 | 
			
		||||
            from(components["java"])
 | 
			
		||||
 | 
			
		||||
            pom {
 | 
			
		||||
                name.set("ScalaExt-OneDriveTransfer")
 | 
			
		||||
                description.set("将 Telegram 中的文件转存至 OneDrive.")
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -2,14 +2,33 @@ package net.lamgc.scext.onedrive_transfer
 | 
			
		||||
 | 
			
		||||
import com.microsoft.aad.msal4j.ITokenCacheAccessAspect
 | 
			
		||||
import com.microsoft.aad.msal4j.ITokenCacheAccessContext
 | 
			
		||||
import mu.KotlinLogging
 | 
			
		||||
import org.jetbrains.exposed.dao.IntEntity
 | 
			
		||||
import org.jetbrains.exposed.dao.IntEntityClass
 | 
			
		||||
import org.jetbrains.exposed.dao.LongEntity
 | 
			
		||||
import org.jetbrains.exposed.dao.LongEntityClass
 | 
			
		||||
import org.jetbrains.exposed.dao.id.EntityID
 | 
			
		||||
import org.jetbrains.exposed.dao.id.IntIdTable
 | 
			
		||||
import org.jetbrains.exposed.dao.id.LongIdTable
 | 
			
		||||
import org.jetbrains.exposed.sql.Database
 | 
			
		||||
import org.jetbrains.exposed.sql.javatime.datetime
 | 
			
		||||
import org.jetbrains.exposed.sql.transactions.transaction
 | 
			
		||||
 | 
			
		||||
object RegisteredRemoteTransferWorkers : IntIdTable() {
 | 
			
		||||
    val workerName = varchar("worker_name", 128).uniqueIndex()
 | 
			
		||||
    val token = varchar("token", 256).uniqueIndex()
 | 
			
		||||
    val registeredAt = datetime("registered_at")
 | 
			
		||||
    val lastContactAt = datetime("last_contact_at")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class RegisteredRemoteTransferWorker(id: EntityID<Int>) : IntEntity(id) {
 | 
			
		||||
    var workerName by RegisteredRemoteTransferWorkers.workerName
 | 
			
		||||
    var token by RegisteredRemoteTransferWorkers.token
 | 
			
		||||
    var registeredAt by RegisteredRemoteTransferWorkers.registeredAt
 | 
			
		||||
    var lastContactAt by RegisteredRemoteTransferWorkers.lastContactAt
 | 
			
		||||
 | 
			
		||||
    companion object : IntEntityClass<RegisteredRemoteTransferWorker>(RegisteredRemoteTransferWorkers)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
object OneDriveTransferSettings : LongIdTable() {
 | 
			
		||||
    val telegramUserId = long("tg_user_id").uniqueIndex()
 | 
			
		||||
    val accountId = varchar("account_id", 128)
 | 
			
		||||
@ -9,8 +9,15 @@ data class ExtensionConfig(
 | 
			
		||||
    val centralSetting: CentralSetting = CentralSetting()
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @property enable 是否启用主从模式.
 | 
			
		||||
 * @property port Web 服务器端口.
 | 
			
		||||
 * @property baseUrl Web 服务器的对外 URL.
 | 
			
		||||
 * @property registerToken 注册 Worker 所使用的 Token.
 | 
			
		||||
 */
 | 
			
		||||
data class CentralSetting(
 | 
			
		||||
    val enable: Boolean = false,
 | 
			
		||||
    val port: Int = 24860,
 | 
			
		||||
    val secret: String = "",
 | 
			
		||||
    val baseUrl: String = "http://localhost:${port}",
 | 
			
		||||
    val registerToken: String = ""
 | 
			
		||||
)
 | 
			
		||||
@ -10,6 +10,7 @@ import com.microsoft.graph.requests.GraphServiceClient
 | 
			
		||||
import com.microsoft.graph.tasks.IProgressCallback
 | 
			
		||||
import com.microsoft.graph.tasks.LargeFileUploadTask
 | 
			
		||||
import mu.KotlinLogging
 | 
			
		||||
import net.lamgc.scext.onedrive_transfer.remote.RemoteOneDriveTransferExecutorController
 | 
			
		||||
import okhttp3.Request
 | 
			
		||||
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
 | 
			
		||||
import org.telegram.telegrambots.meta.api.methods.GetFile
 | 
			
		||||
@ -20,11 +21,24 @@ import java.io.InputStream
 | 
			
		||||
import java.net.SocketTimeoutException
 | 
			
		||||
import java.util.*
 | 
			
		||||
import java.util.concurrent.*
 | 
			
		||||
import java.util.concurrent.atomic.AtomicReference
 | 
			
		||||
 | 
			
		||||
object OneDriveTransferCenter {
 | 
			
		||||
 | 
			
		||||
    val executor: OneDriveTransferTaskExecutor =
 | 
			
		||||
        LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue())
 | 
			
		||||
    private val executorRef = AtomicReference<OneDriveTransferTaskExecutor>()
 | 
			
		||||
 | 
			
		||||
    private val executor: OneDriveTransferTaskExecutor
 | 
			
		||||
        get() = executorRef.get() ?: throw IllegalStateException("OneDriveTransferCenter 未初始化.")
 | 
			
		||||
 | 
			
		||||
    fun initial(config: ExtensionConfig) {
 | 
			
		||||
        val taskExecutor = if (config.centralSetting.enable) {
 | 
			
		||||
            RemoteOneDriveTransferExecutorController(config.centralSetting, DefaultOneDriveTransferCallback)
 | 
			
		||||
        } else {
 | 
			
		||||
            LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        executorRef.set(taskExecutor)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fun submitUploadTask(task: OneDriveTransferTask): Boolean = executor.submitTransferTask(task)
 | 
			
		||||
 | 
			
		||||
@ -74,7 +88,7 @@ interface OneDriveTransferTaskExecutor {
 | 
			
		||||
class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
    private val threadNum: Int,
 | 
			
		||||
    private val callback: OneDriveTransferCallback,
 | 
			
		||||
    private val taskQueue: BlockingQueue<OneDriveTransferTask>,
 | 
			
		||||
    private val taskQueue: BlockingQueue<OneDriveTransferTask> = LinkedBlockingQueue(),
 | 
			
		||||
    private val chunkSize: Int = 26
 | 
			
		||||
) : ThreadPoolExecutor(
 | 
			
		||||
    threadNum, threadNum, 0, TimeUnit.SECONDS,
 | 
			
		||||
@ -228,7 +242,8 @@ class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
            throw IllegalStateException("OneDrive 剩余空间不足.")
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        val filePath = checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
 | 
			
		||||
        val filePath =
 | 
			
		||||
            OneDriveUtils.checkAndGetPath(graphClient, task.onedriveId, task.storagePath, task.document.fileName)
 | 
			
		||||
        logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
 | 
			
		||||
 | 
			
		||||
        if (file.fileSize < 4 * 1024 * 1024) {
 | 
			
		||||
@ -246,6 +261,7 @@ class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
                    DriveItemCreateUploadSessionParameterSet.newBuilder()
 | 
			
		||||
                        .withItem(DriveItemUploadableProperties().apply {
 | 
			
		||||
                            fileSize = file.fileSize
 | 
			
		||||
                            name = task.document.fileName
 | 
			
		||||
                        })
 | 
			
		||||
                        .build()
 | 
			
		||||
                )
 | 
			
		||||
@ -261,7 +277,6 @@ class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            val fileStream = getFileStream(task.bot, file.filePath)
 | 
			
		||||
 | 
			
		||||
            val largeFileUploadTask = LargeFileUploadTask(
 | 
			
		||||
                uploadSession,
 | 
			
		||||
                graphClient,
 | 
			
		||||
@ -291,7 +306,18 @@ class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
        return bot.telegramClient.downloadFileAsStream(filePath)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun checkAndGetPath(
 | 
			
		||||
    companion object {
 | 
			
		||||
        const val ONCE_CHUNK_SIZE = 320 * 1024
 | 
			
		||||
        const val MAX_CHUNK_SIZE = 192
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
object OneDriveUtils {
 | 
			
		||||
 | 
			
		||||
    private val logger = KotlinLogging.logger { }
 | 
			
		||||
 | 
			
		||||
    fun checkAndGetPath(
 | 
			
		||||
        graphClient: GraphServiceClient<Request>,
 | 
			
		||||
        driveId: String,
 | 
			
		||||
        storagePath: String,
 | 
			
		||||
@ -405,10 +431,6 @@ class LocalOneDriveTransferTaskExecutor(
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    companion object {
 | 
			
		||||
        const val ONCE_CHUNK_SIZE = 320 * 1024
 | 
			
		||||
        const val MAX_CHUNK_SIZE = 192
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -423,6 +445,11 @@ enum class OneDriveTransferStatus {
 | 
			
		||||
     */
 | 
			
		||||
    GETTING_FILE_INFO,
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 正在下载文件.
 | 
			
		||||
     */
 | 
			
		||||
    DOWNLOADING_FILE,
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 正在创建上传会话.
 | 
			
		||||
     */
 | 
			
		||||
@ -26,9 +26,9 @@ import java.net.MalformedURLException
 | 
			
		||||
import java.net.URL
 | 
			
		||||
import java.util.concurrent.TimeUnit
 | 
			
		||||
 | 
			
		||||
class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) : AbilityExtension {
 | 
			
		||||
class OneDriveTransferExtension(val bot: BaseAbilityBot, private val dataFolder: File) : AbilityExtension {
 | 
			
		||||
 | 
			
		||||
    private val logger = KotlinLogging.logger {  }
 | 
			
		||||
    private val logger = KotlinLogging.logger { }
 | 
			
		||||
 | 
			
		||||
    private val config: ExtensionConfig
 | 
			
		||||
    private val onedriveService: OneDriveTransferService
 | 
			
		||||
@ -44,9 +44,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
        config = loadConfiguration()
 | 
			
		||||
        val db = Database.connect("jdbc:sqlite:${File(dataFolder, "./data.db").canonicalPath}", "org.sqlite.JDBC")
 | 
			
		||||
        onedriveService = OneDriveTransferService(bot, config, db)
 | 
			
		||||
 | 
			
		||||
        OneDriveTransferCenter.initial(config)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fun loadConfiguration(): ExtensionConfig {
 | 
			
		||||
    private fun loadConfiguration(): ExtensionConfig {
 | 
			
		||||
        val configFile = File(this.dataFolder, "config.json")
 | 
			
		||||
        val objectMapper = ObjectMapper().registerKotlinModule()
 | 
			
		||||
        if (!configFile.exists()) {
 | 
			
		||||
@ -104,11 +106,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
                
 | 
			
		||||
                -------------------------------------------
 | 
			
		||||
                登录成功后会显示无法访问,是正常情况,请将地址栏的链接发回给机器人。
 | 
			
		||||
            """.trimIndent(), ctx.chatId())
 | 
			
		||||
            """.trimIndent(), ctx.chatId()
 | 
			
		||||
            )
 | 
			
		||||
        }
 | 
			
		||||
        .enableStats()
 | 
			
		||||
        .reply(Reply.of(
 | 
			
		||||
            {bot, upd ->
 | 
			
		||||
        .reply(
 | 
			
		||||
            Reply.of(
 | 
			
		||||
            { bot, upd ->
 | 
			
		||||
                try {
 | 
			
		||||
                    val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim()))
 | 
			
		||||
                    actionCache.invalidate(upd.message.chatId)
 | 
			
		||||
@ -117,7 +121,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
                        登录成功!
 | 
			
		||||
                        Microsoft 账号:${account.userName}
 | 
			
		||||
                        请使用 /select_drive 选择 OneDrive 驱动器以及设置上传路径。
 | 
			
		||||
                    """.trimIndent(), upd.message.chatId)
 | 
			
		||||
                    """.trimIndent(), upd.message.chatId
 | 
			
		||||
                    )
 | 
			
		||||
                } catch (e: MsalInteractionRequiredException) {
 | 
			
		||||
                    if (e.errorCode() == "AADSTS54005") {
 | 
			
		||||
                        bot.silent.send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId)
 | 
			
		||||
@ -153,7 +158,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
                """
 | 
			
		||||
                当前账户已登录 OneDrive.
 | 
			
		||||
                Microsoft 账号: ${account.userName}
 | 
			
		||||
            """.trimIndent(), it.chatId())
 | 
			
		||||
            """.trimIndent(), it.chatId()
 | 
			
		||||
            )
 | 
			
		||||
        }
 | 
			
		||||
        .build()
 | 
			
		||||
 | 
			
		||||
@ -180,7 +186,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
            val msgContent = """
 | 
			
		||||
                当前账户已登录 OneDrive.
 | 
			
		||||
                Microsoft 账号: ${onedriveService.accountManager.getAccountByTgUserId(it.chatId())?.userName}
 | 
			
		||||
                当前正在使用的驱动器为:[${currentDrive?.driveType ?: "None" }] ${currentDrive?.name ?: "无"}
 | 
			
		||||
                当前正在使用的驱动器为:[${currentDrive?.driveType ?: "None"}] ${currentDrive?.name ?: "无"}
 | 
			
		||||
                """.trimIndent()
 | 
			
		||||
 | 
			
		||||
            SendMessage.builder()
 | 
			
		||||
@ -300,12 +306,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
                actionCache.invalidate(upd.message.chatId)
 | 
			
		||||
                bot.silent.execute(
 | 
			
		||||
                    SendMessage.builder()
 | 
			
		||||
                    .chatId(upd.message.chatId.toString())
 | 
			
		||||
                    .text("已取消设置.")
 | 
			
		||||
                    .replyMarkup(ReplyKeyboardRemove.builder()
 | 
			
		||||
                        .removeKeyboard(true)
 | 
			
		||||
                        .build())
 | 
			
		||||
                    .build())
 | 
			
		||||
                        .chatId(upd.message.chatId.toString())
 | 
			
		||||
                        .text("已取消设置.")
 | 
			
		||||
                        .replyMarkup(
 | 
			
		||||
                            ReplyKeyboardRemove.builder()
 | 
			
		||||
                                .removeKeyboard(true)
 | 
			
		||||
                                .build()
 | 
			
		||||
                        )
 | 
			
		||||
                        .build()
 | 
			
		||||
                )
 | 
			
		||||
                return@reply
 | 
			
		||||
            }
 | 
			
		||||
            val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId)
 | 
			
		||||
@ -313,12 +322,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
                actionCache.invalidate(upd.message.chatId)
 | 
			
		||||
                bot.silent.execute(
 | 
			
		||||
                    SendMessage.builder()
 | 
			
		||||
                    .chatId(upd.message.chatId.toString())
 | 
			
		||||
                    .text("当前账户没有登录 OneDrive.")
 | 
			
		||||
                    .replyMarkup(ReplyKeyboardRemove.builder()
 | 
			
		||||
                        .removeKeyboard(true)
 | 
			
		||||
                        .build())
 | 
			
		||||
                    .build())
 | 
			
		||||
                        .chatId(upd.message.chatId.toString())
 | 
			
		||||
                        .text("当前账户没有登录 OneDrive.")
 | 
			
		||||
                        .replyMarkup(
 | 
			
		||||
                            ReplyKeyboardRemove.builder()
 | 
			
		||||
                                .removeKeyboard(true)
 | 
			
		||||
                                .build()
 | 
			
		||||
                        )
 | 
			
		||||
                        .build()
 | 
			
		||||
                )
 | 
			
		||||
                return@reply
 | 
			
		||||
            }
 | 
			
		||||
            onedriveService.accountManager.doSomething {
 | 
			
		||||
@ -327,17 +339,21 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
 | 
			
		||||
            actionCache.invalidate(upd.message.chatId)
 | 
			
		||||
            bot.silent.execute(
 | 
			
		||||
                DeleteMessage.builder()
 | 
			
		||||
                .chatId(upd.message.chatId.toString())
 | 
			
		||||
                .messageId(upd.message.messageId)
 | 
			
		||||
                .build())
 | 
			
		||||
                    .chatId(upd.message.chatId.toString())
 | 
			
		||||
                    .messageId(upd.message.messageId)
 | 
			
		||||
                    .build()
 | 
			
		||||
            )
 | 
			
		||||
            bot.silent.execute(
 | 
			
		||||
                SendMessage.builder()
 | 
			
		||||
                .chatId(upd.message.chatId.toString())
 | 
			
		||||
                .text("已设置上传路径为:$path")
 | 
			
		||||
                .replyMarkup(ReplyKeyboardRemove.builder()
 | 
			
		||||
                    .removeKeyboard(true)
 | 
			
		||||
                    .build())
 | 
			
		||||
                .build())
 | 
			
		||||
                    .chatId(upd.message.chatId.toString())
 | 
			
		||||
                    .text("已设置上传路径为:$path")
 | 
			
		||||
                    .replyMarkup(
 | 
			
		||||
                        ReplyKeyboardRemove.builder()
 | 
			
		||||
                            .removeKeyboard(true)
 | 
			
		||||
                            .build()
 | 
			
		||||
                    )
 | 
			
		||||
                    .build()
 | 
			
		||||
            )
 | 
			
		||||
        }, { upd ->
 | 
			
		||||
            upd.hasMessage() && upd.message.hasText() && actionCache.getIfPresent(upd.message.chatId) == "set_path"
 | 
			
		||||
        })
 | 
			
		||||
@ -1,12 +1,12 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer
 | 
			
		||||
 | 
			
		||||
import com.microsoft.aad.msal4j.*
 | 
			
		||||
import org.jetbrains.exposed.sql.Database
 | 
			
		||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
 | 
			
		||||
import org.jetbrains.exposed.sql.transactions.transaction
 | 
			
		||||
import com.microsoft.aad.msal4j.*
 | 
			
		||||
import java.sql.Connection
 | 
			
		||||
import java.net.URL
 | 
			
		||||
import java.net.URI
 | 
			
		||||
import java.net.URL
 | 
			
		||||
import java.sql.Connection
 | 
			
		||||
 | 
			
		||||
class OneDriveTransferSettingManager(private val authClient: ConfidentialClientApplication, private val db: Database) {
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,8 @@ class OneDriveTransferSettingManager(private val authClient: ConfidentialClientA
 | 
			
		||||
 | 
			
		||||
    fun getTransferSetting(userId: Long): OneDriveTransferSetting? {
 | 
			
		||||
        return transaction(db) {
 | 
			
		||||
            return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
 | 
			
		||||
            return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }
 | 
			
		||||
                .firstOrNull()
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -49,7 +50,8 @@ class OneDriveTransferSettingManager(private val authClient: ConfidentialClientA
 | 
			
		||||
        )
 | 
			
		||||
        val result = future.get()
 | 
			
		||||
        return transaction(db) {
 | 
			
		||||
            val account = OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
 | 
			
		||||
            val account =
 | 
			
		||||
                OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
 | 
			
		||||
            account?.apply {
 | 
			
		||||
                accountId = result.account().homeAccountId()
 | 
			
		||||
                userName = result.account().username()
 | 
			
		||||
@ -25,7 +25,7 @@ class OneDriveTransferService(
 | 
			
		||||
    private val config: ExtensionConfig,
 | 
			
		||||
    private val db: Database
 | 
			
		||||
) {
 | 
			
		||||
    private val logger = KotlinLogging.logger {  }
 | 
			
		||||
    private val logger = KotlinLogging.logger { }
 | 
			
		||||
 | 
			
		||||
    val accountManager: OneDriveTransferSettingManager
 | 
			
		||||
    private val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder(
 | 
			
		||||
@ -40,7 +40,7 @@ class OneDriveTransferService(
 | 
			
		||||
 | 
			
		||||
    init {
 | 
			
		||||
        transaction(db) {
 | 
			
		||||
            SchemaUtils.create(OneDriveTransferSettings, TokenCaches)
 | 
			
		||||
            SchemaUtils.create(OneDriveTransferSettings, TokenCaches, RegisteredRemoteTransferWorkers)
 | 
			
		||||
        }
 | 
			
		||||
        accountManager = OneDriveTransferSettingManager(authClient, db)
 | 
			
		||||
    }
 | 
			
		||||
@ -138,7 +138,10 @@ class OneDriveTransferService(
 | 
			
		||||
    companion object {
 | 
			
		||||
        private val THREAD_CURRENT_GRAPH_CLIENT = ThreadLocal<ClientCache>()
 | 
			
		||||
 | 
			
		||||
        fun createGraphClient(authClient: ConfidentialClientApplication, iAccount: IAccount): GraphServiceClient<Request> {
 | 
			
		||||
        fun createGraphClient(
 | 
			
		||||
            authClient: ConfidentialClientApplication,
 | 
			
		||||
            iAccount: IAccount
 | 
			
		||||
        ): GraphServiceClient<Request> {
 | 
			
		||||
            return GraphServiceClient.builder()
 | 
			
		||||
                .httpClient(HttpClients.createDefault(MsalAuthorizationProvider(authClient, iAccount)))
 | 
			
		||||
                .buildClient()
 | 
			
		||||
@ -151,7 +154,10 @@ private data class ClientCache(
 | 
			
		||||
    val client: GraphServiceClient<Request>,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
class MsalAuthorizationProvider(private val authClientApplication: ConfidentialClientApplication, private val iAccount: IAccount) : IAuthenticationProvider {
 | 
			
		||||
class MsalAuthorizationProvider(
 | 
			
		||||
    private val authClientApplication: ConfidentialClientApplication,
 | 
			
		||||
    private val iAccount: IAccount
 | 
			
		||||
) : IAuthenticationProvider {
 | 
			
		||||
    override fun getAuthorizationTokenAsync(requestUrl: URL): CompletableFuture<String> {
 | 
			
		||||
        return authClientApplication.acquireTokenSilently(
 | 
			
		||||
            SilentParameters.builder(OneDriveTransferSettingManager.OAUTH2_SCOPE, iAccount).build()
 | 
			
		||||
@ -0,0 +1,336 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote
 | 
			
		||||
 | 
			
		||||
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
 | 
			
		||||
import com.microsoft.graph.models.DriveItemUploadableProperties
 | 
			
		||||
import io.ktor.http.*
 | 
			
		||||
import io.ktor.server.application.*
 | 
			
		||||
import io.ktor.server.auth.*
 | 
			
		||||
import io.ktor.server.engine.*
 | 
			
		||||
import io.ktor.server.netty.*
 | 
			
		||||
import io.ktor.server.response.*
 | 
			
		||||
import io.ktor.server.routing.*
 | 
			
		||||
import io.ktor.util.collections.*
 | 
			
		||||
import mu.KotlinLogging
 | 
			
		||||
import net.lamgc.scext.onedrive_transfer.*
 | 
			
		||||
import net.lamgc.scext.onedrive_transfer.remote.request.*
 | 
			
		||||
import okhttp3.internal.toImmutableMap
 | 
			
		||||
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
 | 
			
		||||
import java.time.LocalDateTime
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedDeque
 | 
			
		||||
 | 
			
		||||
class RemoteOneDriveTransferExecutorController(
 | 
			
		||||
    private val config: CentralSetting,
 | 
			
		||||
    private val callback: OneDriveTransferCallback
 | 
			
		||||
) : OneDriveTransferTaskExecutor {
 | 
			
		||||
 | 
			
		||||
    private val logger = KotlinLogging.logger { }
 | 
			
		||||
 | 
			
		||||
    private val pendingTasks = ConcurrentLinkedDeque<OneDriveTransferTask>()
 | 
			
		||||
    private val workingTasks = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
 | 
			
		||||
    private val preAllocatedTasks = ConcurrentHashMap<OneDriveTransferTask, PreAllocateTaskEntry>()
 | 
			
		||||
    private val cancelledTasks = ConcurrentSet<OneDriveTransferTask>()
 | 
			
		||||
 | 
			
		||||
    init {
 | 
			
		||||
        embeddedServer(Netty, port = config.port) {
 | 
			
		||||
            install(Authentication) {
 | 
			
		||||
                bearer(AUTHENTICATE_CONFIG_ID) {
 | 
			
		||||
                    authenticate {
 | 
			
		||||
                        val worker =
 | 
			
		||||
                            RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.token eq it.token }
 | 
			
		||||
                                .limit(1)
 | 
			
		||||
                                .firstOrNull()
 | 
			
		||||
 | 
			
		||||
                        if (worker != null) {
 | 
			
		||||
                            worker.lastContactAt = LocalDateTime.now()
 | 
			
		||||
                        }
 | 
			
		||||
                        return@authenticate worker
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            registerRemoteWorkerRegisterRouting()
 | 
			
		||||
            registerTaskAllocationRouting()
 | 
			
		||||
            registerTaskProgressUpdateRouting()
 | 
			
		||||
            registerFileUploadSessionRequestRouting()
 | 
			
		||||
        }.start(wait = false)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    override fun submitTransferTask(task: OneDriveTransferTask): Boolean {
 | 
			
		||||
        pendingTasks.addLast(task)
 | 
			
		||||
        try {
 | 
			
		||||
            callback.onTransferTaskCreated(task)
 | 
			
		||||
        } catch (e: Exception) {
 | 
			
		||||
            logger.error(e) { "Error occurred while calling onTransferTaskCreated." }
 | 
			
		||||
        }
 | 
			
		||||
        return true
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    override fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
 | 
			
		||||
        if (pendingTasks.remove(task)) {
 | 
			
		||||
            try {
 | 
			
		||||
                callback.onTransferCancelled(task, null)
 | 
			
		||||
            } catch (e: Exception) {
 | 
			
		||||
                logger.error(e) { "Error occurred while calling onTransferCancelled." }
 | 
			
		||||
            }
 | 
			
		||||
            return true
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (workingTasks.values.any { it.currentTask == task }) {
 | 
			
		||||
            cancelledTasks.add(task)
 | 
			
		||||
            return true
 | 
			
		||||
        } else if (preAllocatedTasks.contains(task)) {
 | 
			
		||||
            preAllocatedTasks.remove(task)
 | 
			
		||||
            try {
 | 
			
		||||
                callback.onTransferCancelled(task, null)
 | 
			
		||||
            } catch (e: Exception) {
 | 
			
		||||
                logger.error(e) { "Error occurred while calling onTransferCancelled." }
 | 
			
		||||
            }
 | 
			
		||||
            return true
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return false
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    override fun getQueuedTransferTasks(): List<OneDriveTransferTask> {
 | 
			
		||||
        return this.pendingTasks.toList()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    override fun getWorkerCount(): Int {
 | 
			
		||||
        // 在近 1 小时内有联系的 Worker 数量
 | 
			
		||||
        val minusHours = LocalDateTime.now().minusHours(1)
 | 
			
		||||
        return RegisteredRemoteTransferWorker
 | 
			
		||||
            .count(RegisteredRemoteTransferWorkers.lastContactAt greaterEq minusHours).toInt()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    override fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress> {
 | 
			
		||||
        return workingTasks.toImmutableMap()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun Application.registerRemoteWorkerRegisterRouting() {
 | 
			
		||||
        routing {
 | 
			
		||||
            post<RegisterWorkerRequest> {
 | 
			
		||||
                if (config.registerToken != it.token) {
 | 
			
		||||
                    call.respond(HttpStatusCode.Forbidden, "Invalid token.")
 | 
			
		||||
                    return@post
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                val existsWorker =
 | 
			
		||||
                    RegisteredRemoteTransferWorker.find { RegisteredRemoteTransferWorkers.workerName eq it.workerName }
 | 
			
		||||
                        .firstOrNull()
 | 
			
		||||
 | 
			
		||||
                val workerToken = randomString(128)
 | 
			
		||||
                if (existsWorker != null) {
 | 
			
		||||
                    val now = LocalDateTime.now()
 | 
			
		||||
                    // 如果 existsWorker 的 lastContactAt 距离现在不超过 12 小时, 则拒绝重新注册
 | 
			
		||||
                    if (existsWorker.lastContactAt > now.minusHours(12)) {
 | 
			
		||||
                        call.respond(HttpStatusCode.Forbidden, "Worker already registered.")
 | 
			
		||||
                        return@post
 | 
			
		||||
                    } else {
 | 
			
		||||
                        existsWorker.token = workerToken
 | 
			
		||||
                        existsWorker.registeredAt = now
 | 
			
		||||
                        existsWorker.lastContactAt = now
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    RegisteredRemoteTransferWorker.new {
 | 
			
		||||
                        this.workerName = it.workerName
 | 
			
		||||
                        this.registeredAt = LocalDateTime.now()
 | 
			
		||||
                        this.lastContactAt = LocalDateTime.now()
 | 
			
		||||
                        this.token = workerToken
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                call.respond(HttpStatusCode.OK, RegisterWorkerResponse(true, "Worker registered.", workerToken))
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun Application.registerTaskAllocationRouting() {
 | 
			
		||||
        routing {
 | 
			
		||||
            authenticate(AUTHENTICATE_CONFIG_ID) {
 | 
			
		||||
                post<TaskAllocationRequest> { request ->
 | 
			
		||||
                    val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
 | 
			
		||||
                        ?: return@post call.respond(HttpStatusCode.Unauthorized)
 | 
			
		||||
 | 
			
		||||
                    // 寻找 pendingTasks 中 fileSize 不超过 worker 的限制的任务
 | 
			
		||||
                    val task = pendingTasks.find { task ->
 | 
			
		||||
                        task.document.fileSize <= request.acceptMaxSize && task !in cancelledTasks && preAllocatedTasks.computeIfAbsent(
 | 
			
		||||
                            task
 | 
			
		||||
                        ) {
 | 
			
		||||
                            PreAllocateTaskEntry(task, worker)
 | 
			
		||||
                        }.worker == worker
 | 
			
		||||
                    } ?: return@post call.respond(HttpStatusCode.NoContent)
 | 
			
		||||
 | 
			
		||||
                    call.respond(
 | 
			
		||||
                        HttpStatusCode.OK, TaskAllocationResponse(
 | 
			
		||||
                            true, TaskAllocationInfo(
 | 
			
		||||
                                taskId = task.id,
 | 
			
		||||
                                ownerBotToken = "",
 | 
			
		||||
                                fileId = task.document.fileId,
 | 
			
		||||
                                fileSize = task.document.fileSize
 | 
			
		||||
                            )
 | 
			
		||||
                        )
 | 
			
		||||
                    )
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                post<TaskAcceptRequest> {
 | 
			
		||||
                    val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
 | 
			
		||||
                        ?: return@post call.respond(HttpStatusCode.Unauthorized)
 | 
			
		||||
 | 
			
		||||
                    val preAllocatedTaskEntry = preAllocatedTasks.entries.find { entry ->
 | 
			
		||||
                        entry.key.id == it.taskId && entry.value.worker.id == worker.id
 | 
			
		||||
                    } ?: return@post call.respond(HttpStatusCode.OK, TaskAcceptResponse(false))
 | 
			
		||||
 | 
			
		||||
                    pendingTasks.remove(preAllocatedTaskEntry.key)
 | 
			
		||||
                    val existsTask = workingTasks[worker.id.value]
 | 
			
		||||
                    if (existsTask != null) {
 | 
			
		||||
                        if (cancelledTasks.remove(existsTask.currentTask)) {
 | 
			
		||||
                            try {
 | 
			
		||||
                                callback.onTransferCancelled(existsTask.currentTask, existsTask)
 | 
			
		||||
                            } catch (e: Exception) {
 | 
			
		||||
                                logger.error(e) { "Error occurred while calling onTransferCancelled." }
 | 
			
		||||
                            }
 | 
			
		||||
                        } else {
 | 
			
		||||
                            pendingTasks.addFirst(existsTask.currentTask)
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    val workerProgress = OneDriveTransferWorkerProgress(preAllocatedTaskEntry.key)
 | 
			
		||||
 | 
			
		||||
                    workingTasks[worker.id.value] = workerProgress
 | 
			
		||||
                    preAllocatedTasks.remove(preAllocatedTaskEntry.key)
 | 
			
		||||
 | 
			
		||||
                    try {
 | 
			
		||||
                        callback.onTransferTaskStart(workerProgress)
 | 
			
		||||
                    } catch (e: Exception) {
 | 
			
		||||
                        logger.error(e) { "Error occurred while calling onTransferTaskStart." }
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    call.respond(HttpStatusCode.OK, TaskAcceptResponse(true))
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun Application.registerTaskProgressUpdateRouting() {
 | 
			
		||||
        routing {
 | 
			
		||||
            authenticate(AUTHENTICATE_CONFIG_ID) {
 | 
			
		||||
                post<TaskProgressUpdateRequest> {
 | 
			
		||||
                    val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
 | 
			
		||||
                        ?: return@post call.respond(HttpStatusCode.Unauthorized)
 | 
			
		||||
 | 
			
		||||
                    val processingTask = workingTasks[worker.id.value]
 | 
			
		||||
                    if (processingTask == null || processingTask.currentTask.id != it.taskId) {
 | 
			
		||||
                        return@post call.respond(HttpStatusCode.BadRequest)
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    processingTask.progress.set(it.progress)
 | 
			
		||||
                    processingTask.status = when (it.status) {
 | 
			
		||||
                        TaskStatus.DOWNLOADING_FILE -> OneDriveTransferStatus.DOWNLOADING_FILE
 | 
			
		||||
                        TaskStatus.DOWNLOADED_FILE -> OneDriveTransferStatus.CREATING_UPLOAD_SESSION
 | 
			
		||||
                        TaskStatus.UPLOADING_FILE -> OneDriveTransferStatus.UPLOADING
 | 
			
		||||
                        TaskStatus.COMPLETED -> OneDriveTransferStatus.SUCCESS
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (cancelledTasks.contains(processingTask.currentTask)) {
 | 
			
		||||
                        cancelledTasks.remove(processingTask.currentTask)
 | 
			
		||||
                        workingTasks.remove(worker.id.value)
 | 
			
		||||
                        try {
 | 
			
		||||
                            callback.onTransferCancelled(processingTask.currentTask, processingTask)
 | 
			
		||||
                        } catch (e: Exception) {
 | 
			
		||||
                            logger.error(e) { "Error occurred while calling onTransferCancelled." }
 | 
			
		||||
                        }
 | 
			
		||||
                        return@post call.respond(
 | 
			
		||||
                            HttpStatusCode.OK,
 | 
			
		||||
                            TaskProgressUpdateResponse(TaskProgressUpdateStatus.CANCELLED)
 | 
			
		||||
                        )
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    try {
 | 
			
		||||
                        callback.onUploadProgress(processingTask)
 | 
			
		||||
                    } catch (e: Exception) {
 | 
			
		||||
                        logger.error(e) { "Error occurred while calling onUploadProgress." }
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (processingTask.status == OneDriveTransferStatus.SUCCESS) {
 | 
			
		||||
                        try {
 | 
			
		||||
                            callback.onTransferSuccess(processingTask.currentTask, processingTask)
 | 
			
		||||
                        } catch (e: Exception) {
 | 
			
		||||
                            logger.error(e) { "Error occurred while calling onTransferSuccess." }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    return@post call.respond(
 | 
			
		||||
                        HttpStatusCode.OK,
 | 
			
		||||
                        TaskProgressUpdateResponse(TaskProgressUpdateStatus.CONTINUE)
 | 
			
		||||
                    )
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun Application.registerFileUploadSessionRequestRouting() {
 | 
			
		||||
        routing {
 | 
			
		||||
            authenticate(AUTHENTICATE_CONFIG_ID) {
 | 
			
		||||
                post<FileUploadSessionRequest> {
 | 
			
		||||
                    val worker = call.authentication.principal<RegisteredRemoteTransferWorker>()
 | 
			
		||||
                        ?: return@post call.respond(HttpStatusCode.Unauthorized)
 | 
			
		||||
 | 
			
		||||
                    val processingTask = workingTasks[worker.id.value]
 | 
			
		||||
                    if (processingTask == null || processingTask.currentTask.id != it.taskId) {
 | 
			
		||||
                        return@post call.respond(HttpStatusCode.BadRequest)
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (processingTask.status != OneDriveTransferStatus.CREATING_UPLOAD_SESSION) {
 | 
			
		||||
                        return@post call.respond(HttpStatusCode.BadRequest)
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    val task = processingTask.currentTask
 | 
			
		||||
                    val file = task.document
 | 
			
		||||
 | 
			
		||||
                    val graphClient = task.service.createGraphClient(task.tgUserId)
 | 
			
		||||
                    val drive = graphClient.drives(task.onedriveId).buildRequest().get()
 | 
			
		||||
                        ?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
 | 
			
		||||
                    if (file.fileSize > drive.quota!!.remaining!!) {
 | 
			
		||||
                        throw IllegalStateException("OneDrive 剩余空间不足.")
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    val filePath = OneDriveUtils.checkAndGetPath(
 | 
			
		||||
                        graphClient,
 | 
			
		||||
                        task.onedriveId,
 | 
			
		||||
                        task.storagePath,
 | 
			
		||||
                        task.document.fileName
 | 
			
		||||
                    )
 | 
			
		||||
                    logger.debug { "OneDrive 中转任务: ${task.document.fileName} -> $filePath" }
 | 
			
		||||
 | 
			
		||||
                    val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
 | 
			
		||||
                        .createUploadSession(
 | 
			
		||||
                            DriveItemCreateUploadSessionParameterSet.newBuilder()
 | 
			
		||||
                                .withItem(DriveItemUploadableProperties().apply {
 | 
			
		||||
                                    fileSize = file.fileSize
 | 
			
		||||
                                    name = task.document.fileName
 | 
			
		||||
                                })
 | 
			
		||||
                                .build()
 | 
			
		||||
                        )
 | 
			
		||||
                        .buildRequest()
 | 
			
		||||
                        .post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
 | 
			
		||||
 | 
			
		||||
                    val uploadUrl =
 | 
			
		||||
                        uploadSession.uploadUrl ?: return@post call.respond(HttpStatusCode.InternalServerError)
 | 
			
		||||
 | 
			
		||||
                    return@post call.respond(HttpStatusCode.OK, FileUploadSessionResponse(uploadUrl))
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    data class PreAllocateTaskEntry(
 | 
			
		||||
        val task: OneDriveTransferTask,
 | 
			
		||||
        val worker: RegisteredRemoteTransferWorker,
 | 
			
		||||
        val allocatedAt: LocalDateTime = LocalDateTime.now()
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    companion object {
 | 
			
		||||
        private const val AUTHENTICATE_CONFIG_ID = "worker-token"
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										18
									
								
								onedrive-transfer-remote-common/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								onedrive-transfer-remote-common/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,18 @@
 | 
			
		||||
plugins {
 | 
			
		||||
    kotlin("jvm")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
group = "net.lamgc.scext"
 | 
			
		||||
version = "0.1.0-SNAPSHOT"
 | 
			
		||||
 | 
			
		||||
dependencies {
 | 
			
		||||
    testImplementation(kotlin("test"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
kotlin {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
tasks.test {
 | 
			
		||||
    useJUnitPlatform()
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,2 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,11 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
import java.util.*
 | 
			
		||||
 | 
			
		||||
data class FileUploadSessionRequest(
 | 
			
		||||
    val taskId: UUID
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
data class FileUploadSessionResponse(
 | 
			
		||||
    val uploadUrl: String
 | 
			
		||||
)
 | 
			
		||||
@ -0,0 +1,12 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
data class RegisterWorkerRequest(
 | 
			
		||||
    val workerName: String,
 | 
			
		||||
    val token: String,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
data class RegisterWorkerResponse(
 | 
			
		||||
    val success: Boolean,
 | 
			
		||||
    val message: String,
 | 
			
		||||
    val workerToken: String
 | 
			
		||||
)
 | 
			
		||||
@ -0,0 +1,21 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
import java.util.*
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 从节点接受任务请求.
 | 
			
		||||
 */
 | 
			
		||||
data class TaskAcceptRequest(
 | 
			
		||||
    val taskId: UUID
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 如果任务被该节点预分配, 则返回确认响应.
 | 
			
		||||
 *
 | 
			
		||||
 * 如果任务并非为该从节点预分配, 或者任务预分配过期, 被其他节点接受, 则返回拒绝响应.
 | 
			
		||||
 *
 | 
			
		||||
 * @property confirmed 是否确认接受任务.
 | 
			
		||||
 */
 | 
			
		||||
data class TaskAcceptResponse(
 | 
			
		||||
    val confirmed: Boolean = true
 | 
			
		||||
)
 | 
			
		||||
@ -0,0 +1,42 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
import java.util.*
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 从节点发送任务分配请求.
 | 
			
		||||
 *
 | 
			
		||||
 * 主节点将根据请求的 acceptMaxSize 进行预分配任务.
 | 
			
		||||
 *
 | 
			
		||||
 * @property acceptMaxSize 从节点可接受的最大文件大小.
 | 
			
		||||
 */
 | 
			
		||||
data class TaskAllocationRequest(
 | 
			
		||||
    val acceptMaxSize: Long
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 主节点返回的任务分配结果.
 | 
			
		||||
 *
 | 
			
		||||
 * @property allocated 是否成功分配了任务.
 | 
			
		||||
 * @property info 预分配的任务信息.
 | 
			
		||||
 */
 | 
			
		||||
data class TaskAllocationResponse(
 | 
			
		||||
    val allocated: Boolean,
 | 
			
		||||
    val info: TaskAllocationInfo? = null
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * 任务分配信息.
 | 
			
		||||
 *
 | 
			
		||||
 * 主节点将为从节点保留任务一段时间, 直至从节点接受任务, 或者预分配任务超时.
 | 
			
		||||
 *
 | 
			
		||||
 * @property taskId 任务 ID.
 | 
			
		||||
 * @property ownerBotToken 任务所属的 Bot Token.
 | 
			
		||||
 * @property fileId 文件 ID.
 | 
			
		||||
 * @property fileSize 文件大小.
 | 
			
		||||
 */
 | 
			
		||||
data class TaskAllocationInfo(
 | 
			
		||||
    val taskId: UUID,
 | 
			
		||||
    val ownerBotToken: String,
 | 
			
		||||
    val fileId: String,
 | 
			
		||||
    val fileSize: Long,
 | 
			
		||||
)
 | 
			
		||||
@ -0,0 +1,38 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.request
 | 
			
		||||
 | 
			
		||||
import java.util.*
 | 
			
		||||
 | 
			
		||||
data class TaskProgressUpdateRequest(
 | 
			
		||||
    val taskId: UUID,
 | 
			
		||||
    val status: TaskStatus,
 | 
			
		||||
    val progress: Double
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
data class TaskProgressUpdateResponse(
 | 
			
		||||
    val status: TaskProgressUpdateStatus
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
enum class TaskStatus {
 | 
			
		||||
 | 
			
		||||
    DOWNLOADING_FILE,
 | 
			
		||||
 | 
			
		||||
    DOWNLOADED_FILE,
 | 
			
		||||
 | 
			
		||||
    UPLOADING_FILE,
 | 
			
		||||
 | 
			
		||||
    COMPLETED
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
enum class TaskProgressUpdateStatus {
 | 
			
		||||
    /**
 | 
			
		||||
     * 任务可以继续进行.
 | 
			
		||||
     */
 | 
			
		||||
    CONTINUE,
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 任务已取消, Worker 应该放弃该任务.
 | 
			
		||||
     */
 | 
			
		||||
    CANCELLED,
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										36
									
								
								onedrive-transfer-remote-worker/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								onedrive-transfer-remote-worker/build.gradle.kts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,36 @@
 | 
			
		||||
plugins {
 | 
			
		||||
    kotlin("jvm") version "2.1.0"
 | 
			
		||||
    application
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
group = "net.lamgc.scext"
 | 
			
		||||
version = "0.1.0-SNAPSHOT"
 | 
			
		||||
 | 
			
		||||
repositories {
 | 
			
		||||
    mavenCentral()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
application {
 | 
			
		||||
    mainClass = ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dependencies {
 | 
			
		||||
    implementation(project(":onedrive-transfer-remote-common"))
 | 
			
		||||
 | 
			
		||||
    implementation("org.slf4j:slf4j-api:2.0.11")
 | 
			
		||||
    implementation("io.github.microutils:kotlin-logging:3.0.5")
 | 
			
		||||
    implementation("ch.qos.logback:logback-classic:1.5.15")
 | 
			
		||||
 | 
			
		||||
    implementation("com.github.ajalt.clikt:clikt:5.0.2")
 | 
			
		||||
    implementation("com.github.ajalt.clikt:clikt-markdown:5.0.2")
 | 
			
		||||
 | 
			
		||||
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
 | 
			
		||||
    implementation("org.jetbrains.kotlin:kotlin-reflect:2.1.0")
 | 
			
		||||
    implementation("com.google.code.gson:gson:2.10.1")
 | 
			
		||||
 | 
			
		||||
    testImplementation(kotlin("test"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
tasks.test {
 | 
			
		||||
    useJUnitPlatform()
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										35
									
								
								onedrive-transfer-remote-worker/src/main/kotlin/Main.kt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								onedrive-transfer-remote-worker/src/main/kotlin/Main.kt
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,35 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer.remote.worker
 | 
			
		||||
 | 
			
		||||
import com.github.ajalt.clikt.core.CliktCommand
 | 
			
		||||
import com.github.ajalt.clikt.core.main
 | 
			
		||||
import com.github.ajalt.clikt.parameters.options.*
 | 
			
		||||
import com.github.ajalt.clikt.parameters.types.ulong
 | 
			
		||||
import java.io.File
 | 
			
		||||
import kotlin.math.min
 | 
			
		||||
 | 
			
		||||
class Main : CliktCommand("overdrive-transfer-worker") {
 | 
			
		||||
 | 
			
		||||
    private val host: String by option()
 | 
			
		||||
        .required()
 | 
			
		||||
        .help("Controller URL.")
 | 
			
		||||
 | 
			
		||||
    private val tempDir: String by option()
 | 
			
		||||
        .help("Temp directory.")
 | 
			
		||||
        .default("./work-temp/")
 | 
			
		||||
 | 
			
		||||
    private val maxAllowedFileSize by option()
 | 
			
		||||
        .ulong()
 | 
			
		||||
        .help("Max allowed file size.")
 | 
			
		||||
        .defaultLazy {
 | 
			
		||||
            // 获取所在硬盘的大小,最大不超过 4 G
 | 
			
		||||
            val disk = File(tempDir).toPath().root.toFile()
 | 
			
		||||
            return@defaultLazy min(disk.usableSpace, 536870912).toULong()
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    override fun run() {
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fun main(args: Array<String>) = Main().main(args)
 | 
			
		||||
@ -2,4 +2,6 @@ plugins {
 | 
			
		||||
    id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0"
 | 
			
		||||
}
 | 
			
		||||
rootProject.name = "onedrive-transfer"
 | 
			
		||||
 | 
			
		||||
include("onedrive-transfer-bot-extension")
 | 
			
		||||
include("onedrive-transfer-remote-common")
 | 
			
		||||
include("onedrive-transfer-remote-worker")
 | 
			
		||||
 | 
			
		||||
@ -1,5 +0,0 @@
 | 
			
		||||
package net.lamgc.scext.onedrive_transfer
 | 
			
		||||
 | 
			
		||||
fun main() {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user