feat: 使用发送多个地址包来改善大数据读取的速度

This commit is contained in:
SikongJueluo 2025-07-13 13:53:29 +08:00
parent bad64bdfbd
commit 8221f8e133
No known key found for this signature in database
1 changed files with 55 additions and 13 deletions

View File

@ -110,6 +110,46 @@ public class UDPClientPool
return await Task.Run(() => { return SendAddrPack(endPoint, pkg); });
}
/// <summary>
/// 发送多个地址包
/// </summary>
/// <param name="endPoint">IP端点IP地址与端口</param>
/// <param name="pkgs">地址包集合(最多512 / 8)</param>
/// <returns>是否全部成功</returns>
public static bool SendMultiAddrPack(IPEndPoint endPoint, IEnumerable<WebProtocol.SendAddrPackage> pkgs)
{
const int maxPkgs = 512 / 8;
var pkgList = pkgs.Take(maxPkgs).ToList();
if (pkgList.Count == 0) return false;
// 合并所有包为一个buffer
int totalLen = pkgList.Sum(pkg => pkg.ToBytes().Length);
byte[] buffer = new byte[totalLen];
int offset = 0;
foreach (var pkg in pkgList)
{
var bytes = pkg.ToBytes();
Buffer.BlockCopy(bytes, 0, buffer, offset, bytes.Length);
offset += bytes.Length;
}
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
var sendLen = socket.SendTo(buffer.ToArray(), endPoint);
socket.Close();
return sendLen == buffer.Length;
}
/// <summary>
/// 异步发送多个地址包
/// </summary>
/// <param name="endPoint">IP端点IP地址与端口</param>
/// <param name="pkgs">地址包集合(最多512 / 8)</param>
/// <returns>是否全部成功</returns>
public async static ValueTask<bool> SendMultiAddrPackAsync(IPEndPoint endPoint, IEnumerable<WebProtocol.SendAddrPackage> pkgs)
{
return await Task.Run(() => SendMultiAddrPack(endPoint, pkgs));
}
/// <summary>
/// 发送数据包
@ -392,14 +432,14 @@ public class UDPClientPool
public static async ValueTask<Result<byte[]>> ReadAddr4BytesAsync(
IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000)
{
var optsList = new List<SendAddrPackOptions>();
var pkgList = new List<SendAddrPackage>();
var resultData = new List<byte>();
// Check Msg Bus
if (!MsgBus.IsRunning)
return new(new Exception("Message bus not working!"));
// Prepare options for each segment
// Prepare packages for each segment
var max4BytesPerRead = 0x80; // 512 bytes per read
var rest4Bytes = dataLength % max4BytesPerRead;
var readTimes = (rest4Bytes != 0) ?
@ -419,15 +459,15 @@ public class UDPClientPool
BurstLength = (byte)(currentSegmentSize - 1),
Address = devAddr + (uint)(i * max4BytesPerRead)
};
optsList.Add(opts);
pkgList.Add(new SendAddrPackage(opts));
}
// Send all address packages first, but keep outstanding < 512
// Send address packages in batches of 128, control outstanding
int sentCount = 0;
var startTime = DateTime.Now;
while (sentCount < optsList.Count)
const int batchSize = 128;
while (sentCount < pkgList.Count)
{
// Check how many data packets have been received
var elapsed = DateTime.Now - startTime;
if (elapsed >= TimeSpan.FromMilliseconds(timeout))
break;
@ -436,14 +476,16 @@ public class UDPClientPool
var found = await MsgBus.UDPServer.GetDataCountAsync(endPoint.Address.ToString(), taskID, timeleft);
int outstanding = sentCount - (found.HasValue ? found.Value : 0);
// If outstanding >= 512, wait for some data to be received
if (outstanding >= 512)
// If outstanding >= 512 - batchSize, wait for some data to be received
if (outstanding >= 512 - batchSize)
continue;
// Send next address package
var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(optsList[sentCount]));
if (!ret) return new(new Exception($"Send address package failed at segment {sentCount}!"));
sentCount++;
// Send next batch of address packages (up to 128)
int batchSend = Math.Min(batchSize, pkgList.Count - sentCount);
var batchPkgs = pkgList.Skip(sentCount).Take(batchSend);
var ret = await UDPClientPool.SendMultiAddrPackAsync(endPoint, batchPkgs);
if (!ret) return new(new Exception($"Send address package batch failed at segment {sentCount}!"));
sentCount += batchSend;
}
// Wait until enough data is received or timeout
@ -474,7 +516,7 @@ public class UDPClientPool
for (var i = 0; i < udpDatas.Count; i++)
{
var bytes = udpDatas[i].Data;
var expectedLen = ((optsList[i].BurstLength + 1) * 4);
var expectedLen = ((pkgList[i].Options.BurstLength + 1) * 4);
if (bytes.Length != expectedLen)
return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length} bytes at segment {i}"));
resultData.AddRange(bytes);