feat: 增加管理员状态指令.

This commit is contained in:
LamGC 2024-01-12 23:56:02 +08:00
parent 095394ce62
commit 4429997989
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D
2 changed files with 66 additions and 6 deletions

View File

@ -25,13 +25,20 @@ import java.util.concurrent.*
object OneDriveTransferCenter {
private val executor =
val executor =
OneDriveTransferTaskExecutor(1, DefaultOneDriveTransferCallback, ArrayBlockingQueue(100))
fun submitUploadTask(task: OneDriveTransferTask) {
executor.submitTransferTask(task)
}
fun getQueueingTaskCount(): Int = executor.taskQueue.size
fun getProcessingTasks(): Map<Int, OneDriveTransferWorkerProgress> =
Collections.unmodifiableMap(executor.threadStatusMap)
fun getProcessThreadNum(): Int = executor.threadNum
}
object DefaultOneDriveTransferCallback : OneDriveTransferCallback {
@ -194,10 +201,10 @@ interface OneDriveTransferCallback {
* @param chunkSize 上传块大小的倍率, 实际上传块大小为 320 KiB * chunkSize.
*/
class OneDriveTransferTaskExecutor(
threadNum: Int,
val threadNum: Int,
private val callback: OneDriveTransferCallback,
private val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val chunkSize: Int = 96
val taskQueue: BlockingQueue<OneDriveTransferTask>,
private val chunkSize: Int = 26
) : ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
ArrayBlockingQueue(50),
@ -248,6 +255,7 @@ class OneDriveTransferTaskExecutor(
}
} catch (e: Exception) {
logger.warn(e) { "OneDrive 中转任务执行失败: ${e.message}" }
progress.status = OneDriveTransferStatus.FAILURE
callback.onTransferFailure(task, progress.apply {
this.exception = e
})
@ -261,6 +269,7 @@ class OneDriveTransferTaskExecutor(
private fun doTransferFile(task: OneDriveTransferTask, progress: OneDriveTransferWorkerProgress) {
var tempFile: org.telegram.telegrambots.meta.api.objects.File
logger.debug { "开始获取文件信息..." }
progress.status = OneDriveTransferStatus.GETTING_FILE_INFO
var retryCount = 0
val maxRetryCount = 100
while (true) {
@ -296,6 +305,8 @@ class OneDriveTransferTaskExecutor(
logger.warn(e) { "OneDrive 中转任务成功获取文件信息回调失败: ${e.message}" }
}
progress.status = OneDriveTransferStatus.CREATING_UPLOAD_SESSION
val graphClient = task.service.createGraphClient(task.tgUserId)
val drive = graphClient.drives(task.onedriveId).buildRequest().get()
?: throw IllegalStateException("无法获取 OneDrive 驱动器.")
@ -308,11 +319,13 @@ class OneDriveTransferTaskExecutor(
if (file.fileSize < 4 * 1024 * 1024) {
val fileBytes = getFileStream(task.bot, file.filePath).readAllBytes()
progress.status = OneDriveTransferStatus.UPLOADING
val driveItem = graphClient.drives(task.onedriveId).root().itemWithPath(filePath).content()
.buildRequest()
.put(fileBytes)
progress.driveItem = driveItem
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
} else {
val uploadSession = graphClient.drives(task.onedriveId).root().itemWithPath(filePath)
.createUploadSession(
@ -340,8 +353,11 @@ class OneDriveTransferTaskExecutor(
file.fileSize,
DriveItem::class.java
)
progress.status = OneDriveTransferStatus.UPLOADING
val uploadResult = largeFileUploadTask.upload(ONCE_CHUNK_SIZE * chunkSize, null, progressCallback)
progress.driveItem = uploadResult.responseBody
progress.progress.set(1.0)
progress.status = OneDriveTransferStatus.SUCCESS
}
}
@ -480,11 +496,21 @@ class OneDriveTransferTaskExecutor(
}
enum class OneDriveTransferStatus {
QUEUING,
GETTING_FILE_INFO,
CREATING_UPLOAD_SESSION,
UPLOADING,
SUCCESS,
FAILURE
}
data class OneDriveTransferWorkerProgress(
val currentTask: OneDriveTransferTask,
val progress: AtomicDouble = AtomicDouble(),
var driveItem: DriveItem? = null,
var exception: Exception? = null
var exception: Exception? = null,
var status: OneDriveTransferStatus = OneDriveTransferStatus.QUEUING
)
data class OneDriveTransferTask(
@ -495,5 +521,6 @@ data class OneDriveTransferTask(
val onedriveId: String,
val storagePath: String,
val extra: MutableMap<String, Any> = mutableMapOf(),
val createdAt: Date = Date()
val createdAt: Date = Date(),
var retryCount: Int = 0
)

View File

@ -369,6 +369,39 @@ class OneDriveTransferExtension(val bot: BaseAbilityBot, val dataFolder: File) :
}
)
fun adminStatus(): Ability = Ability.builder()
.named("status")
.info("查看当前中转状态.(仅限管理员)")
.privacy(Privacy.CREATOR)
.locality(Locality.USER)
.action {
val processingTasks = OneDriveTransferCenter.getProcessingTasks()
val msg = StringBuilder(
"""
当前排队任务数${OneDriveTransferCenter.getQueueingTaskCount()}
当前处理任务数${processingTasks.size} / ${OneDriveTransferCenter.executor.threadNum}
--------------------------------------------------------------
""".trimIndent()
).append('\n')
for (workerId in 0..<OneDriveTransferCenter.executor.threadNum) {
val task = processingTasks[workerId]
if (task == null) {
msg.append("Worker $workerId: 空闲\n")
} else {
msg.append(
"Worker $workerId: [${task.status}] ${
String.format(
"%.3f",
task.progress.get()
)
}% ${task.currentTask.document.fileSize} Bytes\n"
)
}
}
it.bot().silent().send(msg.toString(), it.chatId())
}
.build()
private fun AbilityBuilder.named(name: String): AbilityBuilder {
return if (config.useCommandPrefix) {
name("odt_$name")