feat: 推了很多进度。

This commit is contained in:
LamGC 2024-01-08 08:57:59 +08:00
parent aa84499d62
commit 4e07309066
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D
8 changed files with 401 additions and 68 deletions

View File

@ -2,6 +2,7 @@ import java.net.URI
plugins {
kotlin("jvm") version "1.9.21"
`maven-publish`
}
group = "net.lamgc.scext"
@ -19,8 +20,14 @@ repositories {
dependencies {
compileOnly("org.slf4j:slf4j-api:2.0.10")
compileOnly("io.github.microutils:kotlin-logging:3.0.5")
implementation("ch.qos.logback:logback-classic:1.4.14")
compileOnly("net.lamgc:scalabot-extension:0.6.1")
implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.16.1")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.16.1")
val exposedVersion = "0.45.0"
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-crypt:$exposedVersion")
@ -44,3 +51,27 @@ tasks.test {
kotlin {
jvmToolchain(17)
}
publishing {
repositories {
// maven("https://git.lamgc.me/api/packages/LamGC/maven") {
// credentials {
// username = project.properties["repo.credentials.self-git.username"].toString()
// password = project.properties["repo.credentials.self-git.password"].toString()
// }
// }
mavenLocal()
}
publications {
create<MavenPublication>("maven") {
from(components["java"])
pom {
name.set("ScalaExt-OneDriveTransfer")
description.set("将 Telegram 中的文件转存至 OneDrive.")
}
}
}
}

View File

@ -9,19 +9,22 @@ import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.transactions.transaction
object MicrosoftAccounts : LongIdTable() {
object OneDriveTransferSettings : LongIdTable() {
val telegramUserId = long("tg_user_id").uniqueIndex()
val accountId = varchar("account_id", 128)
val userName = varchar("user_name", 96)
val driveId = varchar("drive_id", 256)
val storagePath = varchar("storage_path", 512)
}
class MicrosoftAccount(id: EntityID<Long>) : LongEntity(id) {
var telegramUserId by MicrosoftAccounts.telegramUserId
var accountId by MicrosoftAccounts.accountId
var userName by MicrosoftAccounts.accountId
class OneDriveTransferSetting(id: EntityID<Long>) : LongEntity(id) {
var telegramUserId by OneDriveTransferSettings.telegramUserId
var accountId by OneDriveTransferSettings.accountId
var userName by OneDriveTransferSettings.userName
var driveId by OneDriveTransferSettings.driveId
var storagePath by OneDriveTransferSettings.storagePath
companion object : LongEntityClass<MicrosoftAccount>(MicrosoftAccounts)
companion object : LongEntityClass<OneDriveTransferSetting>(OneDriveTransferSettings)
}
object TokenCaches : LongIdTable() {

View File

@ -1,7 +1,9 @@
package net.lamgc.scext.onedrive_transfer
data class ExtensionConfig(
val clientId: String,
val clientSecret: String,
val useCommandPrefix: Boolean
val clientId: String = "",
val clientSecret: String = "",
val useCommandPrefix: Boolean = true,
val maxFileSize: Long = 1024L * 1024 * 1024 * 4,
val maxTransferSize: Long = 1024L * 1024 * 1024 * 20,
)

View File

@ -0,0 +1,177 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.util.concurrent.AtomicDouble
import com.microsoft.graph.models.DriveItem
import com.microsoft.graph.models.DriveItemCreateUploadSessionParameterSet
import com.microsoft.graph.models.DriveItemUploadableProperties
import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import mu.KotlinLogging
import okhttp3.Request
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.objects.Document
import java.util.*
import java.util.concurrent.*
import kotlin.math.log
object OneDriveTransferCenter {
private val queue = PriorityBlockingQueue(100,
compareBy<OneDriveTransferTask> { it.document.fileSize }.thenBy { it.createdAt.time })
fun submitUploadTask(task: OneDriveTransferTask) {
queue.put(task)
}
}
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
override fun onProgress(progress: OneDriveTransferWorkerProgress) {
TODO("Not yet implemented")
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
task.bot.silent().send("""
OneDrive 中转任务执行失败
文件名${task.document.fileName}
""".trimIndent(), task.tgUserId)
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
task.bot.silent().send("""
OneDrive 中转任务执行成功
文件名${task.document.fileName}
""".trimIndent(), task.tgUserId)
}
}
interface OneDriveTransferCallback {
fun onProgress(progress: OneDriveTransferWorkerProgress)
fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress)
}
class OneDriveTransferTaskExecutor(
threadNum: Int,
val callback: OneDriveTransferCallback,
val taskQueue: PriorityBlockingQueue<OneDriveTransferTask>
) : ThreadPoolExecutor(threadNum, threadNum, 0, TimeUnit.SECONDS, ArrayBlockingQueue(50)) {
private val logger = KotlinLogging.logger { }
val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
init {
for (i in 0 until threadNum) {
submit(createWorker(i))
}
}
private fun createWorker(id: Int): Runnable = Runnable {
while (Thread.interrupted()) {
val task = taskQueue.take()
val progress = OneDriveTransferWorkerProgress(task)
threadStatusMap[id] = progress
try {
doTransferFile(task, progress)
callback.onTransferSuccess(task, progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" }
callback.onTransferFailure(task, progress.apply {
this.exception = e
})
} finally {
threadStatusMap.remove(id)
}
}
}
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val file = task.bot.execute(
GetFile.builder()
.fileId(task.document.fileId)
.build()
)
val graphClient = task.service.createGraphClient(task.tgUserId) ?: throw IllegalStateException("未登录 OneDrive.")
val drive = graphClient.drives(task.onedriveId).buildRequest().get() ?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
if (file.fileSize > drive.quota!!.remaining!!) {
throw IllegalStateException("OneDrive 剩余空间不足.")
}
// TODO: 需要完善一下文件夹位置,文件名冲突处理的问题.
graphClient.drives(task.onedriveId).root().itemWithPath(task.storagePath).buildRequest().get()
val filePath = checkAndGetPath(graphClient, task.storagePath, task.document.fileName)
if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = task.bot.downloadFileAsStream(file).readAllBytes()
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest()
.put(fileBytes)
progress.driveItem = driveItem
progress.progress.set(1.0)
} else {
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(DriveItemCreateUploadSessionParameterSet().apply {
this.item = DriveItemUploadableProperties().apply {
this.fileSize = file.fileSize
}
})
.buildRequest()
.post() ?: throw IllegalStateException("无法创建 OneDrive 上传会话.")
val progressCallback = IProgressCallback { current, max ->
progress.progress.set(current.toDouble() / max.toDouble())
try {
callback.onProgress(progress)
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务进度回调失败: ${e.message}" }
}
}
val fileStream = task.bot.downloadFileAsStream(file)
val largeFileUploadTask = LargeFileUploadTask(
uploadSession,
graphClient,
fileStream,
file.fileSize,
DriveItem::class.java
)
val uploadResult = largeFileUploadTask.upload(4 * 1024 * 1024, null, progressCallback)
if (progress.progress.get() == 1.0 && progress.exception != null) {
progress.driveItem = uploadResult.responseBody
}
}
}
private fun checkAndGetPath(graphClient: GraphServiceClient<Request>, storagePath: String, fileName: String): String {
val path = if (storagePath.endsWith("/")) {
storagePath
} else {
"$storagePath/"
}
return "$path$fileName"
}
}
data class OneDriveTransferWorkerProgress(
val currentTask: OneDriveTransferTask,
val progress: AtomicDouble = AtomicDouble(),
var driveItem: DriveItem? = null,
var exception: Exception? = null
)
data class OneDriveTransferTask(
val tgUserId: Long,
val bot: BaseAbilityBot,
val service: OneDriveTransferService,
val document: Document,
val onedriveId: String,
val storagePath: String,
val extra: MutableMap<String, Any> = mutableMapOf(),
val createdAt: Date = Date()
)

View File

@ -1,11 +1,12 @@
package net.lamgc.scext.onedrive_transfer
import com.google.gson.Gson
import com.microsoft.aad.msal4j.*
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import mu.KotlinLogging
import org.jetbrains.exposed.sql.Database
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.abilitybots.api.objects.Ability
import org.telegram.abilitybots.api.objects.Ability.AbilityBuilder
import org.telegram.abilitybots.api.objects.Locality
import org.telegram.abilitybots.api.objects.Privacy
import org.telegram.abilitybots.api.objects.Reply
@ -18,37 +19,33 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
private val logger = KotlinLogging.logger { }
private val config: ExtensionConfig
private val authClient: ConfidentialClientApplication
private val accountManager: MicrosoftAccountManager
private val onedriveService: OneDriveTransferService
init {
val configFile = File(dataFolder, "config.json")
config = loadConfiguration()
val db = Database.connect("jdbc:sqlite:${File(dataFolder, "./data.db").canonicalPath}", "org.sqlite.JDBC")
onedriveService = OneDriveTransferService(bot, config, db)
}
fun loadConfiguration(): ExtensionConfig {
val configFile = File(this.dataFolder, "config.json")
val objectMapper = ObjectMapper().registerKotlinModule()
if (!configFile.exists()) {
configFile.createNewFile()
configFile.writeText("{}")
objectMapper.writeValue(configFile, ExtensionConfig())
return ExtensionConfig()
}
config = Gson().fromJson(configFile.reader(), ExtensionConfig::class.java)
val db = Database.connect("jdbc:sqlite:${File(dataFolder, "./data.db").canonicalPath}", "org.sqlite.JDBC")
authClient = ConfidentialClientApplication.builder(
config.clientId,
ClientCredentialFactory.createFromSecret(config.clientSecret),
)
.authority(MicrosoftAccountManager.AUTHORITY)
.setTokenCacheAccessAspect(DatabaseTokenCache(db))
.build()
accountManager = MicrosoftAccountManager(authClient, db)
onedriveService = OneDriveTransferService(config, accountManager, authClient)
return objectMapper.readValue(configFile, ExtensionConfig::class.java)
}
fun loginOneDrive(): Ability = Ability
.builder()
.name("odt_login")
.named("login")
.info("登录 OneDrive 账户.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action { ctx ->
val url = accountManager.createAuthorizationRequest(ctx.user().id)
val url = onedriveService.createLoginUrl(ctx.chatId())
ctx.bot().silent().send("""
请使用以下链接进行登录
$url
@ -59,9 +56,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
}
.reply(Reply.of(
{bot, upd ->
val token = MicrosoftAccountManager.getTokenFromUrl(URL(upd.message.text.trim()))
try {
val account = accountManager.updateAccount(upd.message.chat.id, token)
val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim()))
bot.silent().send("""
登录成功
Microsoft 账号${account.userName}
@ -77,9 +73,10 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
return@of false
}
try {
URL(upd.message.text)
URL(upd.message.text.trim())
return@of true
} catch (e: Exception) {
bot.silent().send("链接格式错误,请重新发送。", upd.message.chatId)
return@of false
}
}
@ -87,14 +84,45 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.build()
fun status(): Ability = Ability.builder()
.name("")
.named("my")
.info("查看当前 OneDrive 设定信息.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val account = onedriveService.accountManager.getAccountByTgUserId(it.chatId())
if (account == null) {
it.bot().silent().send("当前账户未登录 OneDrive.", it.chatId())
return@action
}
it.bot().silent().send("""
当前账户已登录 OneDrive.
Microsoft 账号${account.userName}
""".trimIndent(), it.chatId())
}
.build()
private fun setCommandName(name: String): String {
if (config.useCommandPrefix) {
return "odt_$name"
fun selectDrive(): Ability = Ability.builder()
.named("select_drive")
.info("选择 OneDrive 驱动器.")
.locality(Locality.USER)
.privacy(Privacy.PUBLIC)
.action {
val currentDrive = onedriveService.getCurrentDrive(it.chatId())
val drives = onedriveService.listDriversByUserId(it.chatId())
if (drives.isEmpty()) {
it.bot().silent().send("当前账户没有 OneDrive 驱动器.", it.chatId())
return@action
}
}
.build()
private fun AbilityBuilder.named(name: String): AbilityBuilder {
return if (config.useCommandPrefix) {
name("odt_$name")
} else {
name(name)
}
return name
}
}

View File

@ -9,16 +9,16 @@ import java.net.URL
import java.net.URI
import java.util.concurrent.ExecutionException
class MicrosoftAccountManager(private val authClient: ConfidentialClientApplication, private val db: Database) {
class OneDriveTransferSettingManager(private val authClient: ConfidentialClientApplication, private val db: Database) {
init {
TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE
}
fun getMicrosoftByTelegramUser(userId: Long): MicrosoftAccount? {
fun getTransferSetting(userId: Long): OneDriveTransferSetting? {
return transaction(db) {
return@transaction MicrosoftAccount.find { MicrosoftAccounts.telegramUserId eq userId }.firstOrNull()
return@transaction OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
}
}
@ -27,45 +27,41 @@ class MicrosoftAccountManager(private val authClient: ConfidentialClientApplicat
.builder("http://localhost:45678/", OAUTH2_SCOPE)
.responseMode(ResponseMode.QUERY)
.prompt(Prompt.SELECT_ACCOUNT)
.state(userId.toString())
.build()
return authClient.getAuthorizationRequestUrl(parameters)
}
fun updateAccount(userId: Long, token: String): MicrosoftAccount {
fun updateAccount(userId: Long, redirectUrl: URL, checkUserId: Boolean = true): OneDriveTransferSetting {
val queries = redirectUrl.getQueryMap()
if (checkUserId && userId != queries["state"]?.toLong()) {
throw IllegalArgumentException("State 不等于 userId.")
}
val future = authClient.acquireToken(
AuthorizationCodeParameters
.builder(token, URI.create("http://localhost:45678/"))
.builder(queries["code"], URI.create("http://localhost:45678/"))
.build()
)
val result = future.get()
return transaction(db) {
val account = MicrosoftAccount.find { MicrosoftAccounts.telegramUserId eq userId }.firstOrNull()
val account = OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
account?.apply {
accountId = result.account().homeAccountId()
userName = result.account().username()
}
?: MicrosoftAccount.new {
?: OneDriveTransferSetting.new {
telegramUserId = userId
accountId = result.account().homeAccountId()
userName = result.account().username()
driveId = ""
storagePath = "Telegram Files/"
}
}
}
fun getAccountByTgUserId(userId: Long): MicrosoftAccount? {
fun getAccountByTgUserId(userId: Long): OneDriveTransferSetting? {
return transaction(db) {
MicrosoftAccount.find { MicrosoftAccounts.telegramUserId eq userId }.firstOrNull()
}
}
fun getAccessToken(account: MicrosoftAccount): IAuthenticationResult? {
val iAccount = authClient.accounts.get().find { it.homeAccountId() == account.accountId }
val silentParameters = SilentParameters.builder(OAUTH2_SCOPE, iAccount).build()
val future = authClient.acquireTokenSilently(silentParameters)
try {
return future.get()
} catch (e: ExecutionException) {
throw e.cause ?: e
OneDriveTransferSetting.find { OneDriveTransferSettings.telegramUserId eq userId }.firstOrNull()
}
}
@ -77,14 +73,9 @@ class MicrosoftAccountManager(private val authClient: ConfidentialClientApplicat
"Files.ReadWrite",
"Files.Read.All",
"Files.ReadWrite.All",
"Sites.Read.All",
"Sites.ReadWrite.All",
"offline_access"
)
fun getTokenFromUrl(url: URL): String {
return url.query.split("&").find { it.startsWith("code=") }?.substring(5) ?: throw IllegalArgumentException(
"Invalid URL."
)
}
}
}
}

View File

@ -1,13 +1,101 @@
package net.lamgc.scext.onedrive_transfer
import com.microsoft.aad.msal4j.ClientCredentialFactory
import com.microsoft.aad.msal4j.ConfidentialClientApplication
import com.microsoft.aad.msal4j.IAccount
import com.microsoft.aad.msal4j.SilentParameters
import com.microsoft.graph.authentication.IAuthenticationProvider
import com.microsoft.graph.httpcore.HttpClients
import com.microsoft.graph.models.Drive
import com.microsoft.graph.requests.GraphServiceClient
import okhttp3.Request
import org.jetbrains.exposed.sql.Database
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.objects.Document
import java.net.URL
import java.util.concurrent.CompletableFuture
class OneDriveTransferService(
private val bot: BaseAbilityBot,
private val config: ExtensionConfig,
private val accountManager: MicrosoftAccountManager,
private val authClient: ConfidentialClientApplication
private val db: Database
) {
val accountManager: OneDriveTransferSettingManager
val authClient: ConfidentialClientApplication = ConfidentialClientApplication.builder(
config.clientId,
ClientCredentialFactory.createFromSecret(config.clientSecret),
)
.authority(OneDriveTransferSettingManager.AUTHORITY)
.setTokenCacheAccessAspect(DatabaseTokenCache(db))
.build()
init {
accountManager = OneDriveTransferSettingManager(authClient, db)
}
}
fun createGraphClient(userId: Long): GraphServiceClient<Request>? {
val cache = THREAD_CURRENT_GRAPH_CLIENT.get()
if (cache?.tgUserId == userId) {
return cache.client
}
val serviceClient = accountManager.getTransferSetting(userId)?.let {
authClient.accounts.get().firstOrNull { account ->
account.homeAccountId() == it.accountId
}
}?.let {
GraphServiceClient.builder()
.httpClient(HttpClients.createDefault(MsalAuthorizationProvider(authClient, it)))
.buildClient()
}
if (serviceClient == null) {
return null
}
THREAD_CURRENT_GRAPH_CLIENT.set(ClientCache(userId, serviceClient))
return serviceClient
}
fun createLoginUrl(userId: Long) =
accountManager.createAuthorizationRequest(userId)
fun updateAccount(userId: Long, redirectUrl: URL) =
accountManager.updateAccount(userId, redirectUrl)
fun listDriversByUserId(userId: Long, page: Int = 1, size: Int = 10): List<Drive> {
return createGraphClient(userId)!!.drives()
.buildRequest()
.skip(page * size)
.get()?.currentPage ?: emptyList()
}
fun getCurrentDrive(userId: Long): Drive {
val transferSetting =
accountManager.getTransferSetting(userId) ?: throw IllegalStateException("未登录 OneDrive.")
val graphClient = createGraphClient(userId) ?: throw IllegalStateException("未登录 OneDrive.")
return graphClient.drives(transferSetting.driveId)
.buildRequest()
.get() ?: throw IllegalStateException("无法获取当前 OneDrive 驱动器.")
}
fun submitUploadDocumentTask(userId: Long, document: Document) {
}
companion object {
private val THREAD_CURRENT_GRAPH_CLIENT = ThreadLocal<ClientCache>()
}
}
private data class ClientCache(
val tgUserId: Long,
val client: GraphServiceClient<Request>,
)
class MsalAuthorizationProvider(private val authClientApplication: ConfidentialClientApplication, private val iAccount: IAccount) : IAuthenticationProvider {
override fun getAuthorizationTokenAsync(requestUrl: URL): CompletableFuture<String> {
return authClientApplication.acquireTokenSilently(
SilentParameters.builder(OneDriveTransferSettingManager.OAUTH2_SCOPE, iAccount).build()
).thenApply { it.accessToken() }
}
}

13
src/main/kotlin/Utils.kt Normal file
View File

@ -0,0 +1,13 @@
package net.lamgc.scext.onedrive_transfer
import java.net.URL
import java.net.URLDecoder
fun URL.getQueryMap(): Map<String, String> {
val queryMap = mutableMapOf<String, String>()
query.split("&").forEach {
val pair = it.split("=")
queryMap[pair[0]] = URLDecoder.decode(pair[1], "UTF-8")
}
return queryMap
}