diff --git a/server/src/UdpServer.cs b/server/src/UdpServer.cs
index a1f23ca..1ede0f7 100644
--- a/server/src/UdpServer.cs
+++ b/server/src/UdpServer.cs
@@ -68,17 +68,20 @@ public class UDPData
///
/// UDP 服务器
///
-public class UDPServer
+public class UDPServer : IDisposable
{
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
- private ConcurrentDictionary> udpData = new ConcurrentDictionary>();
+ private ConcurrentDictionary> udpData = new ConcurrentDictionary>();
+ private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim();
private int listenPort;
private List listeners = new List();
private IPEndPoint groupEP;
private bool isRunning = false;
+ private bool disposed = false;
+
///
/// 是否正在工作
///
@@ -155,15 +158,23 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
- lock (udpData)
+ udpDataLock.EnterWriteLock();
+ try
{
- if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
- dataQueue.TryDequeue(out data))
+ var key = $"{ipAddr}-{taskID}";
+ if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{
- // logger.Debug($"Find UDP Data: {data.ToString()}");
+ // 获取最早的数据(第一个元素)
+ var firstKey = sortedList.Keys[0];
+ data = sortedList[firstKey];
+ sortedList.RemoveAt(0);
break;
}
}
+ finally
+ {
+ udpDataLock.ExitWriteLock();
+ }
await Task.Delay(cycle);
}
@@ -198,19 +209,21 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
- lock (udpData)
+ udpDataLock.EnterWriteLock();
+ try
{
- if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
- !dataQueue.IsEmpty)
+ var key = $"{ipAddr}-{taskID}";
+ if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{
- data = new List();
- while (dataQueue.TryDequeue(out var item))
- {
- data.Add(item);
- }
+ data = new List(sortedList.Values);
+ sortedList.Clear();
break;
}
}
+ finally
+ {
+ udpDataLock.ExitWriteLock();
+ }
}
if (data is null)
@@ -243,12 +256,19 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
- if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
- !dataQueue.IsEmpty)
+ udpDataLock.EnterReadLock();
+ try
{
- data = dataQueue.ToArray().ToList();
- // logger.Debug($"Find UDP Data Array: {JsonConvert.SerializeObject(data)}");
- break;
+ var key = $"{ipAddr}-{taskID}";
+ if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
+ {
+ data = new List(sortedList.Values);
+ break;
+ }
+ }
+ finally
+ {
+ udpDataLock.ExitReadLock();
}
}
@@ -282,10 +302,19 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
- if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue))
+ udpDataLock.EnterReadLock();
+ try
{
- count = dataQueue.Count;
- break;
+ var key = $"{ipAddr}-{taskID}";
+ if (udpData.TryGetValue(key, out var sortedList))
+ {
+ count = sortedList.Count;
+ break;
+ }
+ }
+ finally
+ {
+ udpDataLock.ExitReadLock();
}
}
@@ -357,16 +386,23 @@ public class UDPServer
// 异步锁保护 udpData
await Task.Run(() =>
{
- // Handle RemoteEP
- if (endPoint is null)
+ try
{
- logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
- logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
- return;
- }
+ // Handle RemoteEP
+ if (endPoint is null)
+ {
+ logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
+ logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
+ return;
+ }
- var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
- // PrintData(udpDataObj);
+ var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
+ // PrintData(udpDataObj);
+ }
+ catch (Exception e)
+ {
+ logger.Error($"Got Error when handle receive:{e}");
+ }
});
}
@@ -384,21 +420,27 @@ public class UDPServer
HasRead = false,
};
-
- lock (udpData)
+ udpDataLock.EnterWriteLock();
+ try
{
var key = $"{remoteAddress}-{taskID}";
- var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue());
- dataQueue.Enqueue(data);
+ var sortedList = udpData.GetOrAdd(key, _ => new SortedList());
- // 对队列进行一次按时间排序
- if (dataQueue.Count > 0)
+ // 处理相同时间戳的情况,添加微小的时间差
+ var uniqueTime = time;
+ while (sortedList.ContainsKey(uniqueTime))
{
- var sorted = dataQueue.OrderBy(d => d.DateTime).ToList();
- udpData.TryUpdate(key, new ConcurrentQueue(sorted), dataQueue);
+ logger.Warn(
+ $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
+ uniqueTime = uniqueTime.AddTicks(1);
}
- PrintAllData();
+ sortedList.Add(uniqueTime, data);
+ // PrintAllData();
+ }
+ finally
+ {
+ udpDataLock.ExitWriteLock();
}
return data;
@@ -461,13 +503,21 @@ public class UDPServer
{
logger.Debug("Ready Data:");
- foreach (var kvp in udpData)
+ udpDataLock.EnterReadLock();
+ try
{
- foreach (var data in kvp.Value)
+ foreach (var kvp in udpData)
{
- logger.Debug(PrintData(data));
+ foreach (var data in kvp.Value.Values)
+ {
+ logger.Debug(PrintData(data));
+ }
}
}
+ finally
+ {
+ udpDataLock.ExitReadLock();
+ }
}
///
@@ -479,10 +529,17 @@ public class UDPServer
public void ClearUDPData(string ipAddr, int taskID)
{
var key = $"{ipAddr}-{taskID}";
- if (udpData.TryGetValue(key, out var dataQueue))
+ udpDataLock.EnterWriteLock();
+ try
{
- // 清空队列的最有效方式是替换为新的队列
- udpData.TryUpdate(key, new ConcurrentQueue(), dataQueue);
+ if (udpData.TryGetValue(key, out var sortedList))
+ {
+ sortedList.Clear();
+ }
+ }
+ finally
+ {
+ udpDataLock.ExitWriteLock();
}
// 强制进行ARP刷新,防止后续传输时造成影响
@@ -603,7 +660,7 @@ public class UDPServer
};
process.Start();
-
+
// 设置超时时间,避免进程挂起
if (process.WaitForExit(5000)) // 5秒超时
{
@@ -648,8 +705,8 @@ public class UDPServer
{
try
{
- UdpReceiveResult result = await client.ReceiveAsync();
- ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now);
+ var result = await client.ReceiveAsync();
+ _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now);
}
catch (Exception ex)
{
@@ -679,5 +736,32 @@ public class UDPServer
this.isRunning = false;
}
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposed)
+ {
+ if (disposing)
+ {
+ Stop();
+ udpDataLock?.Dispose();
+ }
+ disposed = true;
+ }
+ }
+
+ ~UDPServer()
+ {
+ Dispose(false);
+ }
}