fix: 添加互斥锁,并增加更多log输出

This commit is contained in:
SikongJueluo 2025-07-15 16:36:16 +08:00
parent be8fed995c
commit b139542c4c
No known key found for this signature in database
1 changed files with 38 additions and 20 deletions

View File

@ -155,11 +155,14 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && lock (udpData)
dataQueue.TryDequeue(out data))
{ {
// logger.Debug($"Find UDP Data: {data.ToString()}"); if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
break; dataQueue.TryDequeue(out data))
{
// logger.Debug($"Find UDP Data: {data.ToString()}");
break;
}
} }
await Task.Delay(cycle); await Task.Delay(cycle);
@ -195,15 +198,18 @@ public class UDPServer
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break; if (isTimeout) break;
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && lock (udpData)
!dataQueue.IsEmpty)
{ {
data = new List<UDPData>(); if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
while (dataQueue.TryDequeue(out var item)) !dataQueue.IsEmpty)
{ {
data.Add(item); data = new List<UDPData>();
while (dataQueue.TryDequeue(out var item))
{
data.Add(item);
}
break;
} }
break;
} }
} }
@ -360,7 +366,7 @@ public class UDPServer
} }
var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1])); var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
PrintData(udpDataObj); // PrintData(udpDataObj);
}); });
} }
@ -378,15 +384,22 @@ public class UDPServer
HasRead = false, HasRead = false,
}; };
var key = $"{remoteAddress}-{taskID}";
var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue<UDPData>());
dataQueue.Enqueue(data);
// 对队列进行一次按时间排序 lock (udpData)
var sorted = dataQueue.OrderBy(d => d.DateTime).ToList(); {
udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(sorted), dataQueue); var key = $"{remoteAddress}-{taskID}";
var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue<UDPData>());
dataQueue.Enqueue(data);
logger.Debug($"Test dataQueue.Count = {udpData[key].Count}"); // 对队列进行一次按时间排序
if (dataQueue.Count > 0)
{
var sorted = dataQueue.OrderBy(d => d.DateTime).ToList();
udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(sorted), dataQueue);
}
PrintAllData();
}
return data; return data;
} }
@ -395,7 +408,7 @@ public class UDPServer
/// 输出UDP Data到log中 /// 输出UDP Data到log中
/// </summary> /// </summary>
/// <param name="data">UDP数据</param> /// <param name="data">UDP数据</param>
public void PrintData(UDPData data) public string PrintData(UDPData data)
{ {
var bytes = data.Data; var bytes = data.Data;
var sign = bytes[0]; var sign = bytes[0];
@ -433,6 +446,11 @@ public class UDPServer
// logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:"); // logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:");
// logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}"); // logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}");
// if (recvData.Length != 0) logger.Debug($" Decoded Data : {recvData}"); // if (recvData.Length != 0) logger.Debug($" Decoded Data : {recvData}");
return $@"
Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:
Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}
Decoded Data : {recvData}
";
} }
/// <summary> /// <summary>
@ -447,7 +465,7 @@ public class UDPServer
{ {
foreach (var data in kvp.Value) foreach (var data in kvp.Value)
{ {
logger.Debug(data.ToString()); logger.Debug(PrintData(data));
} }
} }
} }