using System.Collections.Concurrent;
using System.Net;
using System.Net.NetworkInformation; // needed for IPGlobalProperties (port-in-use check)
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using Common;
using DotNext;
using DotNext.Threading;
using Newtonsoft.Json;
using WebProtocol;

/// <summary>
/// One datagram received by <see cref="UDPServer"/>, together with its reception metadata.
/// </summary>
public class UDPData
{
    /// <summary>Time at which the datagram was received (the server passes <c>DateTime.Now</c>).</summary>
    public required DateTime DateTime { get; set; }

    /// <summary>Packet timestamp; the server decodes it from the first four payload bytes.</summary>
    public required UInt32 Timestamp { get; set; }

    /// <summary>IP address of the sender.</summary>
    public required string Address { get; set; }

    /// <summary>UDP port of the sender.</summary>
    public required int Port { get; set; }

    /// <summary>Task identifier; the server reads it from payload byte 5.</summary>
    public required int TaskID { get; set; }

    /// <summary>Raw payload bytes of the datagram.</summary>
    public required byte[] Data { get; set; }

    /// <summary>Whether the data has been read. The server stores <c>false</c> when recording.</summary>
    public required bool HasRead { get; set; }

    /// <summary>
    /// Creates a copy of this object whose <see cref="Data"/> buffer is independent of the original.
    /// </summary>
    /// <returns>A new <see cref="UDPData"/> with the same field values.</returns>
    public UDPData DeepClone()
    {
        var cloneData = new byte[this.Data.Length];
        Buffer.BlockCopy(this.Data, 0, cloneData, 0, this.Data.Length);

        return new UDPData()
        {
            DateTime = this.DateTime,
            Timestamp = this.Timestamp,
            // Strings are immutable, so sharing the reference is as safe as copying it.
            Address = this.Address,
            Port = this.Port,
            TaskID = this.TaskID,
            Data = cloneData,
            HasRead = this.HasRead,
        };
    }

    /// <summary>Serializes this object to a JSON string.</summary>
    /// <returns>The JSON representation produced by <c>JsonConvert.SerializeObject</c>.</returns>
    public override string ToString() => JsonConvert.SerializeObject(this);
}

/// <summary>
/// UDP server that listens on a contiguous range of ports and queues every received datagram
/// per "{senderAddress}-{taskID}" key, ordered by packet timestamp.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="Stop"/> closes the listening sockets and <see cref="Start"/> reuses the same
/// socket instances, so a stopped server presumably cannot be restarted — confirm before relying on restart.
/// </remarks>
public class UDPServer : IDisposable
{
    /// <summary>Offset of the package-sign byte inside a datagram (see <see cref="PrintData"/>).</summary>
    private const int SignOffset = 4;

    /// <summary>Offset of the task-ID byte inside a datagram.</summary>
    private const int TaskIdOffset = 1 + 4;

    /// <summary>Shortest datagram that still contains the timestamp, sign and task-ID bytes.</summary>
    private const int MinPacketLength = TaskIdOffset + 1;

    private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();

    // Received data: key is "{address}-{taskID}", value is a list sorted by (de-duplicated) timestamp.
    private readonly ConcurrentDictionary<string, SortedList<UInt32, UDPData>> udpData =
        new ConcurrentDictionary<string, SortedList<UInt32, UDPData>>();
    private readonly AsyncReaderWriterLock udpDataLock = new AsyncReaderWriterLock();

    private readonly int listenPort;
    private readonly List<UdpClient> listeners = new List<UdpClient>();
    private readonly List<Task> tasks = new List<Task>();
    private readonly IPEndPoint groupEP;
    private CancellationTokenSource? cancellationTokenSource;
    private bool disposed = false;

    /// <summary>
    /// Whether the server is running, i.e. a cancellation source exists and has not been cancelled.
    /// </summary>
    public bool IsRunning => cancellationTokenSource?.Token.IsCancellationRequested == false;

    /// <summary>Error codes of the UDP server. Not referenced anywhere else in this file.</summary>
    public enum ErrorCode
    {
        /// <summary>No error.</summary>
        Success = 0,

        /// <summary>Presumably: nothing was received before the timeout — TODO confirm.</summary>
        GetNoneAfterTimeout,

        /// <summary>Presumably: the response did not match what was expected — TODO confirm.</summary>
        ResponseWrong,

        /// <summary>Presumably: the received bytes are not a receive-data package — TODO confirm.</summary>
        NotRecvDataPackage,
    }

    /// <summary>
    /// Creates a UDP server bound to <paramref name="num"/> consecutive ports starting at <paramref name="port"/>.
    /// </summary>
    /// <param name="port">First UDP port to listen on.</param>
    /// <param name="num">Number of consecutive ports (and UDP clients) to open.</param>
    /// <exception cref="ArgumentException">
    /// A port in the range is already in use or a socket could not be created. The original failure is
    /// available as the inner exception.
    /// </exception>
    public UDPServer(int port, int num)
    {
        this.listenPort = port;
        try
        {
            for (int i = 0; i < num; i++)
            {
                int currentPort = this.listenPort + i;
                if (IsPortInUse(currentPort))
                {
                    throw new ArgumentException(
                        $"端口{currentPort}已被占用,无法启动UDP Server",
                        nameof(port)
                    );
                }
                listeners.Add(new UdpClient(currentPort));
            }
            this.groupEP = new IPEndPoint(IPAddress.Any, listenPort);
        }
        catch (Exception e)
        {
            // Release the sockets bound before the failure; otherwise their ports stay occupied
            // although construction failed and nobody holds a reference to close them.
            foreach (var client in listeners)
            {
                client.Dispose();
            }
            listeners.Clear();

            logger.Error($"Failed to set up UDP server on port {port}: {e}");
            throw new ArgumentException(
                $"Failed to set up server with this port: {port}",
                nameof(port),
                e
            );
        }
    }

    /// <summary>
    /// Checks whether any active UDP listener on this machine already uses <paramref name="port"/>.
    /// </summary>
    /// <param name="port">Port number to check.</param>
    /// <returns>
    /// <c>true</c> if a listener with that port was found; <c>false</c> otherwise, including when the
    /// check itself failed (the failure is logged as a warning).
    /// </returns>
    private static bool IsPortInUse(int port)
    {
        try
        {
            var udpListeners = IPGlobalProperties.GetIPGlobalProperties().GetActiveUdpListeners();
            return Array.Exists(udpListeners, ep => ep.Port == port);
        }
        catch (Exception ex)
        {
            logger.Warn($"Failed to check port usage for port {port}: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Polls the queue stored under <paramref name="key"/> until it is non-empty or the timeout elapses,
    /// then applies <paramref name="take"/> to it while holding the write lock.
    /// </summary>
    /// <param name="key">Queue key in the form "{address}-{taskID}".</param>
    /// <param name="timeout">Overall time budget in milliseconds.</param>
    /// <param name="cycle">Delay between two polls in milliseconds; values &lt;= 0 only yield.</param>
    /// <param name="take">Extracts (and typically removes) the result from the non-empty queue.</param>
    /// <returns>The value produced by <paramref name="take"/>, or <c>null</c> when nothing arrived in time.</returns>
    private async ValueTask<T?> TakeWhenAvailableAsync<T>(
        string key,
        int timeout,
        int cycle,
        Func<SortedList<UInt32, UDPData>, T> take
    )
        where T : class
    {
        var limit = TimeSpan.FromMilliseconds(timeout);
        // Stopwatch is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        while (true)
        {
            var remaining = limit - stopwatch.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                return null;
            }

            try
            {
                // Only wait for the lock as long as the caller's budget allows.
                using (await udpDataLock.AcquireWriteLockAsync(remaining))
                {
                    if (udpData.TryGetValue(key, out var queue) && queue.Count > 0)
                    {
                        return take(queue);
                    }
                }
            }
            catch (TimeoutException)
            {
                return null;
            }
            catch (Exception ex)
            {
                // Best effort: the caller gets an empty result, but the cause is no longer hidden.
                logger.Warn($"Failed while polling UDP data for {key}: {ex}");
                return null;
            }

            // Give the receive handlers a chance to take the write lock instead of spinning on it.
            if (cycle > 0)
            {
                await Task.Delay(cycle);
            }
            else
            {
                await Task.Yield();
            }
        }
    }

    /// <summary>
    /// Waits for data sent by the given address for the given task and removes the entry with the
    /// smallest timestamp key from the queue.
    /// </summary>
    /// <param name="ipAddr">Sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <param name="cycle">Delay between two polls in milliseconds (0 = no delay, only yield).</param>
    /// <param name="callerName">Name of the calling member (filled in by the compiler; currently unused).</param>
    /// <param name="callerLineNum">Line number of the call site (filled in by the compiler; currently unused).</param>
    /// <returns>
    /// An optional without value when nothing was found before the timeout; otherwise a deep clone of
    /// the first queued packet.
    /// </returns>
    public async ValueTask<Optional<UDPData>> FindDataAsync(
        string ipAddr,
        int taskID,
        int timeout = 1000,
        int cycle = 0,
        [CallerMemberName] string callerName = "",
        [CallerLineNumber] int callerLineNum = 0
    )
    {
        var key = $"{ipAddr}-{taskID}";

        var data = await TakeWhenAvailableAsync(key, timeout, cycle, queue =>
        {
            // The list is sorted by timestamp key, so index 0 is the earliest entry.
            var first = queue.Values[0];
            queue.RemoveAt(0);
            return first;
        });

        if (data is null)
        {
            logger.Trace("Get nothing even after time out");
            return new(null);
        }

        return new(data.DeepClone());
    }

    /// <summary>
    /// Waits for data sent by the given address for the given task, returns everything queued so far
    /// and clears the queue.
    /// </summary>
    /// <param name="ipAddr">Sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <returns>An optional without value on timeout; otherwise the queued packets in key order.</returns>
    public async ValueTask<Optional<List<UDPData>>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
    {
        var key = $"{ipAddr}-{taskID}";

        var data = await TakeWhenAvailableAsync(key, timeout, 0, queue =>
        {
            var all = new List<UDPData>(queue.Values);
            queue.Clear();
            return all;
        });

        if (data is null)
        {
            logger.Trace("Get nothing even after time out");
            return new(null);
        }

        return new(data);
    }

    /// <summary>
    /// Returns a snapshot of the packets currently queued for the given address and task without
    /// removing them. Does not wait for data; <paramref name="timeout"/> only bounds lock acquisition.
    /// </summary>
    /// <param name="ipAddr">Sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="timeout">Maximum time in milliseconds to wait for the read lock.</param>
    /// <returns>An optional without value when the queue is missing/empty or the lock timed out.</returns>
    public async ValueTask<Optional<List<UDPData>>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
    {
        List<UDPData>? data = null;

        try
        {
            using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(timeout)))
            {
                var key = $"{ipAddr}-{taskID}";
                if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
                {
                    data = new List<UDPData>(sortedList.Values);
                }
            }
        }
        catch (TimeoutException)
        {
            logger.Trace("Failed to acquire read lock within timeout");
            return new(null);
        }

        if (data is null)
        {
            logger.Trace("Get nothing even after time out");
            return new(null);
        }

        return new(data);
    }

    /// <summary>
    /// Returns the number of packets currently queued for the given address and task.
    /// </summary>
    /// <param name="ipAddr">Sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="timeout">Maximum time in milliseconds to wait for the read lock.</param>
    /// <returns>The queue length, or no value when no queue exists for the key or the lock timed out.</returns>
    public async ValueTask<Optional<int>> GetDataCountAsync(string ipAddr, int taskID, int timeout = 1000)
    {
        int? count = null;

        try
        {
            using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(timeout)))
            {
                var key = $"{ipAddr}-{taskID}";
                if (udpData.TryGetValue(key, out var sortedList))
                {
                    count = sortedList.Count;
                }
            }
        }
        catch (TimeoutException)
        {
            logger.Trace("Failed to acquire read lock within timeout");
            return Optional<int>.None;
        }

        if (count is null)
        {
            logger.Trace("Get nothing even after time out");
            return Optional<int>.None;
        }

        return new(count.Value);
    }

    /// <summary>
    /// Waits for a response (ack) package from the given address and task.
    /// </summary>
    /// <param name="address">Expected sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="port">Expected sender port; only checked when greater than 0.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <returns>The parsed response package, or a result carrying the failure as an exception.</returns>
    public async ValueTask<Result<RecvRespPackage>> WaitForAckAsync(
        string address, int taskID, int port = -1, int timeout = 1000)
    {
        var data = await FindDataAsync(address, taskID, timeout);
        if (!data.HasValue)
            return new(new Exception("Get None even after time out!"));

        var recvData = data.Value;
        // NOTE(review): WaitForDataAsync checks the port when it is >= 0, this method when it is > 0.
        if (recvData.Address != address || (port > 0 && recvData.Port != port))
            return new(new Exception("Receive Data From Wrong Board!"));

        var retPack = WebProtocol.RecvRespPackage.FromBytes(recvData.Data);
        if (!retPack.IsSuccessful)
            return new(new Exception("Not RecvRespPackage!", retPack.Error));

        return retPack.Value;
    }

    /// <summary>
    /// Waits for a data package from the given address and task.
    /// </summary>
    /// <param name="address">Expected sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    /// <param name="port">Expected sender port; only checked when greater than or equal to 0.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <returns>The parsed data package, or a result carrying the failure as an exception.</returns>
    public async ValueTask<Result<RecvDataPackage>> WaitForDataAsync(
        string address, int taskID, int port = -1, int timeout = 1000)
    {
        var data = await FindDataAsync(address, taskID, timeout);
        if (!data.HasValue)
            return new(new Exception("Get None even after time out!"));

        var recvData = data.Value;
        if (recvData.Address != address || (port >= 0 && recvData.Port != port))
            return new(new Exception("Receive Data From Wrong Board!"));

        var retPack = WebProtocol.RecvDataPackage.FromBytes(recvData.Data);
        if (!retPack.IsSuccessful)
            return new(new Exception("Not RecvDataPackage!", retPack.Error));

        return retPack.Value;
    }

    /// <summary>
    /// Handles one received datagram on a thread-pool task: validates it and records it in the queue.
    /// Never throws; failures are logged.
    /// </summary>
    /// <param name="data">Raw datagram bytes.</param>
    /// <param name="endPoint">Sender endpoint.</param>
    /// <param name="time">Reception time.</param>
    private async Task ReceiveHandler(byte[] data, IPEndPoint endPoint, DateTime time)
    {
        await Task.Run(async () =>
        {
            try
            {
                if (endPoint is null)
                {
                    logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
                    logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
                    return;
                }

                // The timestamp (bytes 0-3) and the task ID (byte 5) are read below; reject shorter
                // datagrams explicitly instead of failing with an index/range exception.
                if (data.Length < MinPacketLength)
                {
                    logger.Warn(
                        $"Dropped datagram from {endPoint}: length {data.Length} is shorter than {MinPacketLength} bytes");
                    return;
                }

                await RecordUDPData(data, endPoint, time, Convert.ToInt32(data[TaskIdOffset]));
            }
            catch (Exception e)
            {
                logger.Error($"Got Error when handle receive:{e}");
            }
        });
    }

    /// <summary>
    /// Stores a received datagram in the queue of its "{address}-{taskID}" key, under the write lock.
    /// </summary>
    /// <param name="bytes">Raw datagram bytes; the first four bytes are decoded as the timestamp.</param>
    /// <param name="remoteEP">Sender endpoint.</param>
    /// <param name="time">Reception time.</param>
    /// <param name="taskID">Task identifier of the datagram.</param>
    /// <returns>The recorded <see cref="UDPData"/>.</returns>
    /// <exception cref="TimeoutException">The write lock could not be acquired within 5 seconds.</exception>
    private async Task<UDPData> RecordUDPData(byte[] bytes, IPEndPoint remoteEP, DateTime time, int taskID)
    {
        var remoteAddress = remoteEP.Address.ToString();
        var remotePort = remoteEP.Port;

        var data = new UDPData()
        {
            DateTime = time,
            Timestamp = Number.BytesToUInt32(bytes[..4]).Value,
            Address = remoteAddress,
            Port = remotePort,
            TaskID = taskID,
            Data = bytes,
            HasRead = false,
        };

        var key = $"{remoteAddress}-{taskID}";

        try
        {
            using (await udpDataLock.AcquireWriteLockAsync(TimeSpan.FromMilliseconds(5000)))
            {
                var sortedList = udpData.GetOrAdd(key, _ => new SortedList<UInt32, UDPData>());

                // SortedList keys must be unique: on a duplicate timestamp, bump the key until it is free.
                var uniqueTime = data.Timestamp;
                while (sortedList.ContainsKey(uniqueTime))
                {
                    logger.Warn(
                        $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
                    uniqueTime += 1;
                }

                sortedList.Add(uniqueTime, data);
            }
        }
        catch (TimeoutException)
        {
            logger.Error($"Failed to acquire write lock for recording UDP data from {remoteAddress}:{remotePort}");
            throw;
        }

        return data;
    }

    /// <summary>
    /// Writes a packet to the debug log (raw bytes plus a decoded form) and returns the same
    /// information as a multi-line string.
    /// </summary>
    /// <param name="data">Packet to print.</param>
    /// <returns>Human-readable description of the packet.</returns>
    public string PrintData(UDPData data)
    {
        var bytes = data.Data;
        var hex = BitConverter.ToString(bytes).Replace("-", " ");
        string recvData;

        // Byte 4 selects the package kind; packets too short to carry it are shown as plain ASCII.
        if (bytes.Length > SignOffset && bytes[SignOffset] == (byte)WebProtocol.PackSign.RecvData)
        {
            var resData = WebProtocol.RecvDataPackage.FromBytes(bytes);
            recvData = resData.IsSuccessful
                ? resData.Value.Options.ToString()
                : resData.Error.ToString();
        }
        else if (bytes.Length > SignOffset && bytes[SignOffset] == (byte)WebProtocol.PackSign.RecvResp)
        {
            var resData = WebProtocol.RecvRespPackage.FromBytes(bytes);
            recvData = resData.IsSuccessful
                ? resData.Value.Options.ToString()
                : resData.Error.ToString();
        }
        else
        {
            recvData = Encoding.ASCII.GetString(bytes, 0, bytes.Length);
        }

        logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()} - {data.Timestamp}:");
        logger.Debug($" Original Data : {hex}");
        if (recvData.Length != 0)
            logger.Debug($" Decoded Data : {recvData}");

        return $@"
 Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:
 Original Data : {hex}
 Decoded Data : {recvData}
 ";
    }

    /// <summary>Writes every packet of a sequence to the debug log.</summary>
    /// <param name="dataArray">Packets to print.</param>
    public void PrintDataArray(IEnumerable<UDPData> dataArray)
    {
        foreach (var data in dataArray)
        {
            logger.Debug(PrintData(data));
        }
    }

    /// <summary>
    /// Writes all queued packets of all keys to the debug log while holding the read lock.
    /// A lock timeout (5 seconds) is logged as an error and otherwise ignored.
    /// </summary>
    public async Task PrintAllDataAsync()
    {
        logger.Debug("Ready Data:");

        try
        {
            using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(5000)))
            {
                foreach (var kvp in udpData)
                {
                    foreach (var data in kvp.Value.Values)
                    {
                        logger.Debug(PrintData(data));
                    }
                }
            }
        }
        catch (TimeoutException)
        {
            logger.Error("Failed to acquire read lock for printing all data");
        }
    }

    /// <summary>Removes all queued packets of the given address and task.</summary>
    /// <param name="ipAddr">Sender IP address.</param>
    /// <param name="taskID">Task identifier.</param>
    public void ClearUDPData(string ipAddr, int taskID)
    {
        var key = $"{ipAddr}-{taskID}";

        // NOTE(review): this is the only synchronous lock acquisition in the class; confirm that the
        // AcquireWriteLock() extension locks the same lock the async AcquireXxxLockAsync calls use.
        using (udpDataLock.AcquireWriteLock())
        {
            if (udpData.TryGetValue(key, out var sortedList))
            {
                sortedList.Clear();
            }
        }
    }

    /// <summary>
    /// Receives datagrams from one client until cancellation is requested or the client is disposed,
    /// handing each datagram to <see cref="ReceiveHandler"/> without awaiting it.
    /// </summary>
    /// <param name="client">Listening UDP client.</param>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    private async Task ReceiveLoopAsync(UdpClient client, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var result = await client.ReceiveAsync(cancellationToken);
                // Fire and forget: ReceiveHandler logs its own failures and never throws.
                _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now);
            }
            catch (OperationCanceledException)
            {
                logger.Debug("UDP receive operation was cancelled");
                break;
            }
            catch (ObjectDisposedException)
            {
                logger.Debug("UDP client was disposed");
                break;
            }
            catch (Exception ex)
            {
                if (!cancellationToken.IsCancellationRequested)
                {
                    logger.Error($"Error in UDP receive: {ex.Message}");
                }
            }
        }
    }

    /// <summary>
    /// Starts one receive loop per listening port. Does nothing (besides a warning) when already running.
    /// </summary>
    public void Start()
    {
        if (cancellationTokenSource != null && !cancellationTokenSource.Token.IsCancellationRequested)
        {
            logger.Warn("UDP Server is already running");
            return;
        }

        cancellationTokenSource = new CancellationTokenSource();
        var cancellationToken = cancellationTokenSource.Token;

        try
        {
            foreach (var client in listeners)
            {
                tasks.Add(Task.Run(() => ReceiveLoopAsync(client, cancellationToken), cancellationToken));
            }

            logger.Info("UDP Server started successfully");
        }
        catch (Exception e)
        {
            logger.Error($"Failed to start UDP server: {e}");
            cancellationTokenSource?.Cancel();
            throw;
        }
    }

    /// <summary>
    /// Stops the server: cancels the receive loops, waits up to 5 seconds for them, closes all UDP
    /// clients and releases the cancellation source. Does nothing (besides a warning) when not running.
    /// </summary>
    public void Stop()
    {
        if (cancellationTokenSource == null || cancellationTokenSource.Token.IsCancellationRequested)
        {
            logger.Warn("UDP Server is not running or already stopped");
            return;
        }

        try
        {
            logger.Info("Stopping UDP Server...");

            cancellationTokenSource.Cancel();

            try
            {
                if (!Task.WhenAll(tasks).Wait(TimeSpan.FromSeconds(5)))
                {
                    logger.Warn("Some tasks did not complete within timeout period");
                }
            }
            catch (AggregateException ex)
            {
                // A cancelled or faulted receive task must not prevent the sockets from being closed.
                logger.Debug($"Receive tasks ended with: {ex.Flatten().Message}");
            }

            foreach (var client in listeners)
            {
                try
                {
                    client.Close();
                }
                catch (Exception ex)
                {
                    logger.Warn($"Error closing UDP client: {ex.Message}");
                }
            }

            tasks.Clear();

            logger.Info("UDP Server stopped successfully");
        }
        catch (Exception ex)
        {
            logger.Error($"Error stopping UDP server: {ex.Message}");
        }
        finally
        {
            cancellationTokenSource?.Dispose();
            cancellationTokenSource = null;
        }
    }

    /// <summary>
    /// Stops the server and releases the UDP clients and the lock. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }

        Stop();

        foreach (var client in listeners)
        {
            client?.Dispose();
        }

        udpDataLock?.Dispose();
        disposed = true;
        GC.SuppressFinalize(this);
    }
}