From 0cfbebf804b0c6dddc0e4786b06c63afbfcfa845 Mon Sep 17 00:00:00 2001 From: SikongJueluo Date: Wed, 16 Jul 2025 12:23:13 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E9=87=8D=E6=96=B0=E4=BD=BF?= =?UTF-8?q?=E7=94=A8sortedlist=E6=9D=A5=E4=BF=9D=E8=AF=81udp=E6=8E=A5?= =?UTF-8?q?=E5=8F=97=E6=95=B0=E6=8D=AE=E7=9A=84=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/UdpServer.cs | 180 +++++++++++++++++++++++++++++----------- 1 file changed, 132 insertions(+), 48 deletions(-) diff --git a/server/src/UdpServer.cs b/server/src/UdpServer.cs index a1f23ca..1ede0f7 100644 --- a/server/src/UdpServer.cs +++ b/server/src/UdpServer.cs @@ -68,17 +68,20 @@ public class UDPData /// /// UDP 服务器 /// -public class UDPServer +public class UDPServer : IDisposable { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); - private ConcurrentDictionary> udpData = new ConcurrentDictionary>(); + private ConcurrentDictionary> udpData = new ConcurrentDictionary>(); + private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim(); private int listenPort; private List listeners = new List(); private IPEndPoint groupEP; private bool isRunning = false; + private bool disposed = false; + /// /// 是否正在工作 /// @@ -155,15 +158,23 @@ public class UDPServer isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); if (isTimeout) break; - lock (udpData) + udpDataLock.EnterWriteLock(); + try { - if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && - dataQueue.TryDequeue(out data)) + var key = $"{ipAddr}-{taskID}"; + if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0) { - // logger.Debug($"Find UDP Data: {data.ToString()}"); + // 获取最早的数据(第一个元素) + var firstKey = sortedList.Keys[0]; + data = sortedList[firstKey]; + sortedList.RemoveAt(0); break; } } + finally + { + udpDataLock.ExitWriteLock(); + } await Task.Delay(cycle); } @@ -198,19 +209,21 @@ public class UDPServer isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); if (isTimeout) break; - lock (udpData) + udpDataLock.EnterWriteLock(); + try { - if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && - !dataQueue.IsEmpty) + var key = $"{ipAddr}-{taskID}"; + if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0) { - data = new List(); - while (dataQueue.TryDequeue(out var item)) - { - data.Add(item); - } + data = new List(sortedList.Values); + sortedList.Clear(); break; } } + finally + { + udpDataLock.ExitWriteLock(); + } } if (data is null) @@ -243,12 +256,19 @@ public class UDPServer isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); if (isTimeout) break; - if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && - !dataQueue.IsEmpty) + udpDataLock.EnterReadLock(); + try { - data = dataQueue.ToArray().ToList(); - // logger.Debug($"Find UDP Data Array: {JsonConvert.SerializeObject(data)}"); - break; + var key = $"{ipAddr}-{taskID}"; + if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0) + { + data = new List(sortedList.Values); + break; + } + } + finally + { + udpDataLock.ExitReadLock(); } } @@ -282,10 +302,19 @@ public class UDPServer isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); if (isTimeout) break; - if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue)) + udpDataLock.EnterReadLock(); + try { - count = dataQueue.Count; - break; + var key = $"{ipAddr}-{taskID}"; + if (udpData.TryGetValue(key, out var sortedList)) + { + count = sortedList.Count; + break; + } + } + finally + { + udpDataLock.ExitReadLock(); } } @@ -357,16 +386,23 @@ public class UDPServer // 异步锁保护 udpData await Task.Run(() => { - // Handle RemoteEP - if (endPoint is null) + try { - logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:"); - logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}"); - return; - } + // Handle RemoteEP + if (endPoint is null) + { + logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:"); + logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}"); + return; + } - var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1])); - // PrintData(udpDataObj); + var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1])); + // PrintData(udpDataObj); + } + catch (Exception e) + { + logger.Error($"Got Error when handle receive:{e}"); + } }); } @@ -384,21 +420,27 @@ public class UDPServer HasRead = false, }; - - lock (udpData) + udpDataLock.EnterWriteLock(); + try { var key = $"{remoteAddress}-{taskID}"; - var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue()); - dataQueue.Enqueue(data); + var sortedList = udpData.GetOrAdd(key, _ => new SortedList()); - // 对队列进行一次按时间排序 - if (dataQueue.Count > 0) + // 处理相同时间戳的情况,添加微小的时间差 + var uniqueTime = time; + while (sortedList.ContainsKey(uniqueTime)) { - var sorted = dataQueue.OrderBy(d => d.DateTime).ToList(); - udpData.TryUpdate(key, new ConcurrentQueue(sorted), dataQueue); + logger.Warn( + $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}."); + uniqueTime = uniqueTime.AddTicks(1); } - PrintAllData(); + sortedList.Add(uniqueTime, data); + // PrintAllData(); + } + finally + { + udpDataLock.ExitWriteLock(); } return data; @@ -461,13 +503,21 @@ public class UDPServer { logger.Debug("Ready Data:"); - foreach (var kvp in udpData) + udpDataLock.EnterReadLock(); + try { - foreach (var data in kvp.Value) + foreach (var kvp in udpData) { - logger.Debug(PrintData(data)); + foreach (var data in kvp.Value.Values) + { + logger.Debug(PrintData(data)); + } } } + finally + { + udpDataLock.ExitReadLock(); + } } /// @@ -479,10 +529,17 @@ public class UDPServer public void ClearUDPData(string ipAddr, int taskID) { var key = $"{ipAddr}-{taskID}"; - if (udpData.TryGetValue(key, out var dataQueue)) + udpDataLock.EnterWriteLock(); + try { - // 清空队列的最有效方式是替换为新的队列 - udpData.TryUpdate(key, new ConcurrentQueue(), dataQueue); + if (udpData.TryGetValue(key, out var sortedList)) + { + sortedList.Clear(); + } + } + finally + { + udpDataLock.ExitWriteLock(); } // 强制进行ARP刷新,防止后续传输时造成影响 @@ -603,7 +660,7 @@ public class UDPServer }; process.Start(); - + // 设置超时时间,避免进程挂起 if (process.WaitForExit(5000)) // 5秒超时 { @@ -648,8 +705,8 @@ public class UDPServer { try { - UdpReceiveResult result = await client.ReceiveAsync(); - ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now); + var result = await client.ReceiveAsync(); + _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now); } catch (Exception ex) { @@ -679,5 +736,32 @@ public class UDPServer this.isRunning = false; } + + /// + /// 释放资源 + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + Stop(); + udpDataLock?.Dispose(); + } + disposed = true; + } + } + + ~UDPServer() + { + Dispose(false); + } }