feat: 添加下载进度条
This commit is contained in:
@@ -148,6 +148,10 @@ try
|
||||
builder.Services.AddSingleton<HttpHdmiVideoStreamService>();
|
||||
builder.Services.AddHostedService(provider => provider.GetRequiredService<HttpHdmiVideoStreamService>());
|
||||
|
||||
// 添加进度跟踪服务
|
||||
builder.Services.AddSingleton<ProgressTrackerService>();
|
||||
builder.Services.AddHostedService(provider => provider.GetRequiredService<ProgressTrackerService>());
|
||||
|
||||
// Application Settings
|
||||
var app = builder.Build();
|
||||
// Configure the HTTP request pipeline.
|
||||
@@ -217,7 +221,8 @@ try
|
||||
|
||||
// Router
|
||||
app.MapControllers();
|
||||
app.MapHub<server.Hubs.JtagHub.JtagHub>("hubs/JtagHub");
|
||||
app.MapHub<server.Hubs.JtagHub>("hubs/JtagHub");
|
||||
app.MapHub<server.Hubs.ProgressHub>("hubs/ProgressHub");
|
||||
|
||||
// Setup Program
|
||||
MsgBus.Init();
|
||||
|
@@ -348,4 +348,37 @@ public class Number
|
||||
}
|
||||
return dstBytes;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取数字的长度
|
||||
/// </summary>
|
||||
/// <param name="number">数字</param>
|
||||
/// <returns>数字的长度</returns>
|
||||
public static int GetLength(int number)
|
||||
{
|
||||
// 将整数转换为字符串
|
||||
string numberString = number.ToString();
|
||||
|
||||
// 返回字符串的长度
|
||||
return numberString.Length;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 计算整形的幂
|
||||
/// </summary>
|
||||
/// <param name="x">底数</param>
|
||||
/// <param name="pow">幂</param>
|
||||
/// <returns>计算结果</returns>
|
||||
public static int IntPow(int x, int pow)
|
||||
{
|
||||
int ret = 1;
|
||||
while (pow != 0)
|
||||
{
|
||||
if ((pow & 1) == 1)
|
||||
ret *= x;
|
||||
x *= x;
|
||||
pow >>= 1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Cors;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Database;
|
||||
using server.Services;
|
||||
|
||||
namespace server.Controllers;
|
||||
|
||||
@@ -15,6 +16,15 @@ public class JtagController : ControllerBase
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private readonly ProgressTrackerService _tracker;
|
||||
|
||||
private const string BITSTREAM_PATH = "bitstream/Jtag";
|
||||
|
||||
public JtagController(ProgressTrackerService tracker)
|
||||
{
|
||||
_tracker = tracker;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 控制器首页信息
|
||||
/// </summary>
|
||||
@@ -117,14 +127,14 @@ public class JtagController : ControllerBase
|
||||
/// <param name="address">JTAG 设备地址</param>
|
||||
/// <param name="port">JTAG 设备端口</param>
|
||||
/// <param name="bitstreamId">比特流ID</param>
|
||||
/// <returns>下载结果</returns>
|
||||
/// <returns>进度跟踪TaskID</returns>
|
||||
[HttpPost("DownloadBitstream")]
|
||||
[EnableCors("Users")]
|
||||
[ProducesResponseType(typeof(bool), StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(typeof(string), StatusCodes.Status200OK)]
|
||||
[ProducesResponseType(typeof(string), StatusCodes.Status400BadRequest)]
|
||||
[ProducesResponseType(typeof(Exception), StatusCodes.Status500InternalServerError)]
|
||||
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
|
||||
public async ValueTask<IResult> DownloadBitstream(string address, int port, int bitstreamId)
|
||||
public async ValueTask<IResult> DownloadBitstream(string address, int port, int bitstreamId, CancellationToken cancelToken)
|
||||
{
|
||||
logger.Info($"User {User.Identity?.Name} initiating bitstream download to device {address}:{port} using bitstream ID: {bitstreamId}");
|
||||
|
||||
@@ -176,55 +186,67 @@ public class JtagController : ControllerBase
|
||||
|
||||
logger.Info($"User {username} processing bitstream file of size: {fileBytes.Length} bytes");
|
||||
|
||||
// 定义缓冲区大小: 32KB
|
||||
byte[] buffer = new byte[32 * 1024];
|
||||
byte[] revBuffer = new byte[32 * 1024];
|
||||
long totalBytesProcessed = 0;
|
||||
// 定义进度跟踪
|
||||
var (taskId, progress) = _tracker.CreateTask(cancelToken);
|
||||
progress.Report(10);
|
||||
|
||||
// 使用内存流处理文件
|
||||
using (var inputStream = new MemoryStream(fileBytes))
|
||||
using (var outputStream = new MemoryStream())
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
|
||||
{
|
||||
// 反转 32bits
|
||||
var retBuffer = Common.Number.ReverseBytes(buffer, 4);
|
||||
if (!retBuffer.IsSuccessful)
|
||||
{
|
||||
logger.Error($"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
return TypedResults.InternalServerError(retBuffer.Error);
|
||||
}
|
||||
revBuffer = retBuffer.Value;
|
||||
// 定义缓冲区大小: 32KB
|
||||
byte[] buffer = new byte[32 * 1024];
|
||||
byte[] revBuffer = new byte[32 * 1024];
|
||||
long totalBytesProcessed = 0;
|
||||
|
||||
for (int i = 0; i < revBuffer.Length; i++)
|
||||
// 使用内存流处理文件
|
||||
using (var inputStream = new MemoryStream(fileBytes))
|
||||
using (var outputStream = new MemoryStream())
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
|
||||
{
|
||||
revBuffer[i] = Common.Number.ReverseBits(revBuffer[i]);
|
||||
// 反转 32bits
|
||||
var retBuffer = Common.Number.ReverseBytes(buffer, 4);
|
||||
if (!retBuffer.IsSuccessful)
|
||||
{
|
||||
logger.Error($"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
progress.Error($"User {username} failed to reverse bytes: {retBuffer.Error}");
|
||||
return;
|
||||
}
|
||||
revBuffer = retBuffer.Value;
|
||||
|
||||
for (int i = 0; i < revBuffer.Length; i++)
|
||||
{
|
||||
revBuffer[i] = Common.Number.ReverseBits(revBuffer[i]);
|
||||
}
|
||||
|
||||
await outputStream.WriteAsync(revBuffer, 0, bytesRead);
|
||||
totalBytesProcessed += bytesRead;
|
||||
}
|
||||
|
||||
await outputStream.WriteAsync(revBuffer, 0, bytesRead);
|
||||
totalBytesProcessed += bytesRead;
|
||||
}
|
||||
// 获取处理后的数据
|
||||
var processedBytes = outputStream.ToArray();
|
||||
logger.Info($"User {username} processed {totalBytesProcessed} bytes for device {address}");
|
||||
|
||||
// 获取处理后的数据
|
||||
var processedBytes = outputStream.ToArray();
|
||||
logger.Info($"User {username} processed {totalBytesProcessed} bytes for device {address}");
|
||||
progress.Report(20);
|
||||
|
||||
// 下载比特流
|
||||
var jtagCtrl = new Peripherals.JtagClient.Jtag(address, port);
|
||||
var ret = await jtagCtrl.DownloadBitstream(processedBytes);
|
||||
// 下载比特流
|
||||
var jtagCtrl = new Peripherals.JtagClient.Jtag(address, port);
|
||||
var ret = await jtagCtrl.DownloadBitstream(processedBytes);
|
||||
|
||||
if (ret.IsSuccessful)
|
||||
{
|
||||
logger.Info($"User {username} successfully downloaded bitstream '{bitstream.ResourceName}' to device {address}");
|
||||
return TypedResults.Ok(ret.Value);
|
||||
if (ret.IsSuccessful)
|
||||
{
|
||||
logger.Info($"User {username} successfully downloaded bitstream '{bitstream.ResourceName}' to device {address}");
|
||||
progress.Finish();
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
progress.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.Error($"User {username} failed to download bitstream to device {address}: {ret.Error}");
|
||||
return TypedResults.InternalServerError(ret.Error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return TypedResults.Ok(taskId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
@@ -1,6 +1,5 @@
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Cors;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using System.Security.Claims;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using DotNext;
|
||||
@@ -8,7 +7,7 @@ using System.Collections.Concurrent;
|
||||
using TypedSignalR.Client;
|
||||
using Tapper;
|
||||
|
||||
namespace server.Hubs.JtagHub;
|
||||
namespace server.Hubs;
|
||||
|
||||
[Hub]
|
||||
public interface IJtagHub
|
||||
|
61
server/src/Hubs/ProgressHub.cs
Normal file
61
server/src/Hubs/ProgressHub.cs
Normal file
@@ -0,0 +1,61 @@
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using System.Security.Claims;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.AspNetCore.Cors;
|
||||
using TypedSignalR.Client;
|
||||
using Tapper;
|
||||
using server.Services;
|
||||
|
||||
namespace server.Hubs;
|
||||
|
||||
[Hub]
|
||||
public interface IProgressHub
|
||||
{
|
||||
Task<bool> Join(string taskId);
|
||||
}
|
||||
|
||||
[Receiver]
|
||||
public interface IProgressReceiver
|
||||
{
|
||||
Task OnReceiveProgress(ProgressInfo message);
|
||||
}
|
||||
|
||||
[TranspilationSource]
|
||||
public enum ProgressStatus
|
||||
{
|
||||
Pending,
|
||||
InProgress,
|
||||
Completed,
|
||||
Canceled,
|
||||
Failed
|
||||
}
|
||||
|
||||
[TranspilationSource]
|
||||
public class ProgressInfo
|
||||
{
|
||||
public string TaskId { get; }
|
||||
public ProgressStatus Status { get; }
|
||||
public int ProgressPercent { get; }
|
||||
public string ErrorMessage { get; }
|
||||
};
|
||||
|
||||
[Authorize]
|
||||
[EnableCors("SignalR")]
|
||||
public class ProgressHub : Hub<IProgressReceiver>, IProgressHub
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private readonly IHubContext<ProgressHub, IProgressReceiver> _hubContext;
|
||||
private readonly ProgressTrackerService _tracker;
|
||||
|
||||
public ProgressHub(IHubContext<ProgressHub, IProgressReceiver> hubContext, ProgressTrackerService tracker)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
_tracker = tracker;
|
||||
}
|
||||
|
||||
public async Task<bool> Join(string taskId)
|
||||
{
|
||||
return _tracker.BindTask(taskId, Context.ConnectionId);
|
||||
}
|
||||
}
|
@@ -2,7 +2,7 @@ using System.Collections;
|
||||
using System.Net;
|
||||
using DotNext;
|
||||
using Newtonsoft.Json;
|
||||
using server;
|
||||
using server.Services;
|
||||
using WebProtocol;
|
||||
|
||||
namespace Peripherals.JtagClient;
|
||||
@@ -442,11 +442,12 @@ public class Jtag
|
||||
return Convert.ToUInt32(Common.Number.BytesToUInt32(retPackOpts.Data).Value);
|
||||
}
|
||||
|
||||
async ValueTask<Result<bool>> WriteFIFO
|
||||
(UInt32 devAddr, UInt32 data, UInt32 result, UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0)
|
||||
async ValueTask<Result<bool>> WriteFIFO(
|
||||
UInt32 devAddr, UInt32 data, UInt32 result,
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null)
|
||||
{
|
||||
{
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout);
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80));
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
if (!ret.Value) return new(new Exception("Write FIFO failed"));
|
||||
}
|
||||
@@ -457,15 +458,17 @@ public class Jtag
|
||||
{
|
||||
var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
progress?.Finish();
|
||||
return ret.Value;
|
||||
}
|
||||
}
|
||||
|
||||
async ValueTask<Result<bool>> WriteFIFO
|
||||
(UInt32 devAddr, byte[] data, UInt32 result, UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0)
|
||||
async ValueTask<Result<bool>> WriteFIFO(
|
||||
UInt32 devAddr, byte[] data, UInt32 result,
|
||||
UInt32 resultMask = 0xFF_FF_FF_FF, UInt32 delayMilliseconds = 0, ProgressReporter? progress = null)
|
||||
{
|
||||
{
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout);
|
||||
var ret = await UDPClientPool.WriteAddr(this.ep, 0, devAddr, data, this.timeout, progress?.CreateChild(80));
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
if (!ret.Value) return new(new Exception("Write FIFO failed"));
|
||||
}
|
||||
@@ -476,6 +479,7 @@ public class Jtag
|
||||
{
|
||||
var ret = await UDPClientPool.ReadAddrWithWait(this.ep, 0, JtagAddr.STATE, result, resultMask, 0, this.timeout);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
progress?.Finish();
|
||||
return ret.Value;
|
||||
}
|
||||
}
|
||||
@@ -559,7 +563,8 @@ public class Jtag
|
||||
return await ClearWriteDataReg();
|
||||
}
|
||||
|
||||
async ValueTask<Result<bool>> LoadDRCareInput(byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500)
|
||||
async ValueTask<Result<bool>> LoadDRCareInput(
|
||||
byte[] bytesArray, UInt32 timeout = 10_000, UInt32 cycle = 500, ProgressReporter? progress = null)
|
||||
{
|
||||
var bytesLen = ((uint)(bytesArray.Length * 8));
|
||||
if (bytesLen > Math.Pow(2, 28)) return new(new Exception("Length is over 2^(28 - 3)"));
|
||||
@@ -574,11 +579,15 @@ public class Jtag
|
||||
else if (!ret.Value) return new(new Exception("Write CMD_JTAG_LOAD_DR_CAREI Failed"));
|
||||
}
|
||||
|
||||
progress?.Report(10);
|
||||
|
||||
{
|
||||
var ret = await WriteFIFO(
|
||||
JtagAddr.WRITE_DATA,
|
||||
bytesArray, 0x01_00_00_00,
|
||||
JtagState.CMD_EXEC_FINISH);
|
||||
JtagState.CMD_EXEC_FINISH,
|
||||
progress: progress?.CreateChild(90)
|
||||
);
|
||||
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
return ret.Value;
|
||||
@@ -701,44 +710,55 @@ public class Jtag
|
||||
/// </summary>
|
||||
/// <param name="bitstream">比特流数据</param>
|
||||
/// <returns>指示下载是否成功的异步结果</returns>
|
||||
public async ValueTask<Result<bool>> DownloadBitstream(byte[] bitstream)
|
||||
public async ValueTask<Result<bool>> DownloadBitstream(byte[] bitstream, ProgressReporter? progress = null)
|
||||
{
|
||||
// Clear Data
|
||||
MsgBus.UDPServer.ClearUDPData(this.address, 0);
|
||||
|
||||
logger.Trace($"Clear up udp server {this.address,0} receive data");
|
||||
if (progress != null)
|
||||
{
|
||||
progress.ExpectedSteps = 25;
|
||||
progress.Increase();
|
||||
}
|
||||
|
||||
Result<bool> ret;
|
||||
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
logger.Trace("Jtag initialize");
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_JRST);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JRST Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_CFGI);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_CFGI Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
logger.Trace("Jtag ready to write bitstream");
|
||||
|
||||
ret = await IdleDelay(100000);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await LoadDRCareInput(bitstream);
|
||||
ret = await LoadDRCareInput(bitstream, progress: progress?.CreateChild(50));
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Load Data Failed"));
|
||||
|
||||
@@ -747,32 +767,40 @@ public class Jtag
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await RunTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Run Test Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await ExecRDCmd(JtagCmd.JTAG_DR_JWAKEUP);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Execute Command JTAG_DR_JWAKEUP Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
logger.Trace("Jtag reset device");
|
||||
|
||||
ret = await IdleDelay(10000);
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag IDLE Delay Failed"));
|
||||
progress?.Increase();
|
||||
|
||||
var retCode = await ReadStatusReg();
|
||||
if (!retCode.IsSuccessful) return new(retCode.Error);
|
||||
var jtagStatus = new JtagStatusReg(retCode.Value);
|
||||
if (!(jtagStatus.done && jtagStatus.wakeup_over && jtagStatus.init_complete))
|
||||
return new(new Exception("Jtag download bitstream failed"));
|
||||
progress?.Increase();
|
||||
|
||||
ret = await CloseTest();
|
||||
if (!ret.IsSuccessful) return new(ret.Error);
|
||||
else if (!ret.Value) return new(new Exception("Jtag Close Test Failed"));
|
||||
logger.Trace("Jtag download bitstream successfully");
|
||||
progress?.Increase();
|
||||
|
||||
// Finish
|
||||
progress?.Finish();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
288
server/src/Services/ProgressTrackerService.cs
Normal file
288
server/src/Services/ProgressTrackerService.cs
Normal file
@@ -0,0 +1,288 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using System.Collections.Concurrent;
|
||||
using DotNext;
|
||||
using Common;
|
||||
using server.Hubs;
|
||||
|
||||
namespace server.Services;
|
||||
|
||||
public class ProgressReporter : ProgressInfo, IProgress<int>
|
||||
{
|
||||
private int _progress = 0;
|
||||
private int _stepProgress = 1;
|
||||
private int _expectedSteps = 100;
|
||||
private int _parentProportion = 100;
|
||||
|
||||
public int Progress => _progress;
|
||||
public int MaxProgress { get; set; } = 100;
|
||||
public int StepProgress
|
||||
{
|
||||
get => _stepProgress;
|
||||
set
|
||||
{
|
||||
_stepProgress = value;
|
||||
ExpectedSteps = MaxProgress / value;
|
||||
}
|
||||
}
|
||||
public int ExpectedSteps
|
||||
{
|
||||
get => _expectedSteps;
|
||||
set
|
||||
{
|
||||
_expectedSteps = value;
|
||||
MaxProgress = Number.IntPow(10, Number.GetLength(value));
|
||||
StepProgress = MaxProgress / value;
|
||||
}
|
||||
}
|
||||
public Func<int, Task>? ReporterFunc { get; set; } = null;
|
||||
public ProgressReporter? Parent { get; set; }
|
||||
public ProgressReporter? Child { get; set; }
|
||||
|
||||
private ProgressStatus _status = ProgressStatus.Pending;
|
||||
private string _errorMessage;
|
||||
|
||||
public string TaskId { get; set; } = new Guid().ToString();
|
||||
public int ProgressPercent => _progress * 100 / MaxProgress;
|
||||
public ProgressStatus Status => _status;
|
||||
public string ErrorMessage => _errorMessage;
|
||||
|
||||
public ProgressReporter(Func<int, Task>? reporter = null, int initProgress = 0, int maxProgress = 100, int step = 1)
|
||||
{
|
||||
_progress = initProgress;
|
||||
MaxProgress = maxProgress;
|
||||
StepProgress = step;
|
||||
ReporterFunc = reporter;
|
||||
}
|
||||
|
||||
public ProgressReporter(int parentProportion, int expectedSteps = 100, Func<int, Task>? reporter = null)
|
||||
{
|
||||
this._parentProportion = parentProportion;
|
||||
MaxProgress = Number.IntPow(10, Number.GetLength(expectedSteps));
|
||||
StepProgress = MaxProgress / expectedSteps;
|
||||
ReporterFunc = reporter;
|
||||
}
|
||||
|
||||
private async void ForceReport(int value)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (ReporterFunc != null)
|
||||
await ReporterFunc(value);
|
||||
|
||||
if (Parent != null)
|
||||
Parent.Increase((value - _progress) / StepProgress * _parentProportion / (MaxProgress / StepProgress));
|
||||
|
||||
_progress = value;
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
_errorMessage = ex.Message;
|
||||
this._status = ProgressStatus.Canceled;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_errorMessage = ex.Message;
|
||||
this._status = ProgressStatus.Failed;
|
||||
}
|
||||
}
|
||||
|
||||
public async void Report(int value)
|
||||
{
|
||||
if (this._status == ProgressStatus.Pending)
|
||||
this._status = ProgressStatus.InProgress;
|
||||
else if (this.Status != ProgressStatus.InProgress)
|
||||
return;
|
||||
|
||||
if (value > MaxProgress) return;
|
||||
ForceReport(value);
|
||||
}
|
||||
|
||||
public void Increase(int? value = null)
|
||||
{
|
||||
if (this._status == ProgressStatus.Pending)
|
||||
this._status = ProgressStatus.InProgress;
|
||||
else if (this.Status != ProgressStatus.InProgress)
|
||||
return;
|
||||
|
||||
if (value.HasValue)
|
||||
{
|
||||
if (_progress + value.Value >= MaxProgress) return;
|
||||
this.Report(_progress + value.Value);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_progress + StepProgress >= MaxProgress) return;
|
||||
this.Report(_progress + StepProgress);
|
||||
}
|
||||
}
|
||||
|
||||
public void Finish()
|
||||
{
|
||||
this._status = ProgressStatus.Completed;
|
||||
this.ForceReport(MaxProgress);
|
||||
}
|
||||
|
||||
public void Cancel()
|
||||
{
|
||||
this._status = ProgressStatus.Canceled;
|
||||
this._errorMessage = "User Cancelled";
|
||||
this.ForceReport(_progress);
|
||||
}
|
||||
|
||||
public void Error(string message)
|
||||
{
|
||||
this._status = ProgressStatus.Failed;
|
||||
this._errorMessage = message;
|
||||
this.ForceReport(_progress);
|
||||
}
|
||||
|
||||
public ProgressReporter CreateChild(int proportion, int expectedSteps = 100)
|
||||
{
|
||||
var child = new ProgressReporter(proportion, expectedSteps);
|
||||
child.Parent = this;
|
||||
this.Child = child;
|
||||
return child;
|
||||
}
|
||||
}
|
||||
|
||||
public class ProgressTrackerService : BackgroundService
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
private readonly ConcurrentDictionary<string, TaskProgressInfo> _taskMap = new();
|
||||
private readonly IHubContext<ProgressHub, IProgressReceiver> _hubContext;
|
||||
|
||||
private class TaskProgressInfo
|
||||
{
|
||||
public ProgressReporter Reporter { get; set; }
|
||||
public string? ConnectionId { get; set; }
|
||||
public required CancellationToken CancellationToken { get; set; }
|
||||
public required CancellationTokenSource CancellationTokenSource { get; set; }
|
||||
public required DateTime UpdatedAt { get; set; }
|
||||
}
|
||||
|
||||
public ProgressTrackerService(IHubContext<ProgressHub, IProgressReceiver> hubContext)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
foreach (var kvp in _taskMap)
|
||||
{
|
||||
var info = kvp.Value;
|
||||
// 超过 1 分钟且任务已完成/失败/取消
|
||||
if ((now - info.UpdatedAt).TotalMinutes > 1 &&
|
||||
(info.Reporter.Status == ProgressStatus.Completed ||
|
||||
info.Reporter.Status == ProgressStatus.Failed ||
|
||||
info.Reporter.Status == ProgressStatus.Canceled))
|
||||
{
|
||||
_taskMap.TryRemove(kvp.Key, out _);
|
||||
logger.Info($"Cleaned up task {kvp.Key}");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error(ex, "Error during ProgressTracker cleanup");
|
||||
}
|
||||
await Task.Delay(TimeSpan.FromSeconds(30));
|
||||
}
|
||||
}
|
||||
|
||||
public (string, ProgressReporter) CreateTask(CancellationToken? cancellationToken = null)
|
||||
{
|
||||
CancellationTokenSource? cancellationTokenSource;
|
||||
if (cancellationToken.HasValue)
|
||||
{
|
||||
cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken.Value);
|
||||
}
|
||||
else
|
||||
{
|
||||
cancellationTokenSource = new CancellationTokenSource();
|
||||
}
|
||||
|
||||
var progressInfo = new TaskProgressInfo
|
||||
{
|
||||
ConnectionId = null,
|
||||
UpdatedAt = DateTime.UtcNow,
|
||||
CancellationToken = cancellationTokenSource.Token,
|
||||
CancellationTokenSource = cancellationTokenSource,
|
||||
};
|
||||
|
||||
var progress = new ProgressReporter(async value =>
|
||||
{
|
||||
cancellationTokenSource.Token.ThrowIfCancellationRequested();
|
||||
|
||||
// 通过 SignalR 推送进度
|
||||
if (progressInfo.ConnectionId != null)
|
||||
await _hubContext.Clients.Client(progressInfo.ConnectionId).OnReceiveProgress(progressInfo.Reporter);
|
||||
});
|
||||
|
||||
progressInfo.Reporter = progress;
|
||||
|
||||
_taskMap.TryAdd(progressInfo.Reporter.TaskId, progressInfo);
|
||||
|
||||
return (progressInfo.Reporter.TaskId, progress);
|
||||
}
|
||||
|
||||
public Optional<ProgressReporter> GetReporter(string taskId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info))
|
||||
{
|
||||
return info.Reporter;
|
||||
}
|
||||
return Optional<ProgressReporter>.None;
|
||||
}
|
||||
|
||||
public Optional<ProgressStatus> GetProgressStatus(string taskId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info))
|
||||
{
|
||||
return info.Reporter.Status;
|
||||
}
|
||||
return Optional<ProgressStatus>.None;
|
||||
}
|
||||
|
||||
public bool BindTask(string taskId, string connectionId)
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info) && info != null)
|
||||
{
|
||||
lock (info)
|
||||
{
|
||||
info.ConnectionId = connectionId;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public bool CancelTask(string taskId)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_taskMap.TryGetValue(taskId, out var info) && info != null)
|
||||
{
|
||||
lock (info)
|
||||
{
|
||||
info.CancellationTokenSource.Cancel();
|
||||
info.Reporter.Cancel();
|
||||
info.UpdatedAt = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error(ex, $"Failed to cancel task {taskId}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@@ -3,6 +3,7 @@ using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using DotNext;
|
||||
using WebProtocol;
|
||||
using server.Services;
|
||||
|
||||
/// <summary>
|
||||
/// UDP客户端发送池
|
||||
@@ -465,7 +466,7 @@ public class UDPClientPool
|
||||
CommandID = Convert.ToByte(taskID),
|
||||
IsWrite = false,
|
||||
BurstLength = (byte)(currentSegmentSize - 1),
|
||||
Address = (burstType == BurstType.ExtendBurst)?(devAddr + (uint)(i * max4BytesPerRead)):(devAddr),
|
||||
Address = (burstType == BurstType.ExtendBurst) ? (devAddr + (uint)(i * max4BytesPerRead)) : (devAddr),
|
||||
// Address = devAddr + (uint)(i * max4BytesPerRead),
|
||||
};
|
||||
pkgList.Add(new SendAddrPackage(opts));
|
||||
@@ -586,7 +587,8 @@ public class UDPClientPool
|
||||
/// <param name="timeout">超时时间(毫秒)</param>
|
||||
/// <returns>写入结果,true表示写入成功</returns>
|
||||
public static async ValueTask<Result<bool>> WriteAddr(
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr, UInt32 data, int timeout = 1000)
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr,
|
||||
UInt32 data, int timeout = 1000, ProgressReporter? progress = null)
|
||||
{
|
||||
var ret = false;
|
||||
var opts = new SendAddrPackOptions()
|
||||
@@ -597,14 +599,17 @@ public class UDPClientPool
|
||||
Address = devAddr,
|
||||
IsWrite = true,
|
||||
};
|
||||
progress?.Report(20);
|
||||
|
||||
// Write Register
|
||||
ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
|
||||
if (!ret) return new(new Exception("Send 1st address package failed!"));
|
||||
progress?.Report(40);
|
||||
// Send Data Package
|
||||
ret = await UDPClientPool.SendDataPackAsync(endPoint,
|
||||
new SendDataPackage(Common.Number.NumberToBytes(data, 4).Value));
|
||||
if (!ret) return new(new Exception("Send data package failed!"));
|
||||
progress?.Report(60);
|
||||
|
||||
// Check Msg Bus
|
||||
if (!MsgBus.IsRunning)
|
||||
@@ -614,6 +619,7 @@ public class UDPClientPool
|
||||
var udpWriteAck = await MsgBus.UDPServer.WaitForAckAsync(
|
||||
endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
|
||||
if (!udpWriteAck.IsSuccessful) return new(udpWriteAck.Error);
|
||||
progress?.Finish();
|
||||
|
||||
return udpWriteAck.Value.IsSuccessful;
|
||||
}
|
||||
@@ -628,7 +634,8 @@ public class UDPClientPool
|
||||
/// <param name="timeout">超时时间(毫秒)</param>
|
||||
/// <returns>写入结果,true表示写入成功</returns>
|
||||
public static async ValueTask<Result<bool>> WriteAddr(
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr, byte[] dataArray, int timeout = 1000)
|
||||
IPEndPoint endPoint, int taskID, UInt32 devAddr,
|
||||
byte[] dataArray, int timeout = 1000, ProgressReporter? progress = null)
|
||||
{
|
||||
var ret = false;
|
||||
var opts = new SendAddrPackOptions()
|
||||
@@ -650,6 +657,8 @@ public class UDPClientPool
|
||||
var writeTimes = hasRest ?
|
||||
dataArray.Length / (max4BytesPerRead * (32 / 8)) + 1 :
|
||||
dataArray.Length / (max4BytesPerRead * (32 / 8));
|
||||
if (progress != null)
|
||||
progress.ExpectedSteps = writeTimes;
|
||||
for (var i = 0; i < writeTimes; i++)
|
||||
{
|
||||
// Sperate Data Array
|
||||
@@ -678,8 +687,11 @@ public class UDPClientPool
|
||||
|
||||
if (!udpWriteAck.Value.IsSuccessful)
|
||||
return false;
|
||||
|
||||
progress?.Increase();
|
||||
}
|
||||
|
||||
progress?.Finish();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user