fix: 由于解析错误导致的无法通信的问题

This commit is contained in:
SikongJueluo 2025-07-17 14:11:24 +08:00
parent 56dcbf5caa
commit 1053d71d29
No known key found for this signature in database
4 changed files with 99 additions and 107 deletions

View File

@ -14,8 +14,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="DotNext" Version="5.19.1" />
<PackageReference Include="DotNext.Threading" Version="5.19.1" />
<PackageReference Include="DotNext" Version="5.23.0" />
<PackageReference Include="DotNext.Threading" Version="5.23.0" />
<PackageReference Include="Honoo.IO.Hashing.Crc" Version="1.3.3" />
<PackageReference Include="linq2db.AspNet" Version="5.4.1" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.7" />

View File

@ -525,9 +525,9 @@ public class UDPClientPool
{
var bytes = udpDatas[i].Data;
var expectedLen = ((pkgList[i].Options.BurstLength + 1) * 4);
if ((bytes.Length - 4) != expectedLen)
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length - 4} bytes at segment {i}"));
resultData.AddRange(bytes[4..]);
if ((bytes.Length - 8) != expectedLen)
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length - 8} bytes at segment {i}"));
resultData.AddRange(bytes[8..]);
}
// Validate total data length

View File

@ -6,6 +6,7 @@ using System.Runtime.CompilerServices;
using System.Text;
using Common;
using DotNext;
using DotNext.Threading;
using Newtonsoft.Json;
using WebProtocol;
@ -81,7 +82,7 @@ public class UDPServer
private ConcurrentDictionary<string, SortedList<UInt32, UDPData>> udpData
= new ConcurrentDictionary<string, SortedList<UInt32, UDPData>>();
private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim();
private readonly AsyncReaderWriterLock udpDataLock = new AsyncReaderWriterLock();
private int listenPort;
private List<UdpClient> listeners = new List<UdpClient>();
@ -189,45 +190,36 @@ public class UDPServer
)
{
UDPData? data = null;
var key = $"{ipAddr}-{taskID}";
var startTime = DateTime.Now;
var isTimeout = false;
while (!isTimeout)
try
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
udpDataLock.EnterWriteLock();
try
using (await udpDataLock.AcquireWriteLockAsync(TimeSpan.FromMilliseconds(timeout)))
{
var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{
// 获取最早的数据(第一个元素)
var firstKey = sortedList.Keys[0];
data = sortedList[firstKey];
sortedList.RemoveAt(0);
break;
}
}
finally
if (data is null)
{
udpDataLock.ExitWriteLock();
logger.Trace("Get nothing even after time out");
return new(null);
}
else
{
return new(data.DeepClone());
}
await Task.Delay(cycle);
}
if (data is null)
catch
{
logger.Trace("Get nothing even after time out");
return new(null);
}
else
{
return new(data.DeepClone());
}
}
/// <summary>
@ -240,43 +232,38 @@ public class UDPServer
public async ValueTask<Optional<List<UDPData>>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
List<UDPData>? data = null;
var key = $"{ipAddr}-{taskID}";
var startTime = DateTime.Now;
var isTimeout = false;
while (!isTimeout)
try
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
udpDataLock.EnterWriteLock();
try
using (await udpDataLock.AcquireWriteLockAsync(TimeSpan.FromMilliseconds(timeout)))
{
var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{
data = new List<UDPData>(sortedList.Values);
// 输出数据
// PrintDataArray(data);
sortedList.Clear();
break;
}
}
finally
if (data is null)
{
udpDataLock.ExitWriteLock();
logger.Trace("Get nothing even after time out");
return new(null);
}
else
{
return new(data);
}
}
if (data is null)
catch
{
logger.Trace("Get nothing even after time out");
return new(null);
}
else
{
return new(data);
}
}
/// <summary>
@ -290,28 +277,21 @@ public class UDPServer
{
List<UDPData>? data = null;
var startTime = DateTime.Now;
var isTimeout = false;
while (!isTimeout)
try
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
udpDataLock.EnterReadLock();
try
using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(timeout)))
{
var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
{
data = new List<UDPData>(sortedList.Values);
break;
}
}
finally
{
udpDataLock.ExitReadLock();
}
}
catch (TimeoutException)
{
logger.Trace("Failed to acquire read lock within timeout");
return new(null);
}
if (data is null)
@ -336,28 +316,21 @@ public class UDPServer
{
int? count = null;
var startTime = DateTime.Now;
var isTimeout = false;
while (!isTimeout)
try
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
udpDataLock.EnterReadLock();
try
using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(timeout)))
{
var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var sortedList))
{
count = sortedList.Count;
break;
}
}
finally
{
udpDataLock.ExitReadLock();
}
}
catch (TimeoutException)
{
logger.Trace("Failed to acquire read lock within timeout");
return Optional<int>.None;
}
if (count is null)
@ -426,7 +399,7 @@ public class UDPServer
private async Task ReceiveHandler(byte[] data, IPEndPoint endPoint, DateTime time)
{
// 异步锁保护 udpData
await Task.Run(() =>
await Task.Run(async () =>
{
try
{
@ -438,7 +411,7 @@ public class UDPServer
return;
}
var udpDataObj = RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
var udpDataObj = await RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1 + 4]));
// PrintData(udpDataObj);
}
catch (Exception e)
@ -448,7 +421,7 @@ public class UDPServer
});
}
private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, DateTime time, int taskID)
private async Task<UDPData> RecordUDPData(byte[] bytes, IPEndPoint remoteEP, DateTime time, int taskID)
{
var remoteAddress = remoteEP.Address.ToString();
var remotePort = remoteEP.Port;
@ -465,29 +438,30 @@ public class UDPServer
var key = $"{remoteAddress}-{taskID}";
udpDataLock.EnterWriteLock();
try
{
var sortedList = udpData.GetOrAdd(key, _ => new SortedList<UInt32, UDPData>());
// 处理相同时间戳的情况,添加微小的时间差
var uniqueTime = data.Timestamp;
while (sortedList.ContainsKey(uniqueTime))
using (await udpDataLock.AcquireWriteLockAsync(TimeSpan.FromMilliseconds(5000)))
{
logger.Warn(
$"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
uniqueTime += 1;
}
var sortedList = udpData.GetOrAdd(key, _ => new SortedList<UInt32, UDPData>());
sortedList.Add(uniqueTime, data);
// 输出单个数据
PrintData(data);
// 输出全部数据
// PrintAllData();
// 处理相同时间戳的情况,添加微小的时间差
var uniqueTime = data.Timestamp;
while (sortedList.ContainsKey(uniqueTime))
{
logger.Warn(
$"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
uniqueTime += 1;
}
sortedList.Add(uniqueTime, data);
// 输出单个数据
PrintData(data);
}
}
finally
catch (TimeoutException)
{
udpDataLock.ExitWriteLock();
logger.Error($"Failed to acquire write lock for recording UDP data from {remoteAddress}:{remotePort}");
throw;
}
return data;
@ -550,24 +524,26 @@ public class UDPServer
/// 将所有数据输出到log中
/// </summary>
/// <returns> void </returns>
public void PrintAllData()
public async Task PrintAllDataAsync()
{
logger.Debug("Ready Data:");
udpDataLock.EnterReadLock();
try
{
foreach (var kvp in udpData)
using (await udpDataLock.AcquireReadLockAsync(TimeSpan.FromMilliseconds(5000)))
{
foreach (var data in kvp.Value.Values)
foreach (var kvp in udpData)
{
logger.Debug(PrintData(data));
foreach (var data in kvp.Value.Values)
{
logger.Debug(PrintData(data));
}
}
}
}
finally
catch (TimeoutException)
{
udpDataLock.ExitReadLock();
logger.Error("Failed to acquire read lock for printing all data");
}
}
@ -580,18 +556,14 @@ public class UDPServer
public void ClearUDPData(string ipAddr, int taskID)
{
var key = $"{ipAddr}-{taskID}";
udpDataLock.EnterWriteLock();
try
using (udpDataLock.AcquireWriteLock())
{
if (udpData.TryGetValue(key, out var sortedList))
{
sortedList.Clear();
}
}
finally
{
udpDataLock.ExitWriteLock();
}
// 强制进行ARP刷新防止后续传输时造成影响
// FlushArpEntry(ipAddr);

View File

@ -393,6 +393,16 @@ namespace WebProtocol
bytes[8..]);
}
/// <summary>
/// [TODO:description]
/// </summary>
/// <param name="bytes">[TODO:parameter]</param>
/// <returns>[TODO:return]</returns>
public static bool IsRecvDataPackage(byte[] bytes)
{
return bytes[4] == (byte)PackSign.RecvData;
}
/// <summary>
/// 将数据包转化为字节数组
/// </summary>
@ -496,6 +506,16 @@ namespace WebProtocol
return new RecvRespPackage(timestamp, bytes[5], bytes[6]);
}
/// <summary>
/// [TODO:description]
/// </summary>
/// <param name="bytes">[TODO:parameter]</param>
/// <returns>[TODO:return]</returns>
public static bool IsRecvRespPackage(byte[] bytes)
{
return bytes[4] == (byte)PackSign.RecvResp;
}
/// <summary>
/// 将数据包转化为字节数组
/// </summary>