fix:重新使用sortedlist来保证udp接受数据的顺序

This commit is contained in:
SikongJueluo 2025-07-16 12:23:13 +08:00
parent 446da52515
commit 0cfbebf804
No known key found for this signature in database
1 changed files with 132 additions and 48 deletions

View File

@ -68,17 +68,20 @@ public class UDPData
/// <summary> /// <summary>
/// UDP 服务器 /// UDP 服务器
/// </summary> /// </summary>
public class UDPServer public class UDPServer : IDisposable
{ {
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
private ConcurrentDictionary<string, ConcurrentQueue<UDPData>> udpData = new ConcurrentDictionary<string, ConcurrentQueue<UDPData>>(); private ConcurrentDictionary<string, SortedList<DateTime, UDPData>> udpData = new ConcurrentDictionary<string, SortedList<DateTime, UDPData>>();
private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim();
private int listenPort; private int listenPort;
private List<UdpClient> listeners = new List<UdpClient>(); private List<UdpClient> listeners = new List<UdpClient>();
private IPEndPoint groupEP; private IPEndPoint groupEP;
private bool isRunning = false; private bool isRunning = false;
private bool disposed = false;
/// <summary> /// <summary>
/// 是否正在工作 /// 是否正在工作
/// </summary> /// </summary>
@ -155,15 +158,23 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
lock (udpData) udpDataLock.EnterWriteLock();
try
{ {
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && var key = $"{ipAddr}-{taskID}";
dataQueue.TryDequeue(out data)) if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{ {
// logger.Debug($"Find UDP Data: {data.ToString()}"); // 获取最早的数据(第一个元素)
var firstKey = sortedList.Keys[0];
data = sortedList[firstKey];
sortedList.RemoveAt(0);
break; break;
} }
} }
finally
{
udpDataLock.ExitWriteLock();
}
await Task.Delay(cycle); await Task.Delay(cycle);
} }
@ -198,19 +209,21 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
lock (udpData) udpDataLock.EnterWriteLock();
try
{ {
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && var key = $"{ipAddr}-{taskID}";
!dataQueue.IsEmpty) if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{ {
data = new List<UDPData>(); data = new List<UDPData>(sortedList.Values);
while (dataQueue.TryDequeue(out var item)) sortedList.Clear();
{
data.Add(item);
}
break; break;
} }
} }
finally
{
udpDataLock.ExitWriteLock();
}
} }
if (data is null) if (data is null)
@ -243,12 +256,19 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && udpDataLock.EnterReadLock();
!dataQueue.IsEmpty) try
{ {
data = dataQueue.ToArray().ToList(); var key = $"{ipAddr}-{taskID}";
// logger.Debug($"Find UDP Data Array: {JsonConvert.SerializeObject(data)}"); if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
break; {
data = new List<UDPData>(sortedList.Values);
break;
}
}
finally
{
udpDataLock.ExitReadLock();
} }
} }
@ -282,10 +302,19 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue)) udpDataLock.EnterReadLock();
try
{ {
count = dataQueue.Count; var key = $"{ipAddr}-{taskID}";
break; if (udpData.TryGetValue(key, out var sortedList))
{
count = sortedList.Count;
break;
}
}
finally
{
udpDataLock.ExitReadLock();
} }
} }
@ -357,16 +386,23 @@ public class UDPServer
// 异步锁保护 udpData // 异步锁保护 udpData
await Task.Run(() => await Task.Run(() =>
{ {
// Handle RemoteEP try
if (endPoint is null)
{ {
logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:"); // Handle RemoteEP
logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}"); if (endPoint is null)
return; {
} logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
return;
}
var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1])); var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
// PrintData(udpDataObj); // PrintData(udpDataObj);
}
catch (Exception e)
{
logger.Error($"Got Error when handle receive:{e}");
}
}); });
} }
@ -384,21 +420,27 @@ public class UDPServer
HasRead = false, HasRead = false,
}; };
udpDataLock.EnterWriteLock();
lock (udpData) try
{ {
var key = $"{remoteAddress}-{taskID}"; var key = $"{remoteAddress}-{taskID}";
var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue<UDPData>()); var sortedList = udpData.GetOrAdd(key, _ => new SortedList<DateTime, UDPData>());
dataQueue.Enqueue(data);
// 对队列进行一次按时间排序 // 处理相同时间戳的情况,添加微小的时间差
if (dataQueue.Count > 0) var uniqueTime = time;
while (sortedList.ContainsKey(uniqueTime))
{ {
var sorted = dataQueue.OrderBy(d => d.DateTime).ToList(); logger.Warn(
udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(sorted), dataQueue); $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
uniqueTime = uniqueTime.AddTicks(1);
} }
PrintAllData(); sortedList.Add(uniqueTime, data);
// PrintAllData();
}
finally
{
udpDataLock.ExitWriteLock();
} }
return data; return data;
@ -461,13 +503,21 @@ public class UDPServer
{ {
logger.Debug("Ready Data:"); logger.Debug("Ready Data:");
foreach (var kvp in udpData) udpDataLock.EnterReadLock();
try
{ {
foreach (var data in kvp.Value) foreach (var kvp in udpData)
{ {
logger.Debug(PrintData(data)); foreach (var data in kvp.Value.Values)
{
logger.Debug(PrintData(data));
}
} }
} }
finally
{
udpDataLock.ExitReadLock();
}
} }
/// <summary> /// <summary>
@ -479,10 +529,17 @@ public class UDPServer
public void ClearUDPData(string ipAddr, int taskID) public void ClearUDPData(string ipAddr, int taskID)
{ {
var key = $"{ipAddr}-{taskID}"; var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var dataQueue)) udpDataLock.EnterWriteLock();
try
{ {
// 清空队列的最有效方式是替换为新的队列 if (udpData.TryGetValue(key, out var sortedList))
udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(), dataQueue); {
sortedList.Clear();
}
}
finally
{
udpDataLock.ExitWriteLock();
} }
// 强制进行ARP刷新防止后续传输时造成影响 // 强制进行ARP刷新防止后续传输时造成影响
@ -648,8 +705,8 @@ public class UDPServer
{ {
try try
{ {
UdpReceiveResult result = await client.ReceiveAsync(); var result = await client.ReceiveAsync();
ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now); _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -679,5 +736,32 @@ public class UDPServer
this.isRunning = false; this.isRunning = false;
} }
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
Stop();
udpDataLock?.Dispose();
}
disposed = true;
}
}
~UDPServer()
{
Dispose(false);
}
} }