From d551cbe793a6c56ea4c3e8232a1324fb92021cb2 Mon Sep 17 00:00:00 2001 From: SikongJueluo Date: Wed, 16 Jul 2025 20:25:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/Peripherals/JtagClient.cs | 14 +-- server/src/UdpClientPool.cs | 57 +++++----- server/src/UdpServer.cs | 149 +++++++++++++++++++-------- server/src/WebProtocol.cs | 148 +++++++++++++------------- 4 files changed, 227 insertions(+), 141 deletions(-) diff --git a/server/src/Peripherals/JtagClient.cs b/server/src/Peripherals/JtagClient.cs index 69d807a..cae8873 100644 --- a/server/src/Peripherals/JtagClient.cs +++ b/server/src/Peripherals/JtagClient.cs @@ -406,15 +406,17 @@ public class Jtag async ValueTask> ReadFIFO(uint devAddr) { var ret = false; - var opts = new SendAddrPackOptions(); + var opts = new SendAddrPackOptions() + { + BurstType = BurstType.FixedBurst, + BurstLength = 0, + CommandID = 0, + Address = devAddr, + IsWrite = false, + }; - opts.BurstType = BurstType.FixedBurst; - opts.BurstLength = 0; - opts.CommandID = 0; - opts.Address = devAddr; // Read Jtag State Register - opts.IsWrite = false; ret = await UDPClientPool.SendAddrPackAsync(ep, new SendAddrPackage(opts)); if (!ret) return new(new Exception("Send Address Package Failed!")); diff --git a/server/src/UdpClientPool.cs b/server/src/UdpClientPool.cs index d7d00f3..98fef10 100644 --- a/server/src/UdpClientPool.cs +++ b/server/src/UdpClientPool.cs @@ -228,15 +228,17 @@ public class UDPClientPool IPEndPoint endPoint, int taskID, uint devAddr, int timeout = 1000) { var ret = false; - var opts = new SendAddrPackOptions(); + var opts = new SendAddrPackOptions() + { + BurstType = BurstType.FixedBurst, + BurstLength = 0, + CommandID = Convert.ToByte(taskID), + Address = devAddr, + IsWrite = false, + }; - opts.BurstType = BurstType.FixedBurst; - opts.BurstLength = 0; - opts.CommandID = Convert.ToByte(taskID); - opts.Address = devAddr; - // Read Jtag State Register - opts.IsWrite = false; + // Read Register ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts)); if (!ret) return new(new Exception("Send Address Package Failed!")); @@ -358,12 +360,16 @@ public class UDPClientPool IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000) { var ret = false; - var opts = new SendAddrPackOptions(); + var opts = new SendAddrPackOptions() + { + BurstLength = 0, + Address = 0, + BurstType = BurstType.FixedBurst, + CommandID = Convert.ToByte(taskID), + IsWrite = false, + }; var resultData = new List(); - opts.BurstType = BurstType.FixedBurst; - opts.CommandID = Convert.ToByte(taskID); - opts.IsWrite = false; // Check Msg Bus if (!MsgBus.IsRunning) @@ -544,15 +550,17 @@ public class UDPClientPool IPEndPoint endPoint, int taskID, UInt32 devAddr, UInt32 data, int timeout = 1000) { var ret = false; - var opts = new SendAddrPackOptions(); + var opts = new SendAddrPackOptions() + { + BurstType = BurstType.FixedBurst, + BurstLength = 0, + CommandID = Convert.ToByte(taskID), + Address = devAddr, + IsWrite = true, + }; - opts.BurstType = BurstType.FixedBurst; - opts.BurstLength = 0; - opts.CommandID = Convert.ToByte(taskID); - opts.Address = devAddr; - // Write Jtag State Register - opts.IsWrite = true; + // Write Register ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts)); if (!ret) return new(new Exception("Send 1st address package failed!")); // Send Data Package @@ -585,18 +593,21 @@ public class UDPClientPool IPEndPoint endPoint, int taskID, UInt32 devAddr, byte[] dataArray, int timeout = 1000) { var ret = false; - var opts = new SendAddrPackOptions(); + var opts = new SendAddrPackOptions() + { + BurstType = BurstType.FixedBurst, + CommandID = Convert.ToByte(taskID), + Address = devAddr, + BurstLength = 0, + IsWrite = true, + }; - opts.BurstType = BurstType.FixedBurst; - opts.CommandID = Convert.ToByte(taskID); - opts.Address = devAddr; // Check Msg Bus if (!MsgBus.IsRunning) return new(new Exception("Message bus not working!")); - opts.IsWrite = true; var hasRest = dataArray.Length % (256 * (32 / 8)) != 0; var writeTimes = hasRest ? dataArray.Length / (256 * (32 / 8)) + 1 : diff --git a/server/src/UdpServer.cs b/server/src/UdpServer.cs index 4acb939..28dd5f1 100644 --- a/server/src/UdpServer.cs +++ b/server/src/UdpServer.cs @@ -4,6 +4,7 @@ using System.Net.NetworkInformation; // 添加这个引用 using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Text; +using Common; using DotNext; using Newtonsoft.Json; using WebProtocol; @@ -16,6 +17,10 @@ public class UDPData /// public required DateTime DateTime { get; set; } /// + /// 数据包时间戳 + /// + public required UInt32 Timestamp { get; set; } + /// /// 发送来源的IP地址 /// public required string Address { get; set; } @@ -48,6 +53,7 @@ public class UDPData return new UDPData() { DateTime = this.DateTime, + Timestamp = this.Timestamp, Address = new string(this.Address), Port = this.Port, TaskID = this.TaskID, @@ -69,24 +75,26 @@ public class UDPData /// /// UDP 服务器 /// -public class UDPServer : IDisposable +public class UDPServer { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - private ConcurrentDictionary> udpData = new ConcurrentDictionary>(); + private ConcurrentDictionary> udpData + = new ConcurrentDictionary>(); private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim(); private int listenPort; private List listeners = new List(); + private List tasks = new List(); private IPEndPoint groupEP; - private bool isRunning = false; + private CancellationTokenSource? cancellationTokenSource; private bool disposed = false; /// /// 是否正在工作 /// - public bool IsRunning { get { return isRunning; } } + public bool IsRunning => cancellationTokenSource?.Token.IsCancellationRequested == false; /// UDP 服务器的错误代码 public enum ErrorCode @@ -415,27 +423,29 @@ public class UDPServer : IDisposable var remotePort = remoteEP.Port; var data = new UDPData() { + DateTime = time, + Timestamp = Number.BytesToUInt32(bytes[..4]).Value, Address = remoteAddress, Port = remotePort, TaskID = taskID, Data = bytes, - DateTime = time, HasRead = false, }; + var key = $"{remoteAddress}-{taskID}"; + udpDataLock.EnterWriteLock(); try { - var key = $"{remoteAddress}-{taskID}"; - var sortedList = udpData.GetOrAdd(key, _ => new SortedList()); + var sortedList = udpData.GetOrAdd(key, _ => new SortedList()); // 处理相同时间戳的情况,添加微小的时间差 - var uniqueTime = time; + var uniqueTime = data.Timestamp; while (sortedList.ContainsKey(uniqueTime)) { logger.Warn( $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}."); - uniqueTime = uniqueTime.AddTicks(1); + uniqueTime += 1; } sortedList.Add(uniqueTime, data); @@ -491,7 +501,7 @@ public class UDPServer : IDisposable recvData = Encoding.ASCII.GetString(bytes, 0, bytes.Length); } - logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:"); + logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()} - {data.Timestamp}:"); logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}"); if (recvData.Length != 0) logger.Debug($" Decoded Data : {recvData}"); return $@" @@ -580,8 +590,7 @@ public class UDPServer : IDisposable return; } - // 异步执行ARP刷新,避免阻塞主线程 - Task.Run(() => ExecuteArpFlush(ipAddr)); + ExecuteArpFlush(ipAddr); } catch (Exception ex) { @@ -633,7 +642,7 @@ public class UDPServer : IDisposable /// 执行ARP刷新流程:先删除ARP条目,再用ping重新刷新 /// /// 目标IP地址 - private async void ExecuteArpFlush(string ipAddr) + private void ExecuteArpFlush(string ipAddr) { try { @@ -806,33 +815,57 @@ public class UDPServer : IDisposable /// None public void Start() { - this.isRunning = true; + if (cancellationTokenSource != null && !cancellationTokenSource.Token.IsCancellationRequested) + { + logger.Warn("UDP Server is already running"); + return; + } + + cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; + try { foreach (var client in listeners) { - Task.Run(() => + tasks.Add(Task.Run(async () => { - while (this.isRunning) + while (!cancellationToken.IsCancellationRequested) { try { - var ep = new IPEndPoint(IPAddress.Any, listenPort); - var result = client.Receive(ref ep); - _ = ReceiveHandler(result, ep, DateTime.Now); + // 使用 CancellationToken 来取消接收操作 + var result = await client.ReceiveAsync(cancellationToken); + _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now); + } + catch (OperationCanceledException) + { + logger.Debug("UDP receive operation was cancelled"); + break; + } + catch (ObjectDisposedException) + { + logger.Debug("UDP client was disposed"); + break; } catch (Exception ex) { - Console.WriteLine($"Error: {ex.Message}"); + if (!cancellationToken.IsCancellationRequested) + { + logger.Error($"Error in UDP receive: {ex.Message}"); + } } } - }); + }, cancellationToken)); } + + logger.Info("UDP Server started successfully"); } catch (Exception e) { - Console.WriteLine(e.ToString()); - this.isRunning = false; + logger.Error($"Failed to start UDP server: {e}"); + cancellationTokenSource?.Cancel(); + throw; } } @@ -842,39 +875,71 @@ public class UDPServer : IDisposable /// None public void Stop() { - foreach (var item in listeners) + if (cancellationTokenSource == null || cancellationTokenSource.Token.IsCancellationRequested) { - item.Close(); + logger.Warn("UDP Server is not running or already stopped"); + return; } - this.isRunning = false; + try + { + logger.Info("Stopping UDP Server..."); + + // 取消所有操作 + cancellationTokenSource.Cancel(); + + // 等待所有任务完成,设置超时时间 + var waitTasks = Task.WhenAll(tasks); + if (!waitTasks.Wait(TimeSpan.FromSeconds(5))) + { + logger.Warn("Some tasks did not complete within timeout period"); + } + + // 关闭所有UDP客户端 + foreach (var client in listeners) + { + try + { + client.Close(); + } + catch (Exception ex) + { + logger.Warn($"Error closing UDP client: {ex.Message}"); + } + } + + // 清理任务列表 + tasks.Clear(); + + logger.Info("UDP Server stopped successfully"); + } + catch (Exception ex) + { + logger.Error($"Error stopping UDP server: {ex.Message}"); + } + finally + { + cancellationTokenSource?.Dispose(); + cancellationTokenSource = null; + } } /// - /// 释放资源 + /// 实现IDisposable接口,确保资源正确释放 /// public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) { if (!disposed) { - if (disposing) + Stop(); + + foreach (var client in listeners) { - Stop(); - udpDataLock?.Dispose(); + client?.Dispose(); } + + udpDataLock?.Dispose(); disposed = true; } } - - ~UDPServer() - { - Dispose(false); - } } - diff --git a/server/src/WebProtocol.cs b/server/src/WebProtocol.cs index 9b1012e..c3aa4cc 100644 --- a/server/src/WebProtocol.cs +++ b/server/src/WebProtocol.cs @@ -1,3 +1,4 @@ +using Common; using DotNext; using Newtonsoft.Json; @@ -35,32 +36,32 @@ namespace WebProtocol /// 突发类型 /// /// 0 - public BurstType BurstType { get; set; } + public required BurstType BurstType { get; set; } /// /// 任务ID /// /// 1 - public byte CommandID { get; set; } + public required byte CommandID { get; set; } /// /// 标识写入还是读取 /// /// true - public bool IsWrite { get; set; } + public required bool IsWrite { get; set; } /// /// 突发长度:0是32bits,255是32bits x 256 /// /// 255 - public byte BurstLength { get; set; } + public required byte BurstLength { get; set; } /// /// 目标地址 /// /// 0 - public UInt32 Address { get; set; } + public required UInt32 Address { get; set; } /// /// 转换为Json格式字符串 @@ -84,23 +85,29 @@ namespace WebProtocol WriteResp }; + /// + /// 时间戳 + /// + /// 1234567 + public required UInt32 Timestamp { get; set; } + /// /// 数据包类型 /// /// 0 - public PackType Type { get; set; } + public required PackType Type { get; set; } /// /// Task ID /// /// 0 - public byte CommandID { get; set; } + public required byte CommandID { get; set; } /// /// Whether is succeed to finish command /// /// true - public bool IsSuccess { get; set; } + public required bool IsSuccess { get; set; } /// /// Return Data @@ -214,12 +221,14 @@ namespace WebProtocol /// 字符串 public override string ToString() { - var opts = new SendAddrPackOptions(); - opts.BurstType = (BurstType)(commandType >> 6); - opts.CommandID = Convert.ToByte((commandType >> 4) & 0b0011); - opts.IsWrite = Convert.ToBoolean(commandType & 0x01); - opts.BurstLength = burstLength; - opts.Address = address; + var opts = new SendAddrPackOptions() + { + BurstType = (BurstType)(commandType >> 6), + CommandID = Convert.ToByte((commandType >> 4) & 0b0011), + IsWrite = Convert.ToBoolean(commandType & 0x01), + BurstLength = burstLength, + Address = address, + }; return JsonConvert.SerializeObject(opts); } @@ -297,8 +306,9 @@ namespace WebProtocol } /// FPGA->Server 读响应包 - public struct RecvDataPackage + public class RecvDataPackage { + readonly UInt32 timestamp; readonly byte sign = (byte)PackSign.RecvData; readonly byte commandID; readonly byte resp; @@ -309,11 +319,13 @@ namespace WebProtocol /// FPGA->Server 读响应包 /// 构造函数 /// + /// 时间戳 /// 任务ID号 /// 读响应包响应 /// 数据 - public RecvDataPackage(byte commandID, byte resp, byte[] bodyData) + public RecvDataPackage(UInt32 timestamp, byte commandID, byte resp, byte[] bodyData) { + this.timestamp = timestamp; this.commandID = commandID; this.resp = resp; this.bodyData = bodyData; @@ -322,26 +334,13 @@ namespace WebProtocol _ = this._reserved; } - /// - /// FPGA->Server 读响应包 - /// 构造函数 - /// - /// 任务ID号 - /// 是否读取成功 - /// 数据 - public RecvDataPackage(byte commandID, bool isSuccess, byte[] bodyData) - { - this.commandID = commandID; - this.resp = Convert.ToByte(isSuccess); - this.bodyData = bodyData; - } - /// /// 通过接受包选项构建读响应包 /// /// 接收包(读响应包和写响应包)选项 public RecvDataPackage(RecvPackOptions opts) { + this.timestamp = opts.Timestamp; this.commandID = opts.CommandID; this.resp = Convert.ToByte(opts.IsSuccess ? 0b10 : 0b00); this.bodyData = opts.Data ?? (byte[])[0, 0, 0, 0]; @@ -354,11 +353,14 @@ namespace WebProtocol { get { - var opts = new RecvPackOptions(); - opts.Type = RecvPackOptions.PackType.ReadResp; - opts.CommandID = commandID; - opts.IsSuccess = Convert.ToBoolean((resp >> 1) == 0b01 ? false : true); - opts.Data = bodyData; + var opts = new RecvPackOptions() + { + Timestamp = this.timestamp, + Type = RecvPackOptions.PackType.ReadResp, + CommandID = this.commandID, + IsSuccess = Convert.ToBoolean((resp >> 1) == 0b01 ? false : true), + Data = this.bodyData, + }; return opts; } @@ -369,7 +371,7 @@ namespace WebProtocol /// public bool IsSuccessful { - get { return Convert.ToBoolean((resp >> 1) == 0b01 ? false : true); } + get { return Convert.ToBoolean((this.resp >> 1) == 0b01 ? false : true); } } /// @@ -379,12 +381,16 @@ namespace WebProtocol /// 读响应包 public static Result FromBytes(byte[] bytes) { - if (bytes[0] != (byte)PackSign.RecvData) + if (bytes[4] != (byte)PackSign.RecvData) return new(new ArgumentException( $"The sign of bytes is not RecvData Package, Sign: 0x{BitConverter.ToString([bytes[0]])}", nameof(bytes) )); - return new RecvDataPackage(bytes[1], bytes[2], bytes[4..]); + return new RecvDataPackage( + Number.BytesToUInt32(bytes[..4]).Value, + bytes[5], + bytes[6], + bytes[8..]); } /// @@ -394,13 +400,16 @@ namespace WebProtocol public byte[] ToBytes() { var bodyDataLen = bodyData.Length; - var arr = new byte[4 + bodyDataLen]; + var arr = new byte[8 + bodyDataLen]; - arr[0] = this.sign; - arr[1] = this.commandID; - arr[2] = this.resp; + Buffer.BlockCopy( + Number.UInt32ArrayToBytes([this.timestamp]).Value, 0, arr, 0, 4); + arr[4] = this.sign; + arr[5] = this.commandID; + arr[6] = this.resp; + arr[7] = this.resp; - Array.Copy(bodyData, 0, arr, 4, bodyDataLen); + Array.Copy(bodyData, 0, arr, 8, bodyDataLen); return arr; } @@ -408,8 +417,9 @@ namespace WebProtocol } /// 写响应包 - public struct RecvRespPackage + public class RecvRespPackage { + readonly UInt32 timestamp; readonly byte sign = (byte)PackSign.RecvResp; readonly byte commandID; readonly byte resp; @@ -418,10 +428,12 @@ namespace WebProtocol /// /// 构建写响应包 /// + /// 时间戳 /// 任务ID /// 写响应 - public RecvRespPackage(byte commandID, byte resp) + public RecvRespPackage(UInt32 timestamp, byte commandID, byte resp) { + this.timestamp = timestamp; this.commandID = commandID; this.resp = resp; @@ -429,23 +441,13 @@ namespace WebProtocol _ = this._reserved; } - /// - /// 构建写响应包 - /// - /// 任务ID - /// 是否写成功 - public RecvRespPackage(byte commandID, bool isSuccess) - { - this.commandID = commandID; - this.resp = Convert.ToByte(isSuccess); - } - /// /// 通过接受包选项构建写响应包 /// /// 接收包(读响应包和写响应包)选项 public RecvRespPackage(RecvPackOptions opts) { + this.timestamp = opts.Timestamp; this.commandID = opts.CommandID; this.resp = Convert.ToByte(opts.IsSuccess ? 0b10 : 0b00); } @@ -457,11 +459,14 @@ namespace WebProtocol { get { - var opts = new RecvPackOptions(); - opts.Type = RecvPackOptions.PackType.WriteResp; - opts.CommandID = commandID; - opts.IsSuccess = Convert.ToBoolean((resp >> 1) == 0b01 ? false : true); - opts.Data = null; + var opts = new RecvPackOptions() + { + Timestamp = this.timestamp, + Type = RecvPackOptions.PackType.WriteResp, + CommandID = commandID, + IsSuccess = Convert.ToBoolean((resp >> 1) == 0b01 ? false : true), + Data = null, + }; return opts; } @@ -472,7 +477,7 @@ namespace WebProtocol /// public bool IsSuccessful { - get { return Convert.ToBoolean((resp >> 1) == 0b01 ? false : true); } + get { return Convert.ToBoolean((this.resp >> 1) == 0b01 ? false : true); } } /// @@ -482,12 +487,13 @@ namespace WebProtocol /// 写响应包 public static Result FromBytes(byte[] bytes) { - if (bytes[0] != (byte)PackSign.RecvResp) + if (bytes[4] != (byte)PackSign.RecvResp) return new(new ArgumentException( - $"The sign of bytes is not RecvResp Package, Sign: 0x{BitConverter.ToString([bytes[0]])}", + $"The sign of bytes is not RecvResp Package, Sign: 0x{BitConverter.ToString([bytes[4]])}", nameof(bytes) )); - return new RecvRespPackage(bytes[1], bytes[2]); + var timestamp = Number.BytesToUInt32(bytes[..4]).Value; + return new RecvRespPackage(timestamp, bytes[5], bytes[6]); } /// @@ -496,11 +502,13 @@ namespace WebProtocol /// 字节数组 public byte[] ToBytes() { - var arr = new byte[4]; - - arr[0] = this.sign; - arr[1] = this.commandID; - arr[2] = this.resp; + var arr = new byte[8]; + Buffer.BlockCopy( + Number.UInt32ArrayToBytes([this.timestamp]).Value, 0, arr, 0, 4); + arr[4] = this.sign; + arr[5] = this.commandID; + arr[6] = this.resp; + arr[7] = this._reserved; return arr; }