feat: 部分修复摄像头批量读逻辑
This commit is contained in:
parent
9b580be5e9
commit
c5f0e706a4
|
@ -465,7 +465,7 @@ public class UDPClientPool
|
||||||
// Send address packages in batches of 128, control outstanding
|
// Send address packages in batches of 128, control outstanding
|
||||||
int sentCount = 0;
|
int sentCount = 0;
|
||||||
var startTime = DateTime.Now;
|
var startTime = DateTime.Now;
|
||||||
const int batchSize = 128;
|
const int batchSize = 64;
|
||||||
while (sentCount < pkgList.Count)
|
while (sentCount < pkgList.Count)
|
||||||
{
|
{
|
||||||
var elapsed = DateTime.Now - startTime;
|
var elapsed = DateTime.Now - startTime;
|
||||||
|
@ -487,11 +487,12 @@ public class UDPClientPool
|
||||||
var ret = await UDPClientPool.SendMultiAddrPackAsync(endPoint, batchPkgs);
|
var ret = await UDPClientPool.SendMultiAddrPackAsync(endPoint, batchPkgs);
|
||||||
if (!ret) return new(new Exception($"Send address package batch failed at segment {sentCount}!"));
|
if (!ret) return new(new Exception($"Send address package batch failed at segment {sentCount}!"));
|
||||||
sentCount += batchSend;
|
sentCount += batchSend;
|
||||||
|
// Task.Delay(1).Wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until enough data is received or timeout
|
// Wait until enough data is received or timeout
|
||||||
startTime = DateTime.Now;
|
startTime = DateTime.Now;
|
||||||
List<UDPData>? udpDatas = null;
|
var udpDatas = new List<UDPData>();
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
var elapsed = DateTime.Now - startTime;
|
var elapsed = DateTime.Now - startTime;
|
||||||
|
@ -504,23 +505,23 @@ public class UDPClientPool
|
||||||
var dataArr = await MsgBus.UDPServer.FindDataArrayAsync(endPoint.Address.ToString(), taskID, timeleft);
|
var dataArr = await MsgBus.UDPServer.FindDataArrayAsync(endPoint.Address.ToString(), taskID, timeleft);
|
||||||
if (dataArr.HasValue)
|
if (dataArr.HasValue)
|
||||||
{
|
{
|
||||||
udpDatas = dataArr.Value;
|
udpDatas.AddRange(dataArr.Value);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (udpDatas is null || udpDatas.Count < readTimes)
|
if (udpDatas.Count < readTimes)
|
||||||
return new(new Exception($"Expected {readTimes} UDP data packets but received {udpDatas?.Count ?? 0}"));
|
return new(new Exception($"Expected {readTimes} UDP data packets but received {udpDatas.Count}"));
|
||||||
|
|
||||||
// Collect and validate all received data
|
// Collect and validate all received data
|
||||||
for (var i = 0; i < udpDatas.Count; i++)
|
for (var i = 0; i < udpDatas.Count; i++)
|
||||||
{
|
{
|
||||||
var bytes = udpDatas[i].Data;
|
var bytes = udpDatas[i].Data;
|
||||||
var expectedLen = ((pkgList[i].Options.BurstLength + 1) * 4);
|
var expectedLen = ((pkgList[i].Options.BurstLength + 1) * 4);
|
||||||
if (bytes.Length != expectedLen)
|
if ((bytes.Length - 4) != expectedLen)
|
||||||
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length} bytes at segment {i}"));
|
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length - 4} bytes at segment {i}"));
|
||||||
resultData.AddRange(bytes);
|
resultData.AddRange(bytes[4..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate total data length
|
// Validate total data length
|
||||||
|
|
|
@ -367,15 +367,17 @@ public class UDPServer
|
||||||
|
|
||||||
return retPack.Value;
|
return retPack.Value;
|
||||||
}
|
}
|
||||||
|
static int ReceiveHandleCcount = 0;
|
||||||
|
|
||||||
private void ReceiveHandler(IAsyncResult res)
|
private void ReceiveHandler(IAsyncResult res)
|
||||||
{
|
{
|
||||||
logger.Trace("Enter handler");
|
|
||||||
var remoteEP = new IPEndPoint(IPAddress.Any, listenPort);
|
var remoteEP = new IPEndPoint(IPAddress.Any, listenPort);
|
||||||
byte[] bytes = listener.EndReceive(res, ref remoteEP);
|
byte[] bytes = listener.EndReceive(res, ref remoteEP);
|
||||||
|
|
||||||
// 提前开始接收下一个包
|
// 提前开始接收下一个包
|
||||||
listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
|
listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
|
||||||
|
logger.Debug($"Test ReceiveHandler Count = {ReceiveHandleCcount}");
|
||||||
|
ReceiveHandleCcount++;
|
||||||
|
|
||||||
// Handle RemoteEP
|
// Handle RemoteEP
|
||||||
if (remoteEP is null)
|
if (remoteEP is null)
|
||||||
|
@ -431,7 +433,7 @@ public class UDPServer
|
||||||
udpData.TryGetValue($"{remoteAddress}-{taskID}", out var dataQueue))
|
udpData.TryGetValue($"{remoteAddress}-{taskID}", out var dataQueue))
|
||||||
{
|
{
|
||||||
dataQueue.Enqueue(data);
|
dataQueue.Enqueue(data);
|
||||||
logger.Trace("Receive data from old client");
|
logger.Debug($"Test Lock dataQueue.Count = {dataQueue.Count}");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue