diff --git a/server/src/UdpClientPool.cs b/server/src/UdpClientPool.cs index 6b00db3..d9267b6 100644 --- a/server/src/UdpClientPool.cs +++ b/server/src/UdpClientPool.cs @@ -110,6 +110,46 @@ public class UDPClientPool return await Task.Run(() => { return SendAddrPack(endPoint, pkg); }); } + /// + /// 发送多个地址包 + /// + /// IP端点(IP地址与端口) + /// 地址包集合(最多512 / 8) + /// 是否全部成功 + public static bool SendMultiAddrPack(IPEndPoint endPoint, IEnumerable pkgs) + { + const int maxPkgs = 512 / 8; + var pkgList = pkgs.Take(maxPkgs).ToList(); + if (pkgList.Count == 0) return false; + + // 合并所有包为一个buffer + int totalLen = pkgList.Sum(pkg => pkg.ToBytes().Length); + byte[] buffer = new byte[totalLen]; + int offset = 0; + foreach (var pkg in pkgList) + { + var bytes = pkg.ToBytes(); + Buffer.BlockCopy(bytes, 0, buffer, offset, bytes.Length); + offset += bytes.Length; + } + + Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + var sendLen = socket.SendTo(buffer.ToArray(), endPoint); + socket.Close(); + + return sendLen == buffer.Length; + } + + /// + /// 异步发送多个地址包 + /// + /// IP端点(IP地址与端口) + /// 地址包集合(最多512 / 8) + /// 是否全部成功 + public async static ValueTask SendMultiAddrPackAsync(IPEndPoint endPoint, IEnumerable pkgs) + { + return await Task.Run(() => SendMultiAddrPack(endPoint, pkgs)); + } /// /// 发送数据包 @@ -392,14 +432,14 @@ public class UDPClientPool public static async ValueTask> ReadAddr4BytesAsync( IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000) { - var optsList = new List(); + var pkgList = new List(); var resultData = new List(); // Check Msg Bus if (!MsgBus.IsRunning) return new(new Exception("Message bus not working!")); - // Prepare options for each segment + // Prepare packages for each segment var max4BytesPerRead = 0x80; // 512 bytes per read var rest4Bytes = dataLength % max4BytesPerRead; var readTimes = (rest4Bytes != 0) ? @@ -419,15 +459,15 @@ public class UDPClientPool BurstLength = (byte)(currentSegmentSize - 1), Address = devAddr + (uint)(i * max4BytesPerRead) }; - optsList.Add(opts); + pkgList.Add(new SendAddrPackage(opts)); } - // Send all address packages first, but keep outstanding < 512 + // Send address packages in batches of 128, control outstanding int sentCount = 0; var startTime = DateTime.Now; - while (sentCount < optsList.Count) + const int batchSize = 128; + while (sentCount < pkgList.Count) { - // Check how many data packets have been received var elapsed = DateTime.Now - startTime; if (elapsed >= TimeSpan.FromMilliseconds(timeout)) break; @@ -436,14 +476,16 @@ public class UDPClientPool var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft); int outstanding = sentCount - (found.HasValue ? found.Value : 0); - // If outstanding >= 512, wait for some data to be received - if (outstanding >= 512) + // If outstanding >= 512 - batchSize, wait for some data to be received + if (outstanding >= 512 - batchSize) continue; - // Send next address package - var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(optsList[sentCount])); - if (!ret) return new(new Exception($"Send address package failed at segment {sentCount}!")); - sentCount++; + // Send next batch of address packages (up to 128) + int batchSend = Math.Min(batchSize, pkgList.Count - sentCount); + var batchPkgs = pkgList.Skip(sentCount).Take(batchSend); + var ret = await UDPClientPool.SendMultiAddrPackAsync(endPoint, batchPkgs); + if (!ret) return new(new Exception($"Send address package batch failed at segment {sentCount}!")); + sentCount += batchSend; } // Wait until enough data is received or timeout @@ -474,7 +516,7 @@ public class UDPClientPool for (var i = 0; i < udpDatas.Count; i++) { var bytes = udpDatas[i].Data; - var expectedLen = ((optsList[i].BurstLength + 1) * 4); + var expectedLen = ((pkgList[i].Options.BurstLength + 1) * 4); if (bytes.Length != expectedLen) return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length} bytes at segment {i}")); resultData.AddRange(bytes);