fix: 修复进度条的问题
This commit is contained in:
@@ -16,17 +16,12 @@ public class JtagController : ControllerBase
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private readonly ProgressTrackerService _tracker;
|
||||
private readonly ProgressTracker _tracker = MsgBus.ProgressTracker;
|
||||
private readonly UserManager _userManager = new();
|
||||
private readonly ResourceManager _resourceManager = new();
|
||||
|
||||
private const string BITSTREAM_PATH = "bitstream/Jtag";
|
||||
|
||||
public JtagController(ProgressTrackerService tracker)
|
||||
{
|
||||
_tracker = tracker;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 控制器首页信息
|
||||
/// </summary>
|
||||
@@ -188,8 +183,8 @@ public class JtagController : ControllerBase
|
||||
logger.Info($"User {username} processing bitstream file of size: {fileBytes.Length} bytes");
|
||||
|
||||
// 定义进度跟踪
|
||||
var (taskId, progress) = _tracker.CreateTask(cancelToken);
|
||||
progress.Report(10);
|
||||
var taskId = _tracker.CreateTask(1000);
|
||||
_tracker.AdvanceProgress(taskId, 10);
|
||||
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
@@ -210,7 +205,8 @@ public class JtagController : ControllerBase
|
||||
if (!retBuffer.IsSuccessful)
|
||||
{
|
||||
logger.Error($"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
progress.Error($"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
_tracker.FailProgress(taskId,
|
||||
$"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
return;
|
||||
}
|
||||
revBuffer = retBuffer.Value;
|
||||
@@ -228,7 +224,7 @@ public class JtagController : ControllerBase
|
||||
var processedBytes = outputStream.ToArray();
|
||||
logger.Info($"User {username} processed {totalBytesProcessed} bytes for device {address}");
|
||||
|
||||
progress.Report(20);
|
||||
_tracker.AdvanceProgress(taskId, 20);
|
||||
|
||||
// 下载比特流
|
||||
var jtagCtrl = new Peripherals.JtagClient.Jtag(address, port);
|
||||
@@ -237,12 +233,13 @@ public class JtagController : ControllerBase
|
||||
if (ret.IsSuccessful)
|
||||
{
|
||||
logger.Info($"User {username} successfully downloaded bitstream '{resource.ResourceName}' to device {address}");
|
||||
progress.Finish();
|
||||
_tracker.CompleteProgress(taskId);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
progress.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
_tracker.FailProgress(taskId,
|
||||
$"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using System.Security.Claims;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.AspNetCore.Cors;
|
||||
using TypedSignalR.Client;
|
||||
using Tapper;
|
||||
using server.Services;
|
||||
|
||||
#pragma warning disable 1998
|
||||
|
||||
namespace server.Hubs;
|
||||
|
||||
[Hub]
|
||||
public interface IProgressHub
|
||||
{
|
||||
Task<bool> Join(string taskId);
|
||||
Task<bool> Leave(string taskId);
|
||||
Task<ProgressInfo?> GetProgress(string taskId);
|
||||
}
|
||||
|
||||
[Receiver]
|
||||
@@ -23,8 +26,7 @@ public interface IProgressReceiver
|
||||
[TranspilationSource]
|
||||
public enum ProgressStatus
|
||||
{
|
||||
Pending,
|
||||
InProgress,
|
||||
Running,
|
||||
Completed,
|
||||
Canceled,
|
||||
Failed
|
||||
@@ -33,10 +35,10 @@ public enum ProgressStatus
|
||||
[TranspilationSource]
|
||||
public class ProgressInfo
|
||||
{
|
||||
public virtual string TaskId { get; } = string.Empty;
|
||||
public virtual ProgressStatus Status { get; }
|
||||
public virtual int ProgressPercent { get; } = 0;
|
||||
public virtual string ErrorMessage { get; } = string.Empty;
|
||||
public required string TaskId { get; set; }
|
||||
public required ProgressStatus Status { get; set; }
|
||||
public required int ProgressPercent { get; set; }
|
||||
public required string ErrorMessage { get; set; }
|
||||
};
|
||||
|
||||
[Authorize]
|
||||
@@ -44,18 +46,32 @@ public class ProgressInfo
|
||||
public class ProgressHub : Hub<IProgressReceiver>, IProgressHub
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private readonly IHubContext<ProgressHub, IProgressReceiver> _hubContext;
|
||||
private readonly ProgressTrackerService _tracker;
|
||||
|
||||
public ProgressHub(IHubContext<ProgressHub, IProgressReceiver> hubContext, ProgressTrackerService tracker)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
_tracker = tracker;
|
||||
}
|
||||
private readonly ProgressTracker _progressTracker = MsgBus.ProgressTracker;
|
||||
|
||||
public async Task<bool> Join(string taskId)
|
||||
{
|
||||
return await Task.Run(() => _tracker.BindTask(taskId, Context.ConnectionId));
|
||||
await Groups.AddToGroupAsync(Context.ConnectionId, taskId);
|
||||
|
||||
// 发送当前状态(如果存在)
|
||||
var task = _progressTracker.GetTask(taskId);
|
||||
if (task != null)
|
||||
{
|
||||
await Clients.Caller.OnReceiveProgress(task.Value.ToProgressInfo());
|
||||
}
|
||||
|
||||
logger.Info($"Client {Context.ConnectionId} joined task {taskId}");
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<bool> Leave(string taskId)
|
||||
{
|
||||
await Groups.RemoveFromGroupAsync(Context.ConnectionId, taskId);
|
||||
logger.Info($"Client {Context.ConnectionId} left task {taskId}");
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<ProgressInfo?> GetProgress(string taskId)
|
||||
{
|
||||
return _progressTracker.GetTask(taskId)?.ToProgressInfo();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
using server.Services;
|
||||
/// <summary>
|
||||
/// 多线程通信总线
|
||||
/// </summary>
|
||||
public static class MsgBus
|
||||
public sealed class MsgBus
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
@@ -11,12 +12,39 @@ public static class MsgBus
|
||||
/// </summary>
|
||||
public static UDPServer UDPServer { get { return udpServer; } }
|
||||
|
||||
// 添加静态ProgressTracker引用
|
||||
private static ProgressTracker? _progressTracker;
|
||||
|
||||
/// <summary>
|
||||
/// 设置全局ProgressTracker实例
|
||||
/// </summary>
|
||||
public static void SetProgressTracker(ProgressTracker progressTracker)
|
||||
{
|
||||
_progressTracker = progressTracker;
|
||||
}
|
||||
|
||||
public static ProgressTracker ProgressTracker
|
||||
{
|
||||
get
|
||||
{
|
||||
if (_progressTracker == null)
|
||||
{
|
||||
throw new InvalidOperationException("ProgressTracker is not set.");
|
||||
}
|
||||
return _progressTracker;
|
||||
}
|
||||
}
|
||||
|
||||
private static bool isRunning = false;
|
||||
/// <summary>
|
||||
/// 获取通信总线运行状态
|
||||
/// </summary>
|
||||
public static bool IsRunning { get { return isRunning; } }
|
||||
|
||||
private MsgBus() { }
|
||||
|
||||
static MsgBus() { }
|
||||
|
||||
/// <summary>
|
||||
/// 通信总线初始化
|
||||
/// </summary>
|
||||
|
||||
@@ -380,6 +380,7 @@ public class JtagStatusReg
|
||||
public class Jtag
|
||||
{
|
||||
private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
private readonly ProgressTracker? _progressTracker;
|
||||
|
||||
private const int CLOCK_FREQ = 50; // MHz
|
||||
|
||||
@@ -392,18 +393,21 @@ public class Jtag
|
||||
public readonly string address;
|
||||
private IPEndPoint ep;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Jtag 构造函数
|
||||
/// </summary>
|
||||
/// <param name="address">目标 IP 地址</param>
|
||||
/// <param name="port">目标 UDP 端口</param>
|
||||
/// <param name="timeout">超时时间(毫秒)</param>
|
||||
public Jtag(string address, int port, int timeout = 2000)
|
||||
/// <param name="progressTracker">进度追踪器</param>
|
||||
public Jtag(string address, int port, int timeout = 2000, ProgressTracker? progressTracker = null)
|
||||
{
|
||||
this.address = address;
|
||||
this.port = port;
|
||||
this.ep = new IPEndPoint(IPAddress.Parse(address), port);
|
||||
this.timeout = timeout;
|
||||
this._progressTracker = progressTracker;
|
||||
}
|
||||
|
||||
async ValueTask<Result<uint>> ReadFIFO(uint devAddr)
|
||||
@@ -444,10 +448,10 @@ public class Jtag
|
||||
|
||||
async ValueTask<Result<bool>> WriteFIFO(
|
||||
UInt32 devAddr, UInt32 data, UInt32 result,
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null)
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, string progressId = "")
|
||||
{
|
||||
{
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80));
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progressId);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
if (!ret.Value) return new(new Exception("Write FIFO failed"));
|
||||
}
|
||||
@@ -458,17 +462,17 @@ public class Jtag
|
||||
{
|
||||
var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
progress?.Finish();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
return ret.Value;
|
||||
}
|
||||
}
|
||||
|
||||
async ValueTask<Result<bool>> WriteFIFO(
|
||||
UInt32 devAddr, byte[] data, UInt32 result,
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null)
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, string progressId = "")
|
||||
{
|
||||
{
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80));
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progressId);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
if (!ret.Value) return new(new Exception("Write FIFO failed"));
|
||||
}
|
||||
@@ -479,7 +483,7 @@ public class Jtag
|
||||
{
|
||||
var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
progress?.Finish();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
return ret.Value;
|
||||
}
|
||||
}
|
||||
@@ -564,7 +568,7 @@ public class Jtag
|
||||
}
|
||||
|
||||
async ValueTask<Result<bool>> LoadDRCareInput(
|
||||
byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500, ProgressReporter? progress = null)
|
||||
byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500, string progressId = "")
|
||||
{
|
||||
var bytesLen = ((uint)(bytesArray.Length * 8));
|
||||
if (bytesLen > Math.Pow(2, 28)) return new(new Exception("Length is over 2^(28 - 3)"));
|
||||
@@ -579,14 +583,15 @@ public class Jtag
|
||||
else if (!ret.Value) return new(new Exception("Write CMD_JTAG_LOAD_DR_CAREI Failed"));
|
||||
}
|
||||
|
||||
progress?.Report(10);
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
{
|
||||
var ret = await WriteFIFO(
|
||||
JtagAddr.WRITE_DATA,
|
||||
bytesArray, 0x01_00_00_00,
|
||||
JtagState.CMD_EXEC_FINISH,
|
||||
progress: progress?.CreateChild(90)
|
||||
0,
|
||||
progressId
|
||||
);
|
||||
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
@@ -709,58 +714,53 @@ public class Jtag
|
||||
/// 下载比特流到 JTAG 设备
|
||||
/// </summary>
|
||||
/// <param name="bitstream">比特流数据</param>
|
||||
/// <param name="progress">进度报告器</param>
|
||||
/// <param name="progressId">进度ID</param>
|
||||
/// <returns>指示下载是否成功的异步结果</returns>
|
||||
public async ValueTask<Result<bool>> DownloadBitstream(
|
||||
byte[] bitstream, ProgressReporter? progress = null)
|
||||
byte[] bitstream, string progressId = "")
|
||||
{
|
||||
// Clear Data
|
||||
MsgBus.UDPServer.ClearUDPData(this.address, 0);
|
||||
|
||||
logger.Trace($"Clear up udp server {this.address,0} receive data");
|
||||
if (progress != null)
|
||||
{
|
||||
progress.ExpectedSteps = 25;
|
||||
progress.Increase();
|
||||
}
|
||||
|
||||
Result<bool> ret;
|
||||
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
logger.Trace("Jtag initialize");
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_JRST);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JRST Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_CFGI);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_CFGI Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
logger.Trace("Jtag ready to write bitstream");
|
||||
|
||||
ret = await IdleDelay(100000);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await LoadDRCareInput(bitstream, progress: progress?.CreateChild(50));
|
||||
ret = await LoadDRCareInput(bitstream, progressId: progressId);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Load Data Failed"));
|
||||
|
||||
@@ -769,40 +769,40 @@ public class Jtag
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_JWAKEUP);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JWAKEUP Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
logger.Trace("Jtag reset device");
|
||||
|
||||
ret = await IdleDelay(10000);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
var retCode = await ReadStatusReg();
|
||||
if (!retCode.IsSuccessful) return new(retCode.Error);
|
||||
var jtagStatus = new JtagStatusReg(retCode.Value);
|
||||
if (!(jtagStatus.done && jtagStatus.wakeup_over && jtagStatus.init_complete))
|
||||
return new(new Exception("Jtag download bitstream failed"));
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
logger.Trace("Jtag download bitstream successfully");
|
||||
progress?.Increase();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
|
||||
// Finish
|
||||
progress?.Finish();
|
||||
_progressTracker?.AdvanceProgress(progressId, 10);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
147
server/src/Services/ProgressTracker.cs
Normal file
147
server/src/Services/ProgressTracker.cs
Normal file
@@ -0,0 +1,147 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using server.Hubs;
|
||||
|
||||
namespace server.Services;
|
||||
|
||||
public enum TaskState { Running, Completed, Failed, Cancelled }
|
||||
|
||||
public readonly struct TaskProgress
|
||||
{
|
||||
public string Id { get; }
|
||||
public int Current { get; }
|
||||
public int Total { get; }
|
||||
public TaskState State { get; }
|
||||
public long Timestamp { get; }
|
||||
public string? Error { get; }
|
||||
|
||||
public TaskProgress(string id, int current, int total, TaskState state, long timestamp, string? error = null)
|
||||
{
|
||||
Id = id;
|
||||
Current = current;
|
||||
Total = total;
|
||||
State = state;
|
||||
Timestamp = timestamp;
|
||||
Error = error;
|
||||
}
|
||||
|
||||
public TaskProgress WithUpdate(int? current = null, TaskState? state = null, string? error = null)
|
||||
{
|
||||
return new TaskProgress(
|
||||
Id,
|
||||
current ?? Current,
|
||||
Total,
|
||||
state ?? State,
|
||||
DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
|
||||
error ?? Error
|
||||
);
|
||||
}
|
||||
|
||||
public ProgressInfo ToProgressInfo()
|
||||
{
|
||||
return new ProgressInfo
|
||||
{
|
||||
TaskId = Id,
|
||||
Status = State switch
|
||||
{
|
||||
TaskState.Running => ProgressStatus.Running,
|
||||
TaskState.Completed => ProgressStatus.Completed,
|
||||
TaskState.Failed => ProgressStatus.Failed,
|
||||
TaskState.Cancelled => ProgressStatus.Canceled,
|
||||
_ => ProgressStatus.Failed
|
||||
},
|
||||
ProgressPercent = Total > 0 ? (Current * 100) / Total : 0,
|
||||
ErrorMessage = Error ?? string.Empty
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class ProgressTracker
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, TaskProgress> _tasks = new();
|
||||
private readonly Timer _cleaner;
|
||||
private readonly IHubContext<ProgressHub, IProgressReceiver> _hubContext;
|
||||
|
||||
// 构造器支持可选的Hub注入
|
||||
public ProgressTracker(IHubContext<ProgressHub, IProgressReceiver> hubContext)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
_cleaner = new Timer(CleanExpiredTasks, null,
|
||||
TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
|
||||
}
|
||||
|
||||
public void CleanExpiredTasks(object? obj)
|
||||
{
|
||||
var cutoff = DateTimeOffset.Now.AddMinutes(-3).ToUnixTimeSeconds();
|
||||
var expired = _tasks.Where(kvp => kvp.Value.Timestamp < cutoff).Select(kvp => kvp.Key).ToList();
|
||||
foreach (var id in expired)
|
||||
{
|
||||
_tasks.TryRemove(id, out _);
|
||||
}
|
||||
}
|
||||
|
||||
public string CreateTask(int total)
|
||||
{
|
||||
var id = Guid.NewGuid().ToString();
|
||||
var task = new TaskProgress(id, 0, total, TaskState.Running, DateTimeOffset.UtcNow.ToUnixTimeSeconds());
|
||||
_tasks[id] = task;
|
||||
NotifyIfNeeded(task);
|
||||
return id;
|
||||
}
|
||||
|
||||
// 核心更新方法,现在包含自动通知
|
||||
public bool UpdateTask(string id, Func<TaskProgress, TaskProgress> updater)
|
||||
{
|
||||
if (!_tasks.TryGetValue(id, out var current))
|
||||
return false;
|
||||
|
||||
var updated = updater(current);
|
||||
if (_tasks.TryUpdate(id, updated, current))
|
||||
{
|
||||
NotifyIfNeeded(updated);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// 自动通知逻辑 - 简单直接
|
||||
private void NotifyIfNeeded(TaskProgress task)
|
||||
{
|
||||
_hubContext.Clients.Group(task.Id).OnReceiveProgress(task.ToProgressInfo());
|
||||
}
|
||||
|
||||
public bool UpdateProgress(string id, int current)
|
||||
{
|
||||
return UpdateTask(id, p => p.WithUpdate(
|
||||
current: Math.Min(current, p.Total)));
|
||||
}
|
||||
|
||||
public bool AdvanceProgress(string id, int steps)
|
||||
{
|
||||
return UpdateTask(id, p => p.WithUpdate(
|
||||
current: Math.Min(p.Current + steps, p.Total)));
|
||||
}
|
||||
|
||||
public bool CancelProgress(string id)
|
||||
{
|
||||
return UpdateTask(id, p => p.WithUpdate(state: TaskState.Cancelled));
|
||||
}
|
||||
|
||||
public bool CompleteProgress(string id)
|
||||
{
|
||||
return UpdateTask(id, p => p.WithUpdate(
|
||||
current: p.Total, state: TaskState.Completed));
|
||||
}
|
||||
|
||||
public bool FailProgress(string id, string? error)
|
||||
{
|
||||
return UpdateTask(id, p => p.WithUpdate(
|
||||
state: TaskState.Failed, error: error));
|
||||
}
|
||||
|
||||
public TaskProgress? GetTask(string id)
|
||||
{
|
||||
_tasks.TryGetValue(id, out var task);
|
||||
return task.Id == null ? null : task;
|
||||
}
|
||||
}
|
||||
@@ -1,294 +0,0 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using System.Collections.Concurrent;
|
||||
using DotNext;
|
||||
using Common;
|
||||
using server.Hubs;
|
||||
|
||||
namespace server.Services;
|
||||
|
||||
public class ProgressReporter : ProgressInfo, IProgress<int>
|
||||
{
|
||||
private int _progress = 0;
|
||||
private int _stepProgress = 1;
|
||||
private int _expectedSteps = 100;
|
||||
private int _parentProportion = 100;
|
||||
|
||||
public int Progress => _progress;
|
||||
public int MaxProgress { get; set; } = 100;
|
||||
public int StepProgress
|
||||
{
|
||||
get => _stepProgress;
|
||||
set
|
||||
{
|
||||
_stepProgress = value;
|
||||
_expectedSteps = MaxProgress / value;
|
||||
}
|
||||
}
|
||||
public int ExpectedSteps
|
||||
{
|
||||
get => _expectedSteps;
|
||||
set
|
||||
{
|
||||
_expectedSteps = value;
|
||||
MaxProgress = Number.IntPow(10, Number.GetLength(value));
|
||||
_stepProgress = MaxProgress / value;
|
||||
}
|
||||
}
|
||||
public Func<int, Task>? ReporterFunc { get; set; } = null;
|
||||
public ProgressReporter? Parent { get; set; }
|
||||
public ProgressReporter? Child { get; set; }
|
||||
|
||||
private ProgressStatus _status = ProgressStatus.Pending;
|
||||
private string _errorMessage = string.Empty;
|
||||
|
||||
public override string TaskId { get; } = Guid.NewGuid().ToString();
|
||||
public override int ProgressPercent => _progress * 100 / MaxProgress;
|
||||
public override ProgressStatus Status => _status;
|
||||
public override string ErrorMessage => _errorMessage;
|
||||
|
||||
public ProgressReporter(Func<int, Task>? reporter = null, int initProgress = 0, int maxProgress = 100, int step = 1)
|
||||
{
|
||||
_progress = initProgress;
|
||||
MaxProgress = maxProgress;
|
||||
StepProgress = step;
|
||||
ReporterFunc = reporter;
|
||||
}
|
||||
|
||||
public ProgressReporter(int parentProportion, int expectedSteps = 100, Func<int, Task>? reporter = null)
|
||||
{
|
||||
this._parentProportion = parentProportion;
|
||||
MaxProgress = Number.IntPow(10, Number.GetLength(expectedSteps));
|
||||
StepProgress = MaxProgress / expectedSteps;
|
||||
ReporterFunc = reporter;
|
||||
}
|
||||
|
||||
private async void ForceReport(int value)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (ReporterFunc != null)
|
||||
await ReporterFunc(value);
|
||||
|
||||
if (Parent != null)
|
||||
Parent.Increase((value - _progress) / StepProgress * _parentProportion / (MaxProgress / StepProgress));
|
||||
|
||||
_progress = value;
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
_errorMessage = ex.Message;
|
||||
this._status = ProgressStatus.Canceled;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_errorMessage = ex.Message;
|
||||
this._status = ProgressStatus.Failed;
|
||||
}
|
||||
}
|
||||
|
||||
public void Report(int value)
|
||||
{
|
||||
if (this._status == ProgressStatus.Pending)
|
||||
this._status = ProgressStatus.InProgress;
|
||||
else if (this.Status != ProgressStatus.InProgress)
|
||||
return;
|
||||
|
||||
if (value > MaxProgress) return;
|
||||
ForceReport(value);
|
||||
}
|
||||
|
||||
public void Increase(int? value = null)
|
||||
{
|
||||
if (this._status == ProgressStatus.Pending)
|
||||
this._status = ProgressStatus.InProgress;
|
||||
else if (this.Status != ProgressStatus.InProgress)
|
||||
return;
|
||||
|
||||
if (value.HasValue)
|
||||
{
|
||||
if (_progress + value.Value >= MaxProgress) return;
|
||||
this.Report(_progress + value.Value);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_progress + StepProgress >= MaxProgress) return;
|
||||
this.Report(_progress + StepProgress);
|
||||
}
|
||||
}
|
||||
|
||||
public void Finish()
|
||||
{
|
||||
this._status = ProgressStatus.Completed;
|
||||
this.ForceReport(MaxProgress);
|
||||
}
|
||||
|
||||
public void Cancel()
|
||||
{
|
||||
this._status = ProgressStatus.Canceled;
|
||||
this._errorMessage = "User Cancelled";
|
||||
this.ForceReport(_progress);
|
||||
}
|
||||
|
||||
public void Error(string message)
|
||||
{
|
||||
this._status = ProgressStatus.Failed;
|
||||
this._errorMessage = message;
|
||||
this.ForceReport(_progress);
|
||||
}
|
||||
|
||||
public ProgressReporter CreateChild(int proportion, int expectedSteps = 100)
|
||||
{
|
||||
var child = new ProgressReporter(proportion, expectedSteps);
|
||||
child.Parent = this;
|
||||
this.Child = child;
|
||||
return child;
|
||||
}
|
||||
}
|
||||
|
||||
public class ProgressTrackerService : BackgroundService
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
private readonly ConcurrentDictionary<string, TaskProgressInfo> _taskMap = new();
|
||||
private readonly IHubContext<ProgressHub, IProgressReceiver> _hubContext;
|
||||
|
||||
private class TaskProgressInfo
|
||||
{
|
||||
public ProgressReporter? Reporter { get; set; }
|
||||
public string? ConnectionId { get; set; }
|
||||
public required CancellationToken CancellationToken { get; set; }
|
||||
public required CancellationTokenSource CancellationTokenSource { get; set; }
|
||||
public required DateTime UpdatedAt { get; set; }
|
||||
}
|
||||
|
||||
public ProgressTrackerService(IHubContext<ProgressHub, IProgressReceiver> hubContext)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
foreach (var kvp in _taskMap)
|
||||
{
|
||||
var info = kvp.Value;
|
||||
// 超过 1 分钟且任务已完成/失败/取消
|
||||
if (
|
||||
(now - info.UpdatedAt).TotalMinutes > 1 &&
|
||||
info.Reporter != null &&
|
||||
(
|
||||
info.Reporter.Status == ProgressStatus.Completed ||
|
||||
info.Reporter.Status == ProgressStatus.Failed ||
|
||||
info.Reporter.Status == ProgressStatus.Canceled
|
||||
)
|
||||
)
|
||||
{
|
||||
_taskMap.TryRemove(kvp.Key, out _);
|
||||
logger.Info($"Cleaned up task {kvp.Key}");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error(ex, "Error during ProgressTracker cleanup");
|
||||
}
|
||||
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
public (string, ProgressReporter) CreateTask(CancellationToken? cancellationToken = null)
|
||||
{
|
||||
CancellationTokenSource? cancellationTokenSource;
|
||||
if (cancellationToken.HasValue)
|
||||
{
|
||||
cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken.Value);
|
||||
}
|
||||
else
|
||||
{
|
||||
cancellationTokenSource = new CancellationTokenSource();
|
||||
}
|
||||
|
||||
var progressInfo = new TaskProgressInfo
|
||||
{
|
||||
ConnectionId = null,
|
||||
UpdatedAt = DateTime.UtcNow,
|
||||
CancellationToken = cancellationTokenSource.Token,
|
||||
CancellationTokenSource = cancellationTokenSource,
|
||||
};
|
||||
|
||||
var progress = new ProgressReporter(async value =>
|
||||
{
|
||||
cancellationTokenSource.Token.ThrowIfCancellationRequested();
|
||||
|
||||
// 通过 SignalR 推送进度
|
||||
if (progressInfo.ConnectionId != null && progressInfo.Reporter != null)
|
||||
await _hubContext.Clients.Client(progressInfo.ConnectionId).OnReceiveProgress(progressInfo.Reporter);
|
||||
});
|
||||
|
||||
progressInfo.Reporter = progress;
|
||||
|
||||
_taskMap.TryAdd(progressInfo.Reporter.TaskId, progressInfo);
|
||||
|
||||
return (progressInfo.Reporter.TaskId, progress);
|
||||
}
|
||||
|
||||
public Optional<ProgressReporter> GetReporter(string taskId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info))
|
||||
{
|
||||
return info.Reporter;
|
||||
}
|
||||
return Optional<ProgressReporter>.None;
|
||||
}
|
||||
|
||||
public Optional<ProgressStatus> GetProgressStatus(string taskId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info))
|
||||
{
|
||||
if (info.Reporter != null)
|
||||
return info.Reporter.Status;
|
||||
}
|
||||
return Optional<ProgressStatus>.None;
|
||||
}
|
||||
|
||||
public bool BindTask(string taskId, string connectionId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info) && info != null)
|
||||
{
|
||||
lock (info)
|
||||
{
|
||||
info.ConnectionId = connectionId;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public bool CancelTask(string taskId)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info) && info != null && info.Reporter != null)
|
||||
{
|
||||
lock (info)
|
||||
{
|
||||
info.CancellationTokenSource.Cancel();
|
||||
info.Reporter.Cancel();
|
||||
info.UpdatedAt = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error(ex, $"Failed to cancel task {taskId}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,11 +8,11 @@ using server.Services;
|
||||
/// <summary>
|
||||
/// UDP客户端发送池
|
||||
/// </summary>
|
||||
public class UDPClientPool
|
||||
public sealed class UDPClientPool
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private static IPAddress localhost = IPAddress.Parse("127.0.0.1");
|
||||
private static ProgressTracker _progressTracker = MsgBus.ProgressTracker;
|
||||
|
||||
/// <summary>
|
||||
/// 发送字符串
|
||||
@@ -183,40 +183,6 @@ public class UDPClientPool
|
||||
return await Task.Run(() => { return SendDataPack(endPoint, pkg); });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 发送字符串到本地
|
||||
/// </summary>
|
||||
/// <param name="port">端口</param>
|
||||
/// <param name="stringArray">字符串数组</param>
|
||||
/// <returns>是否成功</returns>
|
||||
public static bool SendStringLocalHost(int port, string[] stringArray)
|
||||
{
|
||||
return SendString(new IPEndPoint(localhost, port), stringArray);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 循环发送字符串到本地
|
||||
/// </summary>
|
||||
/// <param name="times">发送总次数</param>
|
||||
/// <param name="sleepMilliSeconds">间隔时间</param>
|
||||
/// <param name="port">端口</param>
|
||||
/// <param name="stringArray">字符串数组</param>
|
||||
/// <returns>是否成功</returns>
|
||||
public static bool CycleSendStringLocalHost(int times, int sleepMilliSeconds, int port, string[] stringArray)
|
||||
{
|
||||
var isSuccessful = true;
|
||||
|
||||
while (times-- >= 0)
|
||||
{
|
||||
isSuccessful = SendStringLocalHost(port, stringArray);
|
||||
if (!isSuccessful) break;
|
||||
|
||||
Thread.Sleep(sleepMilliSeconds);
|
||||
}
|
||||
|
||||
return isSuccessful;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 读取设备地址数据
|
||||
/// </summary>
|
||||
@@ -607,11 +573,11 @@ public class UDPClientPool
|
||||
/// <param name="devAddr">设备地址</param>
|
||||
/// <param name="data">要写入的32位数据</param>
|
||||
/// <param name="timeout">超时时间(毫秒)</param>
|
||||
/// <param name="progress">进度报告器</param>
|
||||
/// <param name="progressId">进度报告器</param>
|
||||
/// <returns>写入结果,true表示写入成功</returns>
|
||||
public static async ValueTask<Result<bool>> WriteAddr(
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr,
|
||||
UInt32 data, int timeout = 1000, ProgressReporter? progress = null)
|
||||
UInt32 data, int timeout = 1000, string progressId = "")
|
||||
{
|
||||
var ret = false;
|
||||
var opts = new SendAddrPackOptions()
|
||||
@@ -622,17 +588,18 @@ public class UDPClientPool
|
||||
Address = devAddr,
|
||||
IsWrite = true,
|
||||
};
|
||||
progress?.Report(20);
|
||||
_progressTracker.AdvanceProgress(progressId, 10);
|
||||
|
||||
// Write Register
|
||||
ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
|
||||
if (!ret) return new(new Exception("Send 1st address package failed!"));
|
||||
progress?.Report(40);
|
||||
_progressTracker.AdvanceProgress(progressId, 10);
|
||||
|
||||
// Send Data Package
|
||||
ret = await UDPClientPool.SendDataPackAsync(endPoint,
|
||||
new SendDataPackage(Common.Number.NumberToBytes(data, 4).Value));
|
||||
if (!ret) return new(new Exception("Send data package failed!"));
|
||||
progress?.Report(60);
|
||||
_progressTracker.AdvanceProgress(progressId, 10);
|
||||
|
||||
// Check Msg Bus
|
||||
if (!MsgBus.IsRunning)
|
||||
@@ -642,7 +609,7 @@ public class UDPClientPool
|
||||
var udpWriteAck = await MsgBus.UDPServer.WaitForAckAsync(
|
||||
endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
|
||||
if (!udpWriteAck.IsSuccessful) return new(udpWriteAck.Error);
|
||||
progress?.Finish();
|
||||
_progressTracker.AdvanceProgress(progressId, 10);
|
||||
|
||||
return udpWriteAck.Value.IsSuccessful;
|
||||
}
|
||||
@@ -655,11 +622,11 @@ public class UDPClientPool
|
||||
/// <param name="devAddr">设备地址</param>
|
||||
/// <param name="dataArray">要写入的字节数组</param>
|
||||
/// <param name="timeout">超时时间(毫秒)</param>
|
||||
/// <param name="progress">进度报告器</param>
|
||||
/// <param name="progressId">进度报告器</param>
|
||||
/// <returns>写入结果,true表示写入成功</returns>
|
||||
public static async ValueTask<Result<bool>> WriteAddr(
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr,
|
||||
byte[] dataArray, int timeout = 1000, ProgressReporter? progress = null)
|
||||
byte[] dataArray, int timeout = 1000, string progressId = "")
|
||||
{
|
||||
var ret = false;
|
||||
var opts = new SendAddrPackOptions()
|
||||
@@ -681,8 +648,6 @@ public class UDPClientPool
|
||||
var writeTimes = hasRest ?
|
||||
dataArray.Length / (max4BytesPerRead * (32 / 8)) + 1 :
|
||||
dataArray.Length / (max4BytesPerRead * (32 / 8));
|
||||
if (progress != null)
|
||||
progress.ExpectedSteps = writeTimes;
|
||||
for (var i = 0; i < writeTimes; i++)
|
||||
{
|
||||
// Sperate Data Array
|
||||
@@ -712,10 +677,9 @@ public class UDPClientPool
|
||||
if (!udpWriteAck.Value.IsSuccessful)
|
||||
return false;
|
||||
|
||||
progress?.Increase();
|
||||
_progressTracker.AdvanceProgress(progressId, 10);
|
||||
}
|
||||
|
||||
progress?.Finish();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user