diff --git a/server/src/Peripherals/CameraClient.cs b/server/src/Peripherals/CameraClient.cs index 382192f..23c5757 100644 --- a/server/src/Peripherals/CameraClient.cs +++ b/server/src/Peripherals/CameraClient.cs @@ -179,12 +179,12 @@ class Camera logger.Trace($"Reading frame from camera {this.address}"); // 使用UDPClientPool读取图像帧数据 - var result = await UDPClientPool.ReadAddr4Bytes( + var result = await UDPClientPool.ReadAddr4BytesAsync( this.ep, this.taskID, // taskID FrameAddr, // ((int)FrameLength), - 1280*720/2, + 1280 * 720 / 2, this.timeout); if (!result.IsSuccessful) @@ -276,7 +276,7 @@ class Camera /// 垂直窗口大小(默认1300) /// 配置结果 public async ValueTask> ConfigureResolution( - UInt16 hStart, UInt16 vStart, + UInt16 hStart, UInt16 vStart, UInt16 dvpHo, UInt16 dvpVo, UInt16 hts, UInt16 vts, UInt16 hOffset, UInt16 vOffset, @@ -285,7 +285,7 @@ class Camera // 计算结束位置 UInt16 hEnd = (UInt16)(hStart + hWindow - 1); UInt16 vEnd = (UInt16)(vStart + vWindow - 1); - + // 计算帧长度 UInt32 frameLength = (UInt32)(dvpHo * dvpVo * 16 / 32); diff --git a/server/src/UdpClientPool.cs b/server/src/UdpClientPool.cs index 8c076f8..6b00db3 100644 --- a/server/src/UdpClientPool.cs +++ b/server/src/UdpClientPool.cs @@ -380,6 +380,113 @@ public class UDPClientPool } + /// + /// 从设备地址读取字节数组数据(支持大数据量分段传输,先发送所有地址包再接收所有数据包) + /// + /// IP端点(IP地址与端口) + /// 任务ID + /// 设备地址 + /// 要读取的数据长度(4字节) + /// 超时时间(毫秒) + /// 读取结果,包含接收到的字节数组 + public static async ValueTask> ReadAddr4BytesAsync( + IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000) + { + var optsList = new List(); + var resultData = new List(); + + // Check Msg Bus + if (!MsgBus.IsRunning) + return new(new Exception("Message bus not working!")); + + // Prepare options for each segment + var max4BytesPerRead = 0x80; // 512 bytes per read + var rest4Bytes = dataLength % max4BytesPerRead; + var readTimes = (rest4Bytes != 0) ? + (dataLength / max4BytesPerRead + 1) : + (dataLength / max4BytesPerRead); + + for (var i = 0; i < readTimes; i++) + { + var isLastSegment = i == readTimes - 1; + var currentSegmentSize = (isLastSegment && rest4Bytes != 0) ? rest4Bytes : max4BytesPerRead; + + var opts = new SendAddrPackOptions + { + BurstType = BurstType.FixedBurst, + CommandID = Convert.ToByte(taskID), + IsWrite = false, + BurstLength = (byte)(currentSegmentSize - 1), + Address = devAddr + (uint)(i * max4BytesPerRead) + }; + optsList.Add(opts); + } + + // Send all address packages first, but keep outstanding < 512 + int sentCount = 0; + var startTime = DateTime.Now; + while (sentCount < optsList.Count) + { + // Check how many data packets have been received + var elapsed = DateTime.Now - startTime; + if (elapsed >= TimeSpan.FromMilliseconds(timeout)) + break; + var timeleft = timeout - (int)elapsed.TotalMilliseconds; + + var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft); + int outstanding = sentCount - (found.HasValue ? found.Value : 0); + + // If outstanding >= 512, wait for some data to be received + if (outstanding >= 512) + continue; + + // Send next address package + var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(optsList[sentCount])); + if (!ret) return new(new Exception($"Send address package failed at segment {sentCount}!")); + sentCount++; + } + + // Wait until enough data is received or timeout + startTime = DateTime.Now; + List? udpDatas = null; + while (true) + { + var elapsed = DateTime.Now - startTime; + if (elapsed >= TimeSpan.FromMilliseconds(timeout)) break; + var timeleft = timeout - (int)elapsed.TotalMilliseconds; + + var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft); + if (found.HasValue && found.Value >= readTimes) + { + var dataArr = await MsgBus.UDPServer.FindDataArrayAsync(endPoint.Address.ToString(), taskID, timeleft); + if (dataArr.HasValue) + { + udpDatas = dataArr.Value; + break; + } + } + } + + if (udpDatas is null || udpDatas.Count < readTimes) + return new(new Exception($"Expected {readTimes} UDP data packets but received {udpDatas?.Count ?? 0}")); + + // Collect and validate all received data + for (var i = 0; i < udpDatas.Count; i++) + { + var bytes = udpDatas[i].Data; + var expectedLen = ((optsList[i].BurstLength + 1) * 4); + if (bytes.Length != expectedLen) + return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length} bytes at segment {i}")); + resultData.AddRange(bytes); + } + + // Validate total data length + if (resultData.Count != dataLength * 4) + return new(new Exception($"Expected total {dataLength * 4} bytes but received {resultData.Count} bytes")); + + return resultData.ToArray(); + } + /// /// 向设备地址写入32位数据 /// diff --git a/server/src/UdpServer.cs b/server/src/UdpServer.cs index 0075d4f..fb4bd00 100644 --- a/server/src/UdpServer.cs +++ b/server/src/UdpServer.cs @@ -183,15 +183,59 @@ public class UDPServer } } + /// + /// 异步寻找目标发送的所有内容,并清空队列 + /// + /// 目标IP地址 + /// 任务ID + /// 超时时间 + /// 异步Optional 数据包列表 + public async ValueTask>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000) + { + List? data = null; + + var startTime = DateTime.Now; + var isTimeout = false; + var timeleft = TimeSpan.FromMilliseconds(timeout); + while (!isTimeout) + { + var elapsed = DateTime.Now - startTime; + isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); + if (isTimeout) break; + + timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed; + using (await udpData.AcquireWriteLockAsync(timeleft)) + { + if (udpData.ContainsKey($"{ipAddr}-{taskID}") && + udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) && + dataQueue.Count > 0) + { + data = dataQueue.ToList(); + dataQueue.Clear(); + break; + } + } + } + + if (data is null) + { + logger.Trace("Get nothing even after time out"); + return new(null); + } + else + { + return new(data); + } + } + /// /// 获取还未被读取的数据列表 /// /// IP地址 - /// [TODO:parameter] + /// 任务ID /// 超时时间 - /// 延迟时间 /// 数据列表 - public async ValueTask>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000, int cycle = 0) + public async ValueTask>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000) { List? data = null; @@ -229,6 +273,49 @@ public class UDPServer } } + /// + /// 异步获取指定IP和任务ID的数据队列长度 + /// + /// IP地址 + /// 任务ID + /// 超时时间 + /// 数据队列长度 + public async ValueTask> GetDataCountAsync(string ipAddr, int taskID, int timeout = 1000) + { + int? count = null; + + var startTime = DateTime.Now; + var isTimeout = false; + var timeleft = TimeSpan.FromMilliseconds(timeout); + while (!isTimeout) + { + var elapsed = DateTime.Now - startTime; + isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout); + if (isTimeout) break; + + timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed; + using (await udpData.AcquireReadLockAsync(timeleft)) + { + if (udpData.ContainsKey($"{ipAddr}-{taskID}") && + udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue)) + { + count = dataQueue.Count; + break; + } + } + } + + if (count is null) + { + logger.Trace("Get nothing even after time out"); + return Optional.None; + } + else + { + return new(count.Value); + } + } + /// /// 异步等待写响应 ///