feat: 添加大数据接收方法,以提高接受速度

This commit is contained in:
SikongJueluo 2025-07-13 11:40:41 +08:00
parent b913f58f13
commit 32b126b93f
No known key found for this signature in database
3 changed files with 201 additions and 7 deletions

View File

@ -179,12 +179,12 @@ class Camera
logger.Trace($"Reading frame from camera {this.address}"); logger.Trace($"Reading frame from camera {this.address}");
// 使用UDPClientPool读取图像帧数据 // 使用UDPClientPool读取图像帧数据
var result = await UDPClientPool.ReadAddr4Bytes( var result = await UDPClientPool.ReadAddr4BytesAsync(
this.ep, this.ep,
this.taskID, // taskID this.taskID, // taskID
FrameAddr, FrameAddr,
// ((int)FrameLength), // ((int)FrameLength),
1280*720/2, 1280 * 720 / 2,
this.timeout); this.timeout);
if (!result.IsSuccessful) if (!result.IsSuccessful)
@ -276,7 +276,7 @@ class Camera
/// <param name="vWindow">垂直窗口大小默认1300</param> /// <param name="vWindow">垂直窗口大小默认1300</param>
/// <returns>配置结果</returns> /// <returns>配置结果</returns>
public async ValueTask<Result<bool>> ConfigureResolution( public async ValueTask<Result<bool>> ConfigureResolution(
UInt16 hStart, UInt16 vStart, UInt16 hStart, UInt16 vStart,
UInt16 dvpHo, UInt16 dvpVo, UInt16 dvpHo, UInt16 dvpVo,
UInt16 hts, UInt16 vts, UInt16 hts, UInt16 vts,
UInt16 hOffset, UInt16 vOffset, UInt16 hOffset, UInt16 vOffset,
@ -285,7 +285,7 @@ class Camera
// 计算结束位置 // 计算结束位置
UInt16 hEnd = (UInt16)(hStart + hWindow - 1); UInt16 hEnd = (UInt16)(hStart + hWindow - 1);
UInt16 vEnd = (UInt16)(vStart + vWindow - 1); UInt16 vEnd = (UInt16)(vStart + vWindow - 1);
// 计算帧长度 // 计算帧长度
UInt32 frameLength = (UInt32)(dvpHo * dvpVo * 16 / 32); UInt32 frameLength = (UInt32)(dvpHo * dvpVo * 16 / 32);

View File

@ -380,6 +380,113 @@ public class UDPClientPool
} }
/// <summary>
/// 从设备地址读取字节数组数据(支持大数据量分段传输,先发送所有地址包再接收所有数据包)
/// </summary>
/// <param name="endPoint">IP端点IP地址与端口</param>
/// <param name="taskID">任务ID</param>
/// <param name="devAddr">设备地址</param>
/// <param name="dataLength">要读取的数据长度4字节</param>
/// <param name="timeout">超时时间(毫秒)</param>
/// <returns>读取结果,包含接收到的字节数组</returns>
public static async ValueTask<Result<byte[]>> ReadAddr4BytesAsync(
IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000)
{
var optsList = new List<SendAddrPackOptions>();
var resultData = new List<byte>();
// Check Msg Bus
if (!MsgBus.IsRunning)
return new(new Exception("Message bus not working!"));
// Prepare options for each segment
var max4BytesPerRead = 0x80; // 512 bytes per read
var rest4Bytes = dataLength % max4BytesPerRead;
var readTimes = (rest4Bytes != 0) ?
(dataLength / max4BytesPerRead + 1) :
(dataLength / max4BytesPerRead);
for (var i = 0; i < readTimes; i++)
{
var isLastSegment = i == readTimes - 1;
var currentSegmentSize = (isLastSegment && rest4Bytes != 0) ? rest4Bytes : max4BytesPerRead;
var opts = new SendAddrPackOptions
{
BurstType = BurstType.FixedBurst,
CommandID = Convert.ToByte(taskID),
IsWrite = false,
BurstLength = (byte)(currentSegmentSize - 1),
Address = devAddr + (uint)(i * max4BytesPerRead)
};
optsList.Add(opts);
}
// Send all address packages first, but keep outstanding < 512
int sentCount = 0;
var startTime = DateTime.Now;
while (sentCount < optsList.Count)
{
// Check how many data packets have been received
var elapsed = DateTime.Now - startTime;
if (elapsed >= TimeSpan.FromMilliseconds(timeout))
break;
var timeleft = timeout - (int)elapsed.TotalMilliseconds;
var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft);
int outstanding = sentCount - (found.HasValue ? found.Value : 0);
// If outstanding >= 512, wait for some data to be received
if (outstanding >= 512)
continue;
// Send next address package
var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(optsList[sentCount]));
if (!ret) return new(new Exception($"Send address package failed at segment {sentCount}!"));
sentCount++;
}
// Wait until enough data is received or timeout
startTime = DateTime.Now;
List<UDPData>? udpDatas = null;
while (true)
{
var elapsed = DateTime.Now - startTime;
if (elapsed >= TimeSpan.FromMilliseconds(timeout)) break;
var timeleft = timeout - (int)elapsed.TotalMilliseconds;
var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft);
if (found.HasValue && found.Value >= readTimes)
{
var dataArr = await MsgBus.UDPServer.FindDataArrayAsync(endPoint.Address.ToString(), taskID, timeleft);
if (dataArr.HasValue)
{
udpDatas = dataArr.Value;
break;
}
}
}
if (udpDatas is null || udpDatas.Count < readTimes)
return new(new Exception($"Expected {readTimes} UDP data packets but received {udpDatas?.Count ?? 0}"));
// Collect and validate all received data
for (var i = 0; i < udpDatas.Count; i++)
{
var bytes = udpDatas[i].Data;
var expectedLen = ((optsList[i].BurstLength + 1) * 4);
if (bytes.Length != expectedLen)
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length} bytes at segment {i}"));
resultData.AddRange(bytes);
}
// Validate total data length
if (resultData.Count != dataLength * 4)
return new(new Exception($"Expected total {dataLength * 4} bytes but received {resultData.Count} bytes"));
return resultData.ToArray();
}
/// <summary> /// <summary>
/// 向设备地址写入32位数据 /// 向设备地址写入32位数据
/// </summary> /// </summary>

View File

@ -183,15 +183,59 @@ public class UDPServer
} }
} }
/// <summary>
/// 异步寻找目标发送的所有内容,并清空队列
/// </summary>
/// <param name="ipAddr">目标IP地址</param>
/// <param name="taskID">任务ID</param>
/// <param name="timeout">超时时间</param>
/// <returns>异步Optional 数据包列表</returns>
public async ValueTask<Optional<List<UDPData>>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
List<UDPData>? data = null;
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireWriteLockAsync(timeleft))
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.Count > 0)
{
data = dataQueue.ToList();
dataQueue.Clear();
break;
}
}
}
if (data is null)
{
logger.Trace("Get nothing even after time out");
return new(null);
}
else
{
return new(data);
}
}
/// <summary> /// <summary>
/// 获取还未被读取的数据列表 /// 获取还未被读取的数据列表
/// </summary> /// </summary>
/// <param name="ipAddr">IP地址</param> /// <param name="ipAddr">IP地址</param>
/// <param name="taskID">[TODO:parameter]</param> /// <param name="taskID">任务ID</param>
/// <param name="timeout">超时时间</param> /// <param name="timeout">超时时间</param>
/// <param name="cycle">延迟时间</param>
/// <returns>数据列表</returns> /// <returns>数据列表</returns>
public async ValueTask<Optional<List<UDPData>>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000, int cycle = 0) public async ValueTask<Optional<List<UDPData>>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{ {
List<UDPData>? data = null; List<UDPData>? data = null;
@ -229,6 +273,49 @@ public class UDPServer
} }
} }
/// <summary>
/// 异步获取指定IP和任务ID的数据队列长度
/// </summary>
/// <param name="ipAddr">IP地址</param>
/// <param name="taskID">任务ID</param>
/// <param name="timeout">超时时间</param>
/// <returns>数据队列长度</returns>
public async ValueTask<Optional<int>> GetDataCountAsync(string ipAddr, int taskID, int timeout = 1000)
{
int? count = null;
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireReadLockAsync(timeleft))
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue))
{
count = dataQueue.Count;
break;
}
}
}
if (count is null)
{
logger.Trace("Get nothing even after time out");
return Optional<int>.None;
}
else
{
return new(count.Value);
}
}
/// <summary> /// <summary>
/// 异步等待写响应 /// 异步等待写响应
/// </summary> /// </summary>