feat: 支持 Scalabot v0.8.0, 并且准备增加主从节点功能.

This commit is contained in:
LamGC 2024-12-13 09:37:43 +08:00
parent f0ea396229
commit 68a1c23469
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D
9 changed files with 425 additions and 280 deletions

View File

@ -1,7 +1,7 @@
import java.net.URI
plugins {
kotlin("jvm") version "1.9.21"
kotlin("jvm") version "2.1.0"
`maven-publish`
}
@ -9,6 +9,7 @@ group = "net.lamgc.scext"
version = "0.1.0-SNAPSHOT"
repositories {
mavenLocal()
mavenCentral()
maven {
@ -20,7 +21,7 @@ repositories {
dependencies {
compileOnly("org.slf4j:slf4j-api:2.0.10")
compileOnly("io.github.microutils:kotlin-logging:3.0.5")
compileOnly("net.lamgc:scalabot-extension:0.6.1")
compileOnly("net.lamgc:scalabot-extension:0.8.0-1")
implementation("com.fasterxml.jackson.core:jackson-core:2.16.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
@ -39,7 +40,7 @@ dependencies {
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("com.microsoft.graph:microsoft-graph:5.77.0")
implementation("com.azure:azure-identity:1.11.1")
implementation("com.azure:azure-identity:1.14.2")
testImplementation("org.jetbrains.kotlin:kotlin-test")
}
@ -51,6 +52,24 @@ kotlin {
jvmToolchain(17)
}
val fatJar = task("fatJar", type = Jar::class) {
archiveBaseName = "${project.name}-agent"
manifest {
attributes["Implementation-Title"] = "OneDrive-Transfer"
attributes["Implementation-Version"] = version
attributes["Main-Class"] = "net.lamgc.scext.onedrive_transfer.AgentMainKt"
}
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
with(tasks.jar.get() as CopySpec)
}
tasks {
"jar" {
dependsOn(fatJar)
}
}
publishing {
repositories {
maven("https://git.lamgc.me/api/packages/LamGC/maven") {

View File

@ -0,0 +1,5 @@
package net.lamgc.scext.onedrive_transfer
fun main() {
}

View File

@ -0,0 +1,252 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.util.concurrent.RateLimiter
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import java.util.*
@Suppress("UnstableApiUsage")
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
private val logger = KotlinLogging.logger { }
private val progressUpdateRateLimiterMap = WeakHashMap<BaseAbilityBot, RateLimiter>()
private val progressUpdateUserRateLimiterMap = CacheBuilder.newBuilder()
.expireAfterAccess(15, java.util.concurrent.TimeUnit.MINUTES)
.softValues()
.initialCapacity(12)
.concurrencyLevel(4)
.build(object : CacheLoader<BotUserRateLimitToken, RateLimiter>() {
override fun load(key: BotUserRateLimitToken): RateLimiter {
return RateLimiter.create(3.0)
}
})
private fun BaseAbilityBot.rateLimiterPerBot(): RateLimiter {
return progressUpdateRateLimiterMap[this] ?: RateLimiter.create(25.0).also {
progressUpdateRateLimiterMap[this] = it
}
}
private fun BaseAbilityBot.rateLimiterPerUser(userId: Long): RateLimiter {
val token = BotUserRateLimitToken(this, userId)
return progressUpdateUserRateLimiterMap[token]
}
private fun BaseAbilityBot.tryAcquireRateLimit(userId: Long): Boolean {
return if (rateLimiterPerUser(userId).tryAcquire()) {
rateLimiterPerBot().tryAcquire()
} else {
false
}
}
override fun onTransferTaskCreated(task: OneDriveTransferTask) {
if (task.extra["infoMessageId"] != null) {
val deleteMessage = DeleteMessage.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.build()
try {
task.bot.telegramClient.execute(deleteMessage)
} catch (e: Exception) {
logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." }
}
}
val msg = if (task.retryCount == 0) {
"""
OneDrive 中转任务已创建
正在排队中...
文件名 ${task.document.fileName}
-------------------------------------------------
#Queuing
""".trimIndent()
} else {
"""
OneDrive 中转任务已创建
正在排队中...
文件名 ${task.document.fileName}
上次错误信息${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"}
重试次数${task.retryCount}
-------------------------------------------------
#Queuing
""".trimIndent()
}
val message = task.bot.telegramClient.execute(
SendMessage.builder()
.text(msg)
.chatId(task.extra["chatId"].toString().toLong())
.replyToMessageId(task.extra["messageId"].toString().toInt())
.build()
)
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名 ${progress.currentTask.document.fileName}
-------------------------------------------------
#Starting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
正在获取文件信息...
文件名 ${progress.currentTask.document.fileName}
重试次数$retryCount / $maxRetryCount
-------------------------------------------------
重试并不等同于获取文件失败由于 Telegram Bot API 需要下载文件后
才会获取文件信息因此需要重试以等待文件下载完成
#GettingFileInfo
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGotFileInfo(
progress: OneDriveTransferWorkerProgress,
file: org.telegram.telegrambots.meta.api.objects.File
) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
已获取文件信息正在创建 OneDrive 上传会话...
文件名 ${progress.currentTask.document.fileName}
文件大小${file.fileSize / 1024} KB
-------------------------------------------------
#UploadStarting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) {
if (!progress.currentTask.bot.tryAcquireRateLimit(progress.currentTask.tgUserId)) {
return
}
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行中
文件名 ${progress.currentTask.document.fileName}
进度${String.format("%.3f", progress.progress.get() * 100)}%
-------------------------------------------------
#Processing
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行失败
文件名 ${task.document.fileName}
错误信息${progress.exception?.message}
重试次数${task.retryCount}
任务将会追加至队列尾部进行重试
-------------------------------------------------
#Failure #Error
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
progress.currentTask.extra["lastError"] = progress.exception
}
if (task.retryCount < 5) {
runBlocking {
delay(10000)
task.retryCount++
OneDriveTransferCenter.submitUploadTask(task)
}
}
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行成功
文件名 ${task.document.fileName}
OneDrive 文件路径${progress.driveItem?.webUrl}
------------------------------------------------
#Success
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务已取消
文件名 ${task.document.fileName}
-------------------------------------------------
#Cancelled
""".trimIndent()
)
.build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt())
if (newMessage != null) {
task.extra["infoMessageId"] = newMessage.messageId
}
}
}
private data class BotUserRateLimitToken(
val bot: BaseAbilityBot,
val userId: Long
)

View File

@ -6,4 +6,11 @@ data class ExtensionConfig(
val useCommandPrefix: Boolean = true,
val maxFileSize: Long = 1024L * 1024 * 1024 * 4,
val maxTransferSize: Long = 1024L * 1024 * 1024 * 20,
val centralSetting: CentralSetting = CentralSetting()
)
data class CentralSetting(
val enable: Boolean = false,
val port: Int = 24860,
val secret: String = "",
)

View File

@ -1,12 +1,17 @@
package net.lamgc.scext.onedrive_transfer
import net.lamgc.scalabot.extension.BotExtensionCreateOptions
import net.lamgc.scalabot.extension.BotExtensionFactory
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.abilitybots.api.util.AbilityExtension
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension
import java.io.File
class ExtensionFactory : BotExtensionFactory {
override fun createExtensionInstance(bot: BaseAbilityBot, dataFolder: File): AbilityExtension {
override fun createExtensionInstance(
bot: BaseAbilityBot,
dataFolder: File,
createOptions: BotExtensionCreateOptions
): AbilityExtension {
return OneDriveTransferExtension(bot, dataFolder)
}
}

View File

@ -9,15 +9,10 @@ import com.microsoft.graph.models.DriveItemUploadableProperties
import com.microsoft.graph.requests.GraphServiceClient
import com.microsoft.graph.tasks.IProgressCallback
import com.microsoft.graph.tasks.LargeFileUploadTask
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import okhttp3.Request
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.GetFile
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.Document
import org.telegram.telegrambots.meta.exceptions.TelegramApiException
import java.io.File
@ -28,218 +23,19 @@ import java.util.concurrent.*
object OneDriveTransferCenter {
val executor =
OneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue())
val executor: OneDriveTransferTaskExecutor =
LocalOneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, LinkedBlockingQueue())
fun submitUploadTask(task: OneDriveTransferTask): Boolean = executor.submitTransferTask(task)
fun cancelUploadTask(task: OneDriveTransferTask): Boolean = executor.cancelTransferTask(task)
fun getQueueingTaskCount(): Int = executor.taskQueue.size
fun getQueueingTaskCount(): Int = executor.getQueuedTransferTasks().size
fun getProcessingTasks(): Map<Int, OneDriveTransferWorkerProgress> =
Collections.unmodifiableMap(executor.threadStatusMap)
Collections.unmodifiableMap(executor.getWorkingTasks())
fun getProcessThreadNum(): Int = executor.threadNum
}
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
private val logger = KotlinLogging.logger { }
override fun onTransferTaskCreated(task: OneDriveTransferTask) {
if (task.extra["infoMessageId"] != null) {
val deleteMessage = DeleteMessage.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.build()
try {
task.bot.execute(deleteMessage)
} catch (e: Exception) {
logger.debug(e) { "删除旧状态消息时出错, 忽略该异常." }
}
}
val msg = if (task.retryCount == 0) {
"""
OneDrive 中转任务已创建
正在排队中...
文件名 ${task.document.fileName}
-------------------------------------------------
#Queuing
""".trimIndent()
} else {
"""
OneDrive 中转任务已创建
正在排队中...
文件名 ${task.document.fileName}
上次错误信息${(task.extra["lastError"] as Exception?)?.message ?: "(没有错误信息)"}
重试次数${task.retryCount}
-------------------------------------------------
#Queuing
""".trimIndent()
}
val message = task.bot.execute(
SendMessage.builder()
.text(msg)
.chatId(task.extra["chatId"].toString().toLong())
.replyToMessageId(task.extra["messageId"].toString().toInt())
.build()
)
task.extra["infoMessageId"] = message.messageId
}
override fun onTransferTaskStart(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务开始执行
正在获取文件信息...(需要一些时间从 Telegram 服务器下载文件)
文件名 ${progress.currentTask.document.fileName}
-------------------------------------------------
#Starting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGettingFileInfo(progress: OneDriveTransferWorkerProgress, retryCount: Int, maxRetryCount: Int) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
正在获取文件信息...
文件名 ${progress.currentTask.document.fileName}
重试次数$retryCount / $maxRetryCount
-------------------------------------------------
重试并不等同于获取文件失败由于 Telegram Bot API 需要下载文件后
才会获取文件信息因此需要重试以等待文件下载完成
#GettingFileInfo
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onGotFileInfo(
progress: OneDriveTransferWorkerProgress,
file: org.telegram.telegrambots.meta.api.objects.File
) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
已获取文件信息正在创建 OneDrive 上传会话...
文件名 ${progress.currentTask.document.fileName}
文件大小${file.fileSize / 1024} KB
-------------------------------------------------
#UploadStarting
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onUploadProgress(progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(progress.currentTask.extra["chatId"].toString().toLong())
.messageId(progress.currentTask.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行中
文件名 ${progress.currentTask.document.fileName}
进度${String.format("%.3f", progress.progress.get() * 100)}%
-------------------------------------------------
#Processing
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferFailure(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行失败
文件名 ${task.document.fileName}
错误信息${progress.exception?.message}
重试次数${task.retryCount}
任务将会追加至队列尾部进行重试
-------------------------------------------------
#Failure #Error
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
progress.currentTask.extra["lastError"] = progress.exception
}
if (task.retryCount < 5) {
runBlocking {
delay(10000)
task.retryCount++
OneDriveTransferCenter.submitUploadTask(task)
}
}
}
override fun onTransferSuccess(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务执行成功
文件名 ${task.document.fileName}
OneDrive 文件路径${progress.driveItem?.webUrl}
------------------------------------------------
#Success
""".trimIndent()
)
.build().orSendMessage(progress.currentTask.bot, progress.currentTask.extra["messageId"].toString().toInt())
if (newMessage != null) {
progress.currentTask.extra["infoMessageId"] = newMessage.messageId
}
}
override fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?) {
val newMessage = EditMessageText.builder()
.chatId(task.extra["chatId"].toString().toLong())
.messageId(task.extra["infoMessageId"].toString().toInt())
.text(
"""
OneDrive 中转任务已取消
文件名 ${task.document.fileName}
-------------------------------------------------
#Cancelled
""".trimIndent()
)
.build().orSendMessage(task.bot, task.extra["messageId"].toString().toInt())
if (newMessage != null) {
task.extra["infoMessageId"] = newMessage.messageId
}
}
fun getWorkerCount(): Int = executor.getWorkerCount()
}
@ -254,6 +50,20 @@ interface OneDriveTransferCallback {
fun onTransferCancelled(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress?)
}
interface OneDriveTransferTaskExecutor {
fun submitTransferTask(task: OneDriveTransferTask): Boolean
fun cancelTransferTask(task: OneDriveTransferTask): Boolean
fun getQueuedTransferTasks(): List<OneDriveTransferTask>
fun getWorkerCount(): Int
fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress>
}
/**
* OneDrive 中转任务执行器.
* @param threadNum 线程数量.
@ -261,10 +71,10 @@ interface OneDriveTransferCallback {
* @param taskQueue 任务队列.
* @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize.
*/
class OneDriveTransferTaskExecutor(
val threadNum: Int,
class LocalOneDriveTransferTaskExecutor(
private val threadNum: Int,
private val callback: OneDriveTransferCallback,
val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val chunkSize: Int = 26
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
@ -272,12 +82,12 @@ class OneDriveTransferTaskExecutor(
ThreadFactoryBuilder()
.setNameFormat("Transfer Worker %d")
.build()
) {
), OneDriveTransferTaskExecutor {
private val logger = KotlinLogging.logger { }
val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
val threadMap = ConcurrentHashMap<Int, Thread>()
private val threadStatusMap = ConcurrentHashMap<Int, OneDriveTransferWorkerProgress>()
private val threadMap = ConcurrentHashMap<Int, Thread>()
init {
if (chunkSize > MAX_CHUNK_SIZE) {
@ -289,7 +99,7 @@ class OneDriveTransferTaskExecutor(
}
}
fun submitTransferTask(task: OneDriveTransferTask): Boolean {
override fun submitTransferTask(task: OneDriveTransferTask): Boolean {
if (taskQueue.remainingCapacity() > 0) {
callback.onTransferTaskCreated(task)
return taskQueue.offer(task)
@ -297,7 +107,7 @@ class OneDriveTransferTaskExecutor(
return false
}
fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
override fun cancelTransferTask(task: OneDriveTransferTask): Boolean {
if (!taskQueue.remove(task)) {
for (i in 0 until threadNum) {
if (threadStatusMap[i]?.currentTask?.id == task.id) {
@ -315,6 +125,12 @@ class OneDriveTransferTaskExecutor(
return true
}
override fun getQueuedTransferTasks(): List<OneDriveTransferTask> = taskQueue.toList()
override fun getWorkerCount(): Int = threadNum
override fun getWorkingTasks(): Map<Int, OneDriveTransferWorkerProgress> = threadStatusMap
private fun createWorker(id: Int): Runnable = Runnable {
threadMap[id] = Thread.currentThread()
logger.info { "下载线程 $id 已启动." }
@ -378,7 +194,7 @@ class OneDriveTransferTaskExecutor(
}
try {
tempFile = task.bot.execute(
tempFile = task.bot.telegramClient.execute(
GetFile.builder()
.fileId(task.document.fileId)
.build()
@ -428,7 +244,9 @@ class OneDriveTransferTaskExecutor(
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
DriveItemCreateUploadSessionParameterSet.newBuilder()
.withItem(DriveItemUploadableProperties())
.withItem(DriveItemUploadableProperties().apply {
fileSize = file.fileSize
})
.build()
)
.buildRequest()
@ -470,7 +288,7 @@ class OneDriveTransferTaskExecutor(
logger.debug(e) { "无法从本地获取文件:$filePath" }
}
return bot.downloadFileAsStream(filePath)
return bot.telegramClient.downloadFileAsStream(filePath)
}
private fun checkAndGetPath(
@ -588,18 +406,41 @@ class OneDriveTransferTaskExecutor(
}
companion object {
val ONCE_CHUNK_SIZE = 320 * 1024
val MAX_CHUNK_SIZE = 192
const val ONCE_CHUNK_SIZE = 320 * 1024
const val MAX_CHUNK_SIZE = 192
}
}
enum class OneDriveTransferStatus {
/**
* 正在排队.
*/
QUEUING,
/**
* 正在从 Telegram 获取文件信息.
*/
GETTING_FILE_INFO,
/**
* 正在创建上传会话.
*/
CREATING_UPLOAD_SESSION,
/**
* 上传中.
*/
UPLOADING,
/**
* 上传成功.
*/
SUCCESS,
/**
* 上传失败.
*/
FAILURE
}

View File

@ -6,13 +6,12 @@ import com.google.common.cache.CacheBuilder
import com.microsoft.aad.msal4j.MsalInteractionRequiredException
import mu.KotlinLogging
import org.jetbrains.exposed.sql.Database
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.abilitybots.api.objects.Ability
import org.telegram.abilitybots.api.objects.Ability.AbilityBuilder
import org.telegram.abilitybots.api.objects.Locality
import org.telegram.abilitybots.api.objects.Privacy
import org.telegram.abilitybots.api.objects.Reply
import org.telegram.abilitybots.api.util.AbilityExtension
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.objects.Ability
import org.telegram.telegrambots.abilitybots.api.objects.Locality
import org.telegram.telegrambots.abilitybots.api.objects.Privacy
import org.telegram.telegrambots.abilitybots.api.objects.Reply
import org.telegram.telegrambots.abilitybots.api.util.AbilityExtension
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
@ -20,6 +19,7 @@ import org.telegram.telegrambots.meta.api.objects.replykeyboard.InlineKeyboardMa
import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardMarkup
import org.telegram.telegrambots.meta.api.objects.replykeyboard.ReplyKeyboardRemove
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardButton
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardRow
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.KeyboardRow
import java.io.File
import java.net.MalformedURLException
@ -63,7 +63,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.privacy(Privacy.PUBLIC)
.locality(Locality.USER)
.action {
it.bot().silent().send(
it.bot().silent.send(
"""
OneDrive Transfer 是一个 Telegram 机器人
可以将 Telegram 中的文件上传到 OneDrive 中转
@ -97,7 +97,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.action { ctx ->
val url = onedriveService.createLoginUrl(ctx.chatId())
actionCache.put(ctx.chatId(), "login")
ctx.bot().silent().send("""
ctx.bot().silent.send(
"""
请使用以下链接进行登录
$url
@ -111,23 +112,24 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
try {
val account = onedriveService.updateAccount(upd.message.chat.id, URL(upd.message.text.trim()))
actionCache.invalidate(upd.message.chatId)
bot.silent().send("""
bot.silent.send(
"""
登录成功
Microsoft 账号${account.userName}
请使用 /select_drive 选择 OneDrive 驱动器以及设置上传路径
""".trimIndent(), upd.message.chatId)
} catch (e: MsalInteractionRequiredException) {
if (e.errorCode() == "AADSTS54005") {
bot.silent().send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId)
bot.silent.send("登录失败,登录链接已过期,请重新登录。", upd.message.chatId)
} else {
bot.silent().send("登录失败,错误代码:${e.errorCode()}", upd.message.chatId)
bot.silent.send("登录失败,错误代码:${e.errorCode()}", upd.message.chatId)
}
actionCache.invalidate(upd.message.chatId)
} catch (e: MalformedURLException) {
bot.silent().send("链接格式错误,请发送正确的登录链接。", upd.message.chatId)
bot.silent.send("链接格式错误,请发送正确的登录链接。", upd.message.chatId)
} catch (e: Exception) {
logger.error(e) { "处理 Oauth2 令牌时发生错误." }
bot.silent().send("处理 Oauth2 令牌时发生错误,请稍后重试。", upd.message.chatId)
bot.silent.send("处理 Oauth2 令牌时发生错误,请稍后重试。", upd.message.chatId)
actionCache.invalidate(upd.message.chatId)
}
},
@ -144,10 +146,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.action {
val account = onedriveService.accountManager.getAccountByTgUserId(it.chatId())
if (account == null) {
it.bot().silent().send("当前账户未登录 OneDrive.", it.chatId())
it.bot().silent.send("当前账户未登录 OneDrive.", it.chatId())
return@action
}
it.bot().silent().send("""
it.bot().silent.send(
"""
当前账户已登录 OneDrive.
Microsoft 账号 ${account.userName}
""".trimIndent(), it.chatId())
@ -163,14 +166,14 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
val currentDrive = try {
onedriveService.getCurrentDrive(it.chatId())
} catch (e: OneDriveNotLoginException) {
it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId())
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
} catch (e: Exception) {
null
}
val drives = onedriveService.listDriversByUserId(it.chatId())
if (drives.isEmpty()) {
it.bot().silent().send("当前账户没有 OneDrive 驱动器.", it.chatId())
it.bot().silent.send("当前账户没有 OneDrive 驱动器.", it.chatId())
return@action
}
@ -186,30 +189,39 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.text(msgContent)
.replyMarkup(InlineKeyboardMarkup.builder().apply {
for (drive in drives) {
keyboardRow(listOf(InlineKeyboardButton.builder()
keyboardRow(
InlineKeyboardRow(
InlineKeyboardButton.builder()
.text("[${drive.driveType}] ${drive.name}")
.callbackData(buttonTokenCache.putToken("${drive.id}", tokenPrefix = "select_drive:"))
.build()))
.callbackData(
buttonTokenCache.putToken(
"${drive.id}",
tokenPrefix = "select_drive:"
)
)
.build()
)
)
}
}.build())
.build().let { msg ->
it.bot().execute(msg)
it.bot().silent.execute(msg)
}
}
.reply({ bot, upd ->
val driveId = buttonTokenCache.getIfPresent(upd.callbackQuery.data.substringAfter(':'))
if (driveId == null) {
bot.silent().send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
return@reply
}
val drive = try {
onedriveService.listDriversByUserId(upd.callbackQuery.from.id).firstOrNull { it.id == driveId }
} catch (e: OneDriveNotLoginException) {
bot.silent().send("当前账户没有登录 OneDrive.", upd.callbackQuery.message.chatId)
bot.silent.send("当前账户没有登录 OneDrive.", upd.callbackQuery.message.chatId)
return@reply
}
if (drive == null) {
bot.silent().send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
bot.silent.send("无效的驱动器 ID.", upd.callbackQuery.message.chatId)
return@reply
}
onedriveService.setDrive(upd.callbackQuery.from.id, driveId)
@ -228,7 +240,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
""".trimIndent()
)
.build()
bot.execute(editMessageText)
bot.silent.execute(editMessageText)
}, { upd ->
upd.hasCallbackQuery() && upd.callbackQuery.data != null && upd.callbackQuery.data.startsWith("select_drive:")
})
@ -243,16 +255,16 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
val currentDrive = try {
onedriveService.getCurrentDrive(it.chatId())
} catch (e: OneDriveNotLoginException) {
it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId())
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
if (currentDrive == null) {
it.bot().silent().send("当前账户没有选择 OneDrive 驱动器.", it.chatId())
it.bot().silent.send("当前账户没有选择 OneDrive 驱动器.", it.chatId())
return@action
}
val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId())
if (transferSetting == null) {
it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId())
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
val msgContent = """
@ -279,14 +291,15 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
this.isPersistent(false)
}.build())
.build().let { msg ->
it.bot().execute(msg)
it.bot().silent.execute(msg)
}
}
.reply({ bot, upd ->
val path = upd.message.text.trim()
if (path == "/cancel") {
actionCache.invalidate(upd.message.chatId)
bot.execute(SendMessage.builder()
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已取消设置.")
.replyMarkup(ReplyKeyboardRemove.builder()
@ -298,7 +311,8 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
val transferSetting = onedriveService.accountManager.getTransferSetting(upd.message.chatId)
if (transferSetting == null) {
actionCache.invalidate(upd.message.chatId)
bot.execute(SendMessage.builder()
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("当前账户没有登录 OneDrive.")
.replyMarkup(ReplyKeyboardRemove.builder()
@ -311,11 +325,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
transferSetting.storagePath = path
}
actionCache.invalidate(upd.message.chatId)
bot.execute(DeleteMessage.builder()
bot.silent.execute(
DeleteMessage.builder()
.chatId(upd.message.chatId.toString())
.messageId(upd.message.messageId)
.build())
bot.execute(SendMessage.builder()
bot.silent.execute(
SendMessage.builder()
.chatId(upd.message.chatId.toString())
.text("已设置上传路径为:$path")
.replyMarkup(ReplyKeyboardRemove.builder()
@ -335,13 +351,13 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.action {
val transferSetting = onedriveService.accountManager.getTransferSetting(it.chatId())
if (transferSetting == null) {
it.bot().silent().send("当前账户没有登录 OneDrive.", it.chatId())
it.bot().silent.send("当前账户没有登录 OneDrive.", it.chatId())
return@action
}
onedriveService.accountManager.doSomething {
transferSetting.delete()
}
it.bot().silent().send("已登出 OneDrive.", it.chatId())
it.bot().silent.send("已登出 OneDrive.", it.chatId())
}
.build()
@ -349,11 +365,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
{ bot, upd ->
val document = upd.message.document
if (document == null) {
bot.silent().send("请发送文件.", upd.message.chatId)
bot.silent.send("请发送文件.", upd.message.chatId)
return@of
}
if (document.fileSize > config.maxFileSize) {
bot.silent().send("文件大小超过限制.", upd.message.chatId)
bot.silent.send("文件大小超过限制.", upd.message.chatId)
return@of
}
onedriveService.submitUploadDocumentTask(upd.message.chatId, upd.message.messageId, document)
@ -379,11 +395,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
val msg = StringBuilder(
"""
当前排队任务数${OneDriveTransferCenter.getQueueingTaskCount()}
当前处理任务数${processingTasks.size} / ${OneDriveTransferCenter.executor.threadNum}
当前处理任务数${processingTasks.size} / ${OneDriveTransferCenter.getWorkerCount()}
--------------------------------------------------------------
""".trimIndent()
).append('\n')
for (workerId in 0..<OneDriveTransferCenter.executor.threadNum) {
for (workerId in 0..<OneDriveTransferCenter.getWorkerCount()) {
val task = processingTasks[workerId]
if (task == null) {
msg.append("Worker $workerId: 空闲\n")
@ -398,7 +414,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
)
}
}
it.bot().silent().send(msg.toString(), it.chatId())
it.bot().silent.send(msg.toString(), it.chatId())
}
.build()
@ -409,7 +425,7 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.locality(Locality.USER)
.action {
if (!it.update().message.isReply) {
it.bot().silent().send("请回复要取消的消息.", it.chatId())
it.bot().silent.send("请回复要取消的消息.", it.chatId())
return@action
}
@ -425,11 +441,11 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
.text(msg)
.replyToMessageId(it.update().message.messageId)
.build()
it.bot().execute(sendMessage)
it.bot().silent.execute(sendMessage)
}
.build()
private fun AbilityBuilder.named(name: String): AbilityBuilder {
private fun Ability.AbilityBuilder.named(name: String): Ability.AbilityBuilder {
return if (config.useCommandPrefix) {
name("odt_$name")
} else {

View File

@ -13,7 +13,7 @@ import okhttp3.Request
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.objects.Document
import java.net.URL
import java.util.concurrent.CompletableFuture

View File

@ -1,11 +1,11 @@
package net.lamgc.scext.onedrive_transfer
import com.google.common.cache.Cache
import org.telegram.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.abilitybots.api.bot.BaseAbilityBot
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
import org.telegram.telegrambots.meta.api.objects.Message
import org.telegram.telegrambots.meta.api.objects.message.Message
import org.telegram.telegrambots.meta.exceptions.TelegramApiRequestException
import java.net.URL
import java.net.URLDecoder
@ -36,13 +36,13 @@ fun Cache<String, String>.putToken(value: String, tokenLength: Int = 16, tokenPr
fun EditMessageText.orSendMessage(bot: BaseAbilityBot, replyMessageId: Int, tryRemove: Boolean = true): Message? {
try {
bot.execute(this)
bot.telegramClient.execute(this)
return null
} catch (e: TelegramApiRequestException) {
if (e.errorCode != 400 || e.apiResponse != "message can't be edited") {
throw e
}
return bot.execute(
return bot.telegramClient.execute(
SendMessage.builder()
.replyToMessageId(replyMessageId)
.chatId(chatId.toString())
@ -52,7 +52,7 @@ fun EditMessageText.orSendMessage(bot: BaseAbilityBot, replyMessageId: Int, tryR
} finally {
if (tryRemove) {
try {
bot.execute(
bot.telegramClient.execute(
DeleteMessage.builder()
.messageId(messageId)
.build()