From 55edfd771ec9a2323326fdb66d3579daa8405f9c Mon Sep 17 00:00:00 2001 From: SikongJueluo Date: Sun, 17 Aug 2025 13:33:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E8=BF=9B=E5=BA=A6?= =?UTF-8?q?=E6=9D=A1=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/Program.cs | 5 +- server/src/Controllers/JtagController.cs | 21 +- server/src/Hubs/ProgressHub.cs | 50 ++- server/src/MsgBus.cs | 30 +- server/src/Peripherals/JtagClient.cs | 62 ++-- server/src/Services/ProgressTracker.cs | 147 ++++++++ server/src/Services/ProgressTrackerService.cs | 294 ---------------- server/src/UdpClientPool.cs | 60 +--- src/components/UploadCard.vue | 317 ++++++++---------- src/stores/progress.ts | 83 +++++ .../signalR/TypedSignalR.Client/index.ts | 8 + .../TypedSignalR.Client/server.Hubs.ts | 10 + src/utils/signalR/server.Hubs.ts | 9 +- 13 files changed, 512 insertions(+), 584 deletions(-) create mode 100644 server/src/Services/ProgressTracker.cs delete mode 100644 server/src/Services/ProgressTrackerService.cs create mode 100644 src/stores/progress.ts diff --git a/server/Program.cs b/server/Program.cs index 211c2b3..682401e 100644 --- a/server/Program.cs +++ b/server/Program.cs @@ -180,8 +180,7 @@ try builder.Services.AddHostedService(provider => provider.GetRequiredService()); // 添加进度跟踪服务 - builder.Services.AddSingleton(); - builder.Services.AddHostedService(provider => provider.GetRequiredService()); + builder.Services.AddSingleton(); // Application Settings var app = builder.Build(); @@ -258,6 +257,8 @@ try // Setup Program MsgBus.Init(); + var progressTracker = app.Services.GetRequiredService(); + MsgBus.SetProgressTracker(progressTracker); // Generate API Client app.MapGet("GetAPIClientCode", async (HttpContext context) => diff --git a/server/src/Controllers/JtagController.cs b/server/src/Controllers/JtagController.cs index c8e071e..717f366 100644 --- a/server/src/Controllers/JtagController.cs +++ b/server/src/Controllers/JtagController.cs @@ -16,17 +16,12 @@ public class JtagController : ControllerBase { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - private readonly ProgressTrackerService _tracker; + private readonly ProgressTracker _tracker = MsgBus.ProgressTracker; private readonly UserManager _userManager = new(); private readonly ResourceManager _resourceManager = new(); private const string BITSTREAM_PATH = "bitstream/Jtag"; - public JtagController(ProgressTrackerService tracker) - { - _tracker = tracker; - } - /// /// 控制器首页信息 /// @@ -188,8 +183,8 @@ public class JtagController : ControllerBase logger.Info($"User {username} processing bitstream file of size: {fileBytes.Length} bytes"); // 定义进度跟踪 - var (taskId, progress) = _tracker.CreateTask(cancelToken); - progress.Report(10); + var taskId = _tracker.CreateTask(1000); + _tracker.AdvanceProgress(taskId, 10); _ = Task.Run(async () => { @@ -210,7 +205,8 @@ public class JtagController : ControllerBase if (!retBuffer.IsSuccessful) { logger.Error($"User {username} failed to reverse bytes: {retBuffer.Error}"); - progress.Error($"User {username} failed to reverse bytes: {retBuffer.Error}"); + _tracker.FailProgress(taskId, + $"User {username} failed to reverse bytes: {retBuffer.Error}"); return; } revBuffer = retBuffer.Value; @@ -228,7 +224,7 @@ public class JtagController : ControllerBase var processedBytes = outputStream.ToArray(); logger.Info($"User {username} processed {totalBytesProcessed} bytes for device {address}"); - progress.Report(20); + _tracker.AdvanceProgress(taskId, 20); // 下载比特流 var jtagCtrl = new Peripherals.JtagClient.Jtag(address, port); @@ -237,12 +233,13 @@ public class JtagController : ControllerBase if (ret.IsSuccessful) { logger.Info($"User {username} successfully downloaded bitstream '{resource.ResourceName}' to device {address}"); - progress.Finish(); + _tracker.CompleteProgress(taskId); } else { logger.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}"); - progress.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}"); + _tracker.FailProgress(taskId, + $"User {username} failed to download bitstream to device {address}: {ret.Error}"); } } }); diff --git a/server/src/Hubs/ProgressHub.cs b/server/src/Hubs/ProgressHub.cs index 068b919..3f153d1 100644 --- a/server/src/Hubs/ProgressHub.cs +++ b/server/src/Hubs/ProgressHub.cs @@ -1,17 +1,20 @@ using Microsoft.AspNetCore.Authorization; -using System.Security.Claims; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.Cors; using TypedSignalR.Client; using Tapper; using server.Services; +#pragma warning disable 1998 + namespace server.Hubs; [Hub] public interface IProgressHub { Task Join(string taskId); + Task Leave(string taskId); + Task GetProgress(string taskId); } [Receiver] @@ -23,8 +26,7 @@ public interface IProgressReceiver [TranspilationSource] public enum ProgressStatus { - Pending, - InProgress, + Running, Completed, Canceled, Failed @@ -33,10 +35,10 @@ public enum ProgressStatus [TranspilationSource] public class ProgressInfo { - public virtual string TaskId { get; } = string.Empty; - public virtual ProgressStatus Status { get; } - public virtual int ProgressPercent { get; } = 0; - public virtual string ErrorMessage { get; } = string.Empty; + public required string TaskId { get; set; } + public required ProgressStatus Status { get; set; } + public required int ProgressPercent { get; set; } + public required string ErrorMessage { get; set; } }; [Authorize] @@ -44,18 +46,32 @@ public class ProgressInfo public class ProgressHub : Hub, IProgressHub { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - - private readonly IHubContext _hubContext; - private readonly ProgressTrackerService _tracker; - - public ProgressHub(IHubContext hubContext, ProgressTrackerService tracker) - { - _hubContext = hubContext; - _tracker = tracker; - } + private readonly ProgressTracker _progressTracker = MsgBus.ProgressTracker; public async Task Join(string taskId) { - return await Task.Run(() => _tracker.BindTask(taskId, Context.ConnectionId)); + await Groups.AddToGroupAsync(Context.ConnectionId, taskId); + + // 发送当前状态(如果存在) + var task = _progressTracker.GetTask(taskId); + if (task != null) + { + await Clients.Caller.OnReceiveProgress(task.Value.ToProgressInfo()); + } + + logger.Info($"Client {Context.ConnectionId} joined task {taskId}"); + return true; + } + + public async Task Leave(string taskId) + { + await Groups.RemoveFromGroupAsync(Context.ConnectionId, taskId); + logger.Info($"Client {Context.ConnectionId} left task {taskId}"); + return true; + } + + public async Task GetProgress(string taskId) + { + return _progressTracker.GetTask(taskId)?.ToProgressInfo(); } } diff --git a/server/src/MsgBus.cs b/server/src/MsgBus.cs index 5a93abc..18527ab 100644 --- a/server/src/MsgBus.cs +++ b/server/src/MsgBus.cs @@ -1,7 +1,8 @@ +using server.Services; /// /// 多线程通信总线 /// -public static class MsgBus +public sealed class MsgBus { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); @@ -11,12 +12,39 @@ public static class MsgBus /// public static UDPServer UDPServer { get { return udpServer; } } + // 添加静态ProgressTracker引用 + private static ProgressTracker? _progressTracker; + + /// + /// 设置全局ProgressTracker实例 + /// + public static void SetProgressTracker(ProgressTracker progressTracker) + { + _progressTracker = progressTracker; + } + + public static ProgressTracker ProgressTracker + { + get + { + if (_progressTracker == null) + { + throw new InvalidOperationException("ProgressTracker is not set."); + } + return _progressTracker; + } + } + private static bool isRunning = false; /// /// 获取通信总线运行状态 /// public static bool IsRunning { get { return isRunning; } } + private MsgBus() { } + + static MsgBus() { } + /// /// 通信总线初始化 /// diff --git a/server/src/Peripherals/JtagClient.cs b/server/src/Peripherals/JtagClient.cs index 11f43fd..0fcafac 100644 --- a/server/src/Peripherals/JtagClient.cs +++ b/server/src/Peripherals/JtagClient.cs @@ -380,6 +380,7 @@ public class JtagStatusReg public class Jtag { private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); + private readonly ProgressTracker? _progressTracker; private const int CLOCK_FREQ = 50; // MHz @@ -392,18 +393,21 @@ public class Jtag public readonly string address; private IPEndPoint ep; + /// /// Jtag 构造函数 /// /// 目标 IP 地址 /// 目标 UDP 端口 /// 超时时间(毫秒) - public Jtag(string address, int port, int timeout = 2000) + /// 进度追踪器 + public Jtag(string address, int port, int timeout = 2000, ProgressTracker? progressTracker = null) { this.address = address; this.port = port; this.ep = new IPEndPoint(IPAddress.Parse(address), port); this.timeout = timeout; + this._progressTracker = progressTracker; } async ValueTask> ReadFIFO(uint devAddr) @@ -444,10 +448,10 @@ public class Jtag async ValueTask> WriteFIFO( UInt32 devAddr, UInt32 data, UInt32 result, - UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null) + UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, string progressId = "") { { - var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80)); + var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progressId); if (!ret.IsSuccessful) return new(ret.Error); if (!ret.Value) return new(new Exception("Write FIFO failed")); } @@ -458,17 +462,17 @@ public class Jtag { var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout); if (!ret.IsSuccessful) return new(ret.Error); - progress?.Finish(); + _progressTracker?.AdvanceProgress(progressId, 10); return ret.Value; } } async ValueTask> WriteFIFO( UInt32 devAddr, byte[] data, UInt32 result, - UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null) + UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, string progressId = "") { { - var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80)); + var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progressId); if (!ret.IsSuccessful) return new(ret.Error); if (!ret.Value) return new(new Exception("Write FIFO failed")); } @@ -479,7 +483,7 @@ public class Jtag { var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout); if (!ret.IsSuccessful) return new(ret.Error); - progress?.Finish(); + _progressTracker?.AdvanceProgress(progressId, 10); return ret.Value; } } @@ -564,7 +568,7 @@ public class Jtag } async ValueTask> LoadDRCareInput( - byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500, ProgressReporter? progress = null) + byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500, string progressId = "") { var bytesLen = ((uint)(bytesArray.Length * 8)); if (bytesLen > Math.Pow(2, 28)) return new(new Exception("Length is over 2^(28 - 3)")); @@ -579,14 +583,15 @@ public class Jtag else if (!ret.Value) return new(new Exception("Write CMD_JTAG_LOAD_DR_CAREI Failed")); } - progress?.Report(10); + _progressTracker?.AdvanceProgress(progressId, 10); { var ret = await WriteFIFO( JtagAddr.WRITE_DATA, bytesArray, 0x01_00_00_00, JtagState.CMD_EXEC_FINISH, - progress: progress?.CreateChild(90) + 0, + progressId ); if (!ret.IsSuccessful) return new(ret.Error); @@ -709,58 +714,53 @@ public class Jtag /// 下载比特流到 JTAG 设备 /// /// 比特流数据 - /// 进度报告器 + /// 进度ID /// 指示下载是否成功的异步结果 public async ValueTask> DownloadBitstream( - byte[] bitstream, ProgressReporter? progress = null) + byte[] bitstream, string progressId = "") { // Clear Data MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); - if (progress != null) - { - progress.ExpectedSteps = 25; - progress.Increase(); - } Result ret; ret = await CloseTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Close Test Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await RunTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Run Test Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); logger.Trace("Jtag initialize"); ret = await ExecRDCmd(JtagCmd.JTAG_DR_JRST); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JRST Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await RunTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Run Test Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await ExecRDCmd(JtagCmd.JTAG_DR_CFGI); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_CFGI Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); logger.Trace("Jtag ready to write bitstream"); ret = await IdleDelay(100000); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); - ret = await LoadDRCareInput(bitstream, progress: progress?.CreateChild(50)); + ret = await LoadDRCareInput(bitstream, progressId: progressId); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Load Data Failed")); @@ -769,40 +769,40 @@ public class Jtag ret = await CloseTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Close Test Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await RunTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Run Test Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await ExecRDCmd(JtagCmd.JTAG_DR_JWAKEUP); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JWAKEUP Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); logger.Trace("Jtag reset device"); ret = await IdleDelay(10000); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); var retCode = await ReadStatusReg(); if (!retCode.IsSuccessful) return new(retCode.Error); var jtagStatus = new JtagStatusReg(retCode.Value); if (!(jtagStatus.done && jtagStatus.wakeup_over && jtagStatus.init_complete)) return new(new Exception("Jtag download bitstream failed")); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); ret = await CloseTest(); if (!ret.IsSuccessful) return new(ret.Error); else if (!ret.Value) return new(new Exception("Jtag Close Test Failed")); logger.Trace("Jtag download bitstream successfully"); - progress?.Increase(); + _progressTracker?.AdvanceProgress(progressId, 10); // Finish - progress?.Finish(); + _progressTracker?.AdvanceProgress(progressId, 10); return true; } diff --git a/server/src/Services/ProgressTracker.cs b/server/src/Services/ProgressTracker.cs new file mode 100644 index 0000000..27f34dc --- /dev/null +++ b/server/src/Services/ProgressTracker.cs @@ -0,0 +1,147 @@ +using System.Collections.Concurrent; +using Microsoft.AspNetCore.SignalR; +using server.Hubs; + +namespace server.Services; + +public enum TaskState { Running, Completed, Failed, Cancelled } + +public readonly struct TaskProgress +{ + public string Id { get; } + public int Current { get; } + public int Total { get; } + public TaskState State { get; } + public long Timestamp { get; } + public string? Error { get; } + + public TaskProgress(string id, int current, int total, TaskState state, long timestamp, string? error = null) + { + Id = id; + Current = current; + Total = total; + State = state; + Timestamp = timestamp; + Error = error; + } + + public TaskProgress WithUpdate(int? current = null, TaskState? state = null, string? error = null) + { + return new TaskProgress( + Id, + current ?? Current, + Total, + state ?? State, + DateTimeOffset.UtcNow.ToUnixTimeSeconds(), + error ?? Error + ); + } + + public ProgressInfo ToProgressInfo() + { + return new ProgressInfo + { + TaskId = Id, + Status = State switch + { + TaskState.Running => ProgressStatus.Running, + TaskState.Completed => ProgressStatus.Completed, + TaskState.Failed => ProgressStatus.Failed, + TaskState.Cancelled => ProgressStatus.Canceled, + _ => ProgressStatus.Failed + }, + ProgressPercent = Total > 0 ? (Current * 100) / Total : 0, + ErrorMessage = Error ?? string.Empty + }; + } +} + +public sealed class ProgressTracker +{ + private readonly ConcurrentDictionary _tasks = new(); + private readonly Timer _cleaner; + private readonly IHubContext _hubContext; + + // 构造器支持可选的Hub注入 + public ProgressTracker(IHubContext hubContext) + { + _hubContext = hubContext; + _cleaner = new Timer(CleanExpiredTasks, null, + TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + } + + public void CleanExpiredTasks(object? obj) + { + var cutoff = DateTimeOffset.Now.AddMinutes(-3).ToUnixTimeSeconds(); + var expired = _tasks.Where(kvp => kvp.Value.Timestamp < cutoff).Select(kvp => kvp.Key).ToList(); + foreach (var id in expired) + { + _tasks.TryRemove(id, out _); + } + } + + public string CreateTask(int total) + { + var id = Guid.NewGuid().ToString(); + var task = new TaskProgress(id, 0, total, TaskState.Running, DateTimeOffset.UtcNow.ToUnixTimeSeconds()); + _tasks[id] = task; + NotifyIfNeeded(task); + return id; + } + + // 核心更新方法,现在包含自动通知 + public bool UpdateTask(string id, Func updater) + { + if (!_tasks.TryGetValue(id, out var current)) + return false; + + var updated = updater(current); + if (_tasks.TryUpdate(id, updated, current)) + { + NotifyIfNeeded(updated); + return true; + } + return false; + } + + // 自动通知逻辑 - 简单直接 + private void NotifyIfNeeded(TaskProgress task) + { + _hubContext.Clients.Group(task.Id).OnReceiveProgress(task.ToProgressInfo()); + } + + public bool UpdateProgress(string id, int current) + { + return UpdateTask(id, p => p.WithUpdate( + current: Math.Min(current, p.Total))); + } + + public bool AdvanceProgress(string id, int steps) + { + return UpdateTask(id, p => p.WithUpdate( + current: Math.Min(p.Current + steps, p.Total))); + } + + public bool CancelProgress(string id) + { + return UpdateTask(id, p => p.WithUpdate(state: TaskState.Cancelled)); + } + + public bool CompleteProgress(string id) + { + return UpdateTask(id, p => p.WithUpdate( + current: p.Total, state: TaskState.Completed)); + } + + public bool FailProgress(string id, string? error) + { + return UpdateTask(id, p => p.WithUpdate( + state: TaskState.Failed, error: error)); + } + + public TaskProgress? GetTask(string id) + { + _tasks.TryGetValue(id, out var task); + return task.Id == null ? null : task; + } +} diff --git a/server/src/Services/ProgressTrackerService.cs b/server/src/Services/ProgressTrackerService.cs deleted file mode 100644 index 420dfc8..0000000 --- a/server/src/Services/ProgressTrackerService.cs +++ /dev/null @@ -1,294 +0,0 @@ -using Microsoft.AspNetCore.SignalR; -using System.Collections.Concurrent; -using DotNext; -using Common; -using server.Hubs; - -namespace server.Services; - -public class ProgressReporter : ProgressInfo, IProgress -{ - private int _progress = 0; - private int _stepProgress = 1; - private int _expectedSteps = 100; - private int _parentProportion = 100; - - public int Progress => _progress; - public int MaxProgress { get; set; } = 100; - public int StepProgress - { - get => _stepProgress; - set - { - _stepProgress = value; - _expectedSteps = MaxProgress / value; - } - } - public int ExpectedSteps - { - get => _expectedSteps; - set - { - _expectedSteps = value; - MaxProgress = Number.IntPow(10, Number.GetLength(value)); - _stepProgress = MaxProgress / value; - } - } - public Func? ReporterFunc { get; set; } = null; - public ProgressReporter? Parent { get; set; } - public ProgressReporter? Child { get; set; } - - private ProgressStatus _status = ProgressStatus.Pending; - private string _errorMessage = string.Empty; - - public override string TaskId { get; } = Guid.NewGuid().ToString(); - public override int ProgressPercent => _progress * 100 / MaxProgress; - public override ProgressStatus Status => _status; - public override string ErrorMessage => _errorMessage; - - public ProgressReporter(Func? reporter = null, int initProgress = 0, int maxProgress = 100, int step = 1) - { - _progress = initProgress; - MaxProgress = maxProgress; - StepProgress = step; - ReporterFunc = reporter; - } - - public ProgressReporter(int parentProportion, int expectedSteps = 100, Func? reporter = null) - { - this._parentProportion = parentProportion; - MaxProgress = Number.IntPow(10, Number.GetLength(expectedSteps)); - StepProgress = MaxProgress / expectedSteps; - ReporterFunc = reporter; - } - - private async void ForceReport(int value) - { - try - { - if (ReporterFunc != null) - await ReporterFunc(value); - - if (Parent != null) - Parent.Increase((value - _progress) / StepProgress * _parentProportion / (MaxProgress / StepProgress)); - - _progress = value; - } - catch (OperationCanceledException ex) - { - _errorMessage = ex.Message; - this._status = ProgressStatus.Canceled; - } - catch (Exception ex) - { - _errorMessage = ex.Message; - this._status = ProgressStatus.Failed; - } - } - - public void Report(int value) - { - if (this._status == ProgressStatus.Pending) - this._status = ProgressStatus.InProgress; - else if (this.Status != ProgressStatus.InProgress) - return; - - if (value > MaxProgress) return; - ForceReport(value); - } - - public void Increase(int? value = null) - { - if (this._status == ProgressStatus.Pending) - this._status = ProgressStatus.InProgress; - else if (this.Status != ProgressStatus.InProgress) - return; - - if (value.HasValue) - { - if (_progress + value.Value >= MaxProgress) return; - this.Report(_progress + value.Value); - } - else - { - if (_progress + StepProgress >= MaxProgress) return; - this.Report(_progress + StepProgress); - } - } - - public void Finish() - { - this._status = ProgressStatus.Completed; - this.ForceReport(MaxProgress); - } - - public void Cancel() - { - this._status = ProgressStatus.Canceled; - this._errorMessage = "User Cancelled"; - this.ForceReport(_progress); - } - - public void Error(string message) - { - this._status = ProgressStatus.Failed; - this._errorMessage = message; - this.ForceReport(_progress); - } - - public ProgressReporter CreateChild(int proportion, int expectedSteps = 100) - { - var child = new ProgressReporter(proportion, expectedSteps); - child.Parent = this; - this.Child = child; - return child; - } -} - -public class ProgressTrackerService : BackgroundService -{ - private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - private readonly ConcurrentDictionary _taskMap = new(); - private readonly IHubContext _hubContext; - - private class TaskProgressInfo - { - public ProgressReporter? Reporter { get; set; } - public string? ConnectionId { get; set; } - public required CancellationToken CancellationToken { get; set; } - public required CancellationTokenSource CancellationTokenSource { get; set; } - public required DateTime UpdatedAt { get; set; } - } - - public ProgressTrackerService(IHubContext hubContext) - { - _hubContext = hubContext; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - while (!stoppingToken.IsCancellationRequested) - { - try - { - var now = DateTime.UtcNow; - foreach (var kvp in _taskMap) - { - var info = kvp.Value; - // 超过 1 分钟且任务已完成/失败/取消 - if ( - (now - info.UpdatedAt).TotalMinutes > 1 && - info.Reporter != null && - ( - info.Reporter.Status == ProgressStatus.Completed || - info.Reporter.Status == ProgressStatus.Failed || - info.Reporter.Status == ProgressStatus.Canceled - ) - ) - { - _taskMap.TryRemove(kvp.Key, out _); - logger.Info($"Cleaned up task {kvp.Key}"); - } - } - } - catch (Exception ex) - { - logger.Error(ex, "Error during ProgressTracker cleanup"); - } - await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); - } - } - - public (string, ProgressReporter) CreateTask(CancellationToken? cancellationToken = null) - { - CancellationTokenSource? cancellationTokenSource; - if (cancellationToken.HasValue) - { - cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken.Value); - } - else - { - cancellationTokenSource = new CancellationTokenSource(); - } - - var progressInfo = new TaskProgressInfo - { - ConnectionId = null, - UpdatedAt = DateTime.UtcNow, - CancellationToken = cancellationTokenSource.Token, - CancellationTokenSource = cancellationTokenSource, - }; - - var progress = new ProgressReporter(async value => - { - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - - // 通过 SignalR 推送进度 - if (progressInfo.ConnectionId != null && progressInfo.Reporter != null) - await _hubContext.Clients.Client(progressInfo.ConnectionId).OnReceiveProgress(progressInfo.Reporter); - }); - - progressInfo.Reporter = progress; - - _taskMap.TryAdd(progressInfo.Reporter.TaskId, progressInfo); - - return (progressInfo.Reporter.TaskId, progress); - } - - public Optional GetReporter(string taskId) - { - if (_taskMap.TryGetValue(taskId, out var info)) - { - return info.Reporter; - } - return Optional.None; - } - - public Optional GetProgressStatus(string taskId) - { - if (_taskMap.TryGetValue(taskId, out var info)) - { - if (info.Reporter != null) - return info.Reporter.Status; - } - return Optional.None; - } - - public bool BindTask(string taskId, string connectionId) - { - if (_taskMap.TryGetValue(taskId, out var info) && info != null) - { - lock (info) - { - info.ConnectionId = connectionId; - } - return true; - } - return false; - } - - public bool CancelTask(string taskId) - { - try - { - if (_taskMap.TryGetValue(taskId, out var info) && info != null && info.Reporter != null) - { - lock (info) - { - info.CancellationTokenSource.Cancel(); - info.Reporter.Cancel(); - info.UpdatedAt = DateTime.UtcNow; - } - - return true; - } - - return false; - } - catch (Exception ex) - { - logger.Error(ex, $"Failed to cancel task {taskId}"); - return false; - } - } -} diff --git a/server/src/UdpClientPool.cs b/server/src/UdpClientPool.cs index 1797630..99a4ac1 100644 --- a/server/src/UdpClientPool.cs +++ b/server/src/UdpClientPool.cs @@ -8,11 +8,11 @@ using server.Services; /// /// UDP客户端发送池 /// -public class UDPClientPool +public sealed class UDPClientPool { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - private static IPAddress localhost = IPAddress.Parse("127.0.0.1"); + private static ProgressTracker _progressTracker = MsgBus.ProgressTracker; /// /// 发送字符串 @@ -183,40 +183,6 @@ public class UDPClientPool return await Task.Run(() => { return SendDataPack(endPoint, pkg); }); } - /// - /// 发送字符串到本地 - /// - /// 端口 - /// 字符串数组 - /// 是否成功 - public static bool SendStringLocalHost(int port, string[] stringArray) - { - return SendString(new IPEndPoint(localhost, port), stringArray); - } - - /// - /// 循环发送字符串到本地 - /// - /// 发送总次数 - /// 间隔时间 - /// 端口 - /// 字符串数组 - /// 是否成功 - public static bool CycleSendStringLocalHost(int times, int sleepMilliSeconds, int port, string[] stringArray) - { - var isSuccessful = true; - - while (times-- >= 0) - { - isSuccessful = SendStringLocalHost(port, stringArray); - if (!isSuccessful) break; - - Thread.Sleep(sleepMilliSeconds); - } - - return isSuccessful; - } - /// /// 读取设备地址数据 /// @@ -607,11 +573,11 @@ public class UDPClientPool /// 设备地址 /// 要写入的32位数据 /// 超时时间(毫秒) - /// 进度报告器 + /// 进度报告器 /// 写入结果,true表示写入成功 public static async ValueTask> WriteAddr( IPEndPoint endPoint, int taskID, UInt32 devAddr, - UInt32 data, int timeout = 1000, ProgressReporter? progress = null) + UInt32 data, int timeout = 1000, string progressId = "") { var ret = false; var opts = new SendAddrPackOptions() @@ -622,17 +588,18 @@ public class UDPClientPool Address = devAddr, IsWrite = true, }; - progress?.Report(20); + _progressTracker.AdvanceProgress(progressId, 10); // Write Register ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts)); if (!ret) return new(new Exception("Send 1st address package failed!")); - progress?.Report(40); + _progressTracker.AdvanceProgress(progressId, 10); + // Send Data Package ret = await UDPClientPool.SendDataPackAsync(endPoint, new SendDataPackage(Common.Number.NumberToBytes(data, 4).Value)); if (!ret) return new(new Exception("Send data package failed!")); - progress?.Report(60); + _progressTracker.AdvanceProgress(progressId, 10); // Check Msg Bus if (!MsgBus.IsRunning) @@ -642,7 +609,7 @@ public class UDPClientPool var udpWriteAck = await MsgBus.UDPServer.WaitForAckAsync( endPoint.Address.ToString(), taskID, endPoint.Port, timeout); if (!udpWriteAck.IsSuccessful) return new(udpWriteAck.Error); - progress?.Finish(); + _progressTracker.AdvanceProgress(progressId, 10); return udpWriteAck.Value.IsSuccessful; } @@ -655,11 +622,11 @@ public class UDPClientPool /// 设备地址 /// 要写入的字节数组 /// 超时时间(毫秒) - /// 进度报告器 + /// 进度报告器 /// 写入结果,true表示写入成功 public static async ValueTask> WriteAddr( IPEndPoint endPoint, int taskID, UInt32 devAddr, - byte[] dataArray, int timeout = 1000, ProgressReporter? progress = null) + byte[] dataArray, int timeout = 1000, string progressId = "") { var ret = false; var opts = new SendAddrPackOptions() @@ -681,8 +648,6 @@ public class UDPClientPool var writeTimes = hasRest ? dataArray.Length / (max4BytesPerRead * (32 / 8)) + 1 : dataArray.Length / (max4BytesPerRead * (32 / 8)); - if (progress != null) - progress.ExpectedSteps = writeTimes; for (var i = 0; i < writeTimes; i++) { // Sperate Data Array @@ -712,10 +677,9 @@ public class UDPClientPool if (!udpWriteAck.Value.IsSuccessful) return false; - progress?.Increase(); + _progressTracker.AdvanceProgress(progressId, 10); } - progress?.Finish(); return true; } diff --git a/src/components/UploadCard.vue b/src/components/UploadCard.vue index aeeb32a..758e8e8 100644 --- a/src/components/UploadCard.vue +++ b/src/components/UploadCard.vue @@ -16,22 +16,32 @@ {{ bitstream.name }}
@@ -78,28 +92,19 @@ diff --git a/src/stores/progress.ts b/src/stores/progress.ts new file mode 100644 index 0000000..ef70dc7 --- /dev/null +++ b/src/stores/progress.ts @@ -0,0 +1,83 @@ +import type { HubConnection } from "@microsoft/signalr"; +import type { + IProgressHub, + IProgressReceiver, +} from "@/utils/signalR/TypedSignalR.Client/server.Hubs"; +import { + getHubProxyFactory, + getReceiverRegister, +} from "@/utils/signalR/TypedSignalR.Client"; +import { ProgressStatus, type ProgressInfo } from "@/utils/signalR/server.Hubs"; +import { onMounted, onUnmounted, ref, shallowRef } from "vue"; +import { defineStore } from "pinia"; +import { AuthManager } from "@/utils/AuthManager"; +import { forEach, isUndefined } from "lodash"; + +export type ProgressCallback = (msg: ProgressInfo) => void; + +export const useProgressStore = defineStore("progress", () => { + // taskId -> name -> callback + const progressCallbackFuncs = shallowRef< + Map> + >(new Map()); + + const progressHubConnection = shallowRef(); + const progressHubProxy = shallowRef(); + const progressHubReceiver: IProgressReceiver = { + onReceiveProgress: async (msg) => { + const taskMap = progressCallbackFuncs.value.get(msg.taskId); + if (taskMap) { + for (const func of taskMap.values()) { + func(msg); + } + } + }, + }; + + onMounted(async () => { + progressHubConnection.value = + AuthManager.createHubConnection("ProgressHub"); + progressHubProxy.value = getHubProxyFactory("IProgressHub").createHubProxy( + progressHubConnection.value, + ); + getReceiverRegister("IProgressReceiver").register( + progressHubConnection.value, + progressHubReceiver, + ); + progressHubConnection.value.start(); + }); + + onUnmounted(() => { + if (progressHubConnection.value) { + progressHubConnection.value.stop(); + progressHubConnection.value = undefined; + progressHubProxy.value = undefined; + } + }); + + function register(progressId: string, name: string, func: ProgressCallback) { + progressHubProxy.value?.join(progressId); + let taskMap = progressCallbackFuncs.value.get(progressId); + if (!taskMap) { + taskMap = new Map(); + progressCallbackFuncs.value?.set(progressId, taskMap); + } + taskMap.set(name, func); + } + + function unregister(taskId: string, name: string) { + progressHubProxy.value?.leave(taskId); + const taskMap = progressCallbackFuncs.value.get(taskId); + if (taskMap) { + taskMap.delete(name); + if (taskMap.size === 0) { + progressCallbackFuncs.value?.delete(taskId); + } + } + } + + return { + register, + unregister, + }; +}); diff --git a/src/utils/signalR/TypedSignalR.Client/index.ts b/src/utils/signalR/TypedSignalR.Client/index.ts index cbe2a39..55e0eac 100644 --- a/src/utils/signalR/TypedSignalR.Client/index.ts +++ b/src/utils/signalR/TypedSignalR.Client/index.ts @@ -161,6 +161,14 @@ class IProgressHub_HubProxy implements IProgressHub { public readonly join = async (taskId: string): Promise => { return await this.connection.invoke("Join", taskId); } + + public readonly leave = async (taskId: string): Promise => { + return await this.connection.invoke("Leave", taskId); + } + + public readonly getProgress = async (taskId: string): Promise => { + return await this.connection.invoke("GetProgress", taskId); + } } diff --git a/src/utils/signalR/TypedSignalR.Client/server.Hubs.ts b/src/utils/signalR/TypedSignalR.Client/server.Hubs.ts index c4966dc..3b17ecd 100644 --- a/src/utils/signalR/TypedSignalR.Client/server.Hubs.ts +++ b/src/utils/signalR/TypedSignalR.Client/server.Hubs.ts @@ -48,6 +48,16 @@ export type IProgressHub = { * @returns Transpiled from System.Threading.Tasks.Task */ join(taskId: string): Promise; + /** + * @param taskId Transpiled from string + * @returns Transpiled from System.Threading.Tasks.Task + */ + leave(taskId: string): Promise; + /** + * @param taskId Transpiled from string + * @returns Transpiled from System.Threading.Tasks.Task + */ + getProgress(taskId: string): Promise; } export type IDigitalTubesReceiver = { diff --git a/src/utils/signalR/server.Hubs.ts b/src/utils/signalR/server.Hubs.ts index 4b6b4b6..9726bda 100644 --- a/src/utils/signalR/server.Hubs.ts +++ b/src/utils/signalR/server.Hubs.ts @@ -12,11 +12,10 @@ export type DigitalTubeTaskStatus = { /** Transpiled from server.Hubs.ProgressStatus */ export enum ProgressStatus { - Pending = 0, - InProgress = 1, - Completed = 2, - Canceled = 3, - Failed = 4, + Running = 0, + Completed = 1, + Canceled = 2, + Failed = 3, } /** Transpiled from server.Hubs.ProgressInfo */