feat: 实现udp并发接受数据??

This commit is contained in:
SikongJueluo 2025-07-14 13:30:16 +08:00
parent 6068a10d67
commit 2894ee24be
No known key found for this signature in database
2 changed files with 354 additions and 337 deletions

View File

@ -237,6 +237,33 @@ class Camera
return result.Value;
}
/// <summary>
/// 读取摄像头寄存器
/// </summary>
/// <param name="registerAddr">寄存器地址</param>
/// <returns>读取到的寄存器值</returns>
private async ValueTask<Result<byte>> ReadRegister(UInt16 registerAddr)
{
var i2c = new Peripherals.I2cClient.I2c(this.address, this.port, this.taskID, this.timeout);
// 地址高低字节
var addrBytes = new byte[2];
addrBytes[0] = (byte)(registerAddr >> 8);
addrBytes[1] = (byte)(registerAddr & 0xFF);
// 先写寄存器地址
var writeResult = await i2c.WriteData(CAM_I2C_ADDR, addrBytes, CAM_PROTO);
if (!writeResult.IsSuccessful)
return new(writeResult.Error);
// 再读一个字节
var readResult = await i2c.ReadData(CAM_I2C_ADDR, 1, CAM_PROTO);
if (!readResult.IsSuccessful)
return new(readResult.Error);
return readResult.Value[0];
}
/// <summary>
/// 批量配置I2C寄存器
/// </summary>

View File

@ -1,9 +1,9 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using DotNext;
using DotNext.Threading;
using Newtonsoft.Json;
using WebProtocol;
@ -72,9 +72,11 @@ public class UDPServer
{
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
private Dictionary<string, Queue<UDPData>> udpData = new Dictionary<string, Queue<UDPData>>();
private ConcurrentDictionary<string, ConcurrentQueue<UDPData>> udpData = new ConcurrentDictionary<string, ConcurrentQueue<UDPData>>();
private Semaphore taskPool = new Semaphore(3, 3);
const int parallelTaskNum = 5;
Task[] ReceiveTasks = new Task[parallelTaskNum];
private int runningTaskNum = 0;
private int listenPort;
private UdpClient listener;
@ -145,29 +147,20 @@ public class UDPServer
{
UDPData? data = null;
// logger.Debug($"Caller \"{callerName}|{callerLineNum}\": Try to find {ipAddr}-{taskID} UDP Data");
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireWriteLockAsync(timeleft))
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.TryDequeue(out data))
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.Count > 0)
{
data = dataQueue.Dequeue();
// logger.Debug($"Find UDP Data: {data.ToString()}");
break;
}
}
await Task.Delay(cycle);
}
@ -196,24 +189,21 @@ public class UDPServer
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireWriteLockAsync(timeleft))
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
!dataQueue.IsEmpty)
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.Count > 0)
data = new List<UDPData>();
while (dataQueue.TryDequeue(out var item))
{
data = dataQueue.ToList();
dataQueue.Clear();
break;
data.Add(item);
}
break;
}
}
@ -241,26 +231,20 @@ public class UDPServer
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireReadLockAsync(timeleft))
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
!dataQueue.IsEmpty)
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.Count > 0)
{
data = dataQueue.ToList();
data = dataQueue.ToArray().ToList();
// logger.Debug($"Find UDP Data Array: {JsonConvert.SerializeObject(data)}");
break;
}
}
}
if (data is null)
{
@ -286,24 +270,18 @@ public class UDPServer
var startTime = DateTime.Now;
var isTimeout = false;
var timeleft = TimeSpan.FromMilliseconds(timeout);
while (!isTimeout)
{
var elapsed = DateTime.Now - startTime;
isTimeout = elapsed >= TimeSpan.FromMilliseconds(timeout);
if (isTimeout) break;
timeleft = TimeSpan.FromMilliseconds(timeout) - elapsed;
using (await udpData.AcquireReadLockAsync(timeleft))
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue))
if (udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue))
{
count = dataQueue.Count;
break;
}
}
}
if (count is null)
{
@ -367,17 +345,21 @@ public class UDPServer
return retPack.Value;
}
static int ReceiveHandleCcount = 0;
private void ReceiveHandler(IAsyncResult res)
{
var remoteEP = new IPEndPoint(IPAddress.Any, listenPort);
byte[] bytes = listener.EndReceive(res, ref remoteEP);
// 提前开始接收下一个包
// 继续异步接收
if (isRunning)
{
listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
logger.Debug($"Test ReceiveHandler Count = {ReceiveHandleCcount}");
ReceiveHandleCcount++;
}
else
{
runningTaskNum--;
}
// Handle RemoteEP
if (remoteEP is null)
@ -426,23 +408,11 @@ public class UDPServer
HasRead = false,
};
using (udpData.AcquireWriteLock())
{
// Record UDP Receive Data
if (udpData.ContainsKey($"{remoteAddress}-{taskID}") &&
udpData.TryGetValue($"{remoteAddress}-{taskID}", out var dataQueue))
{
var key = $"{remoteAddress}-{taskID}";
var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue<UDPData>());
dataQueue.Enqueue(data);
logger.Debug($"Test Lock dataQueue.Count = {dataQueue.Count}");
}
else
{
var queue = new Queue<UDPData>();
queue.Enqueue(data);
udpData.Add($"{remoteAddress}-{taskID}", queue);
logger.Trace("Receive data from new client");
}
}
logger.Debug($"Test dataQueue.Count = {dataQueue.Count}");
return data;
}
@ -497,16 +467,13 @@ public class UDPServer
/// <returns> void </returns>
public void PrintAllData()
{
using (udpData.AcquireReadLock())
{
// logger.Debug("Ready Data:");
logger.Debug("Ready Data:");
foreach (var ip in udpData)
foreach (var kvp in udpData)
{
foreach (var data in ip.Value)
foreach (var data in kvp.Value)
{
// logger.Debug(data.ToString());
}
logger.Debug(data.ToString());
}
}
}
@ -519,17 +486,13 @@ public class UDPServer
/// <returns>无</returns>
public async Task ClearUDPData(string ipAddr, int taskID)
{
using (await udpData.AcquireWriteLockAsync())
var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var dataQueue))
{
if (udpData.ContainsKey($"{ipAddr}-{taskID}") &&
udpData.TryGetValue($"{ipAddr}-{taskID}", out var dataQueue) &&
dataQueue.Count > 0)
{
dataQueue.Clear();
// 清空队列的最有效方式是替换为新的队列
udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(), dataQueue);
}
}
}
/// <summary>
/// Start UDP Server
@ -537,17 +500,44 @@ public class UDPServer
/// <returns>None</returns>
public void Start()
{
this.isRunning = true;
try
{
this.listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
Task.Run(() =>
{
while (isRunning)
{
if (runningTaskNum < parallelTaskNum)
{
StartReceive();
logger.Debug($"Begin Receive Task, Now Running Num: {runningTaskNum + 1}");
runningTaskNum++;
}
Task.Delay(100).Wait();
}
});
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
this.isRunning = false;
}
finally
}
private void StartReceive(int times = 5)
{
this.isRunning = true;
for (int i = 0; i < times; i++)
{
try
{
this.listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
break; // BeginReceive is async, break after scheduling
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}