feat: 实现udp多端口

This commit is contained in:
SikongJueluo 2025-07-14 14:01:08 +08:00
parent 2894ee24be
commit ca906489c2
No known key found for this signature in database
9 changed files with 53 additions and 92 deletions

View File

@ -3,7 +3,7 @@
/// </summary> /// </summary>
public static class MsgBus public static class MsgBus
{ {
private static readonly UDPServer udpServer = new UDPServer(1234); private static readonly UDPServer udpServer = new UDPServer(1234, 10);
/// <summary> /// <summary>
/// 获取UDP服务器 /// 获取UDP服务器
/// </summary> /// </summary>

View File

@ -224,7 +224,7 @@ class Camera
// 读取失败时清除缓冲区,为下次读取做准备 // 读取失败时清除缓冲区,为下次读取做准备
try try
{ {
await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); MsgBus.UDPServer.ClearUDPData(this.address, this.taskID);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -108,7 +108,7 @@ public class DDS
if (waveNum < 0 || waveNum > 3) return new(new ArgumentException( if (waveNum < 0 || waveNum > 3) return new(new ArgumentException(
$"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum))); $"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum)));
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
var ret = await UDPClientPool.WriteAddr( var ret = await UDPClientPool.WriteAddr(
@ -132,7 +132,7 @@ public class DDS
if (waveNum < 0 || waveNum > 3) return new(new ArgumentException( if (waveNum < 0 || waveNum > 3) return new(new ArgumentException(
$"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum))); $"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum)));
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
var ret = await UDPClientPool.WriteAddr( var ret = await UDPClientPool.WriteAddr(
@ -158,7 +158,7 @@ public class DDS
if (phase < 0 || phase > 4096) return new(new ArgumentException( if (phase < 0 || phase > 4096) return new(new ArgumentException(
$"Phase should be 0 ~ 4096 instead of {phase}", nameof(phase))); $"Phase should be 0 ~ 4096 instead of {phase}", nameof(phase)));
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
var ret = await UDPClientPool.WriteAddr( var ret = await UDPClientPool.WriteAddr(

View File

@ -109,7 +109,7 @@ public class I2c
} }
// 清除UDP服务器接收缓冲区 // 清除UDP服务器接收缓冲区
await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); MsgBus.UDPServer.ClearUDPData(this.address, this.taskID);
logger.Trace($"Clear up udp server {this.address} receive data"); logger.Trace($"Clear up udp server {this.address} receive data");
@ -207,7 +207,7 @@ public class I2c
} }
// 清除UDP服务器接收缓冲区 // 清除UDP服务器接收缓冲区
await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); MsgBus.UDPServer.ClearUDPData(this.address, this.taskID);
logger.Trace($"Clear up udp server {this.address} receive data"); logger.Trace($"Clear up udp server {this.address} receive data");

View File

@ -627,7 +627,7 @@ public class Jtag
public async ValueTask<Result<uint>> ReadIDCode() public async ValueTask<Result<uint>> ReadIDCode()
{ {
// Clear Data // Clear Data
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace($"Clear up udp server {this.address,0} receive data"); logger.Trace($"Clear up udp server {this.address,0} receive data");
@ -665,7 +665,7 @@ public class Jtag
public async ValueTask<Result<uint>> ReadStatusReg() public async ValueTask<Result<uint>> ReadStatusReg()
{ {
// Clear Data // Clear Data
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace($"Clear up udp server {this.address,0} receive data"); logger.Trace($"Clear up udp server {this.address,0} receive data");
@ -702,7 +702,7 @@ public class Jtag
public async ValueTask<Result<bool>> DownloadBitstream(byte[] bitstream) public async ValueTask<Result<bool>> DownloadBitstream(byte[] bitstream)
{ {
// Clear Data // Clear Data
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace($"Clear up udp server {this.address,0} receive data"); logger.Trace($"Clear up udp server {this.address,0} receive data");
@ -786,7 +786,7 @@ public class Jtag
logger.Debug($"Get boundar scan registers number: {portNum}"); logger.Debug($"Get boundar scan registers number: {portNum}");
// Clear Data // Clear Data
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace($"Clear up udp server {this.address,0} receive data"); logger.Trace($"Clear up udp server {this.address,0} receive data");
@ -853,7 +853,7 @@ public class Jtag
public async ValueTask<Result<bool>> SetSpeed(UInt32 speed) public async ValueTask<Result<bool>> SetSpeed(UInt32 speed)
{ {
// Clear Data // Clear Data
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace($"Clear up udp server {this.address,0} receive data"); logger.Trace($"Clear up udp server {this.address,0} receive data");

View File

@ -44,7 +44,7 @@ public class MatrixKey
public async ValueTask<Result<bool>> EnableControl() public async ValueTask<Result<bool>> EnableControl()
{ {
if (MsgBus.IsRunning) if (MsgBus.IsRunning)
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
else return new(new Exception("Message Bus not work!")); else return new(new Exception("Message Bus not work!"));
var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 1, this.timeout); var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 1, this.timeout);
@ -59,7 +59,7 @@ public class MatrixKey
public async ValueTask<Result<bool>> DisableControl() public async ValueTask<Result<bool>> DisableControl()
{ {
if (MsgBus.IsRunning) if (MsgBus.IsRunning)
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
else return new(new Exception("Message Bus not work!")); else return new(new Exception("Message Bus not work!"));
var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 0, this.timeout); var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 0, this.timeout);
@ -75,7 +75,7 @@ public class MatrixKey
public async ValueTask<Result<bool>> ControlKey(BitArray keyStates) public async ValueTask<Result<bool>> ControlKey(BitArray keyStates)
{ {
if (MsgBus.IsRunning) if (MsgBus.IsRunning)
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
else return new(new Exception("Message Bus not work!")); else return new(new Exception("Message Bus not work!"));
if (keyStates.Length != 16) return new(new ArgumentException( if (keyStates.Length != 16) return new(new ArgumentException(

View File

@ -45,7 +45,7 @@ public class Power
public async ValueTask<Result<bool>> SetPowerOnOff(bool enable) public async ValueTask<Result<bool>> SetPowerOnOff(bool enable)
{ {
if (MsgBus.IsRunning) if (MsgBus.IsRunning)
await MsgBus.UDPServer.ClearUDPData(this.address, 1); MsgBus.UDPServer.ClearUDPData(this.address, 1);
else return new(new Exception("Message Bus not work!")); else return new(new Exception("Message Bus not work!"));
var ret = await UDPClientPool.WriteAddr(this.ep, 1, PowerAddr.PowerCtrl, Convert.ToUInt32(enable), this.timeout); var ret = await UDPClientPool.WriteAddr(this.ep, 1, PowerAddr.PowerCtrl, Convert.ToUInt32(enable), this.timeout);

View File

@ -382,7 +382,7 @@ public class RemoteUpdater
/// <returns>[TODO:return]</returns> /// <returns>[TODO:return]</returns>
public async ValueTask<Result<bool>> HotResetBitstream(int bitstreamNum) public async ValueTask<Result<bool>> HotResetBitstream(int bitstreamNum)
{ {
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
{ {
@ -412,7 +412,7 @@ public class RemoteUpdater
byte[]? bitstream2, byte[]? bitstream2,
byte[]? bitstream3) byte[]? bitstream3)
{ {
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
for (int bitstreamNum = 0; bitstreamNum < 4; bitstreamNum++) for (int bitstreamNum = 0; bitstreamNum < 4; bitstreamNum++)
@ -463,7 +463,7 @@ public class RemoteUpdater
$"The length of data should be divided by 4096, bug given {bytesData.Length}", nameof(bytesData))); $"The length of data should be divided by 4096, bug given {bytesData.Length}", nameof(bytesData)));
var bitstreamBlockNum = bytesData.Length / (4 * 1024); var bitstreamBlockNum = bytesData.Length / (4 * 1024);
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
{ {
@ -539,7 +539,7 @@ public class RemoteUpdater
/// <returns>[TODO:return]</returns> /// <returns>[TODO:return]</returns>
public async ValueTask<Result<UInt32>> GetVersion() public async ValueTask<Result<UInt32>> GetVersion()
{ {
await MsgBus.UDPServer.ClearUDPData(this.address, 0); MsgBus.UDPServer.ClearUDPData(this.address, 0);
logger.Trace("Clear udp data finished"); logger.Trace("Clear udp data finished");
{ {

View File

@ -74,12 +74,8 @@ public class UDPServer
private ConcurrentDictionary<string, ConcurrentQueue<UDPData>> udpData = new ConcurrentDictionary<string, ConcurrentQueue<UDPData>>(); private ConcurrentDictionary<string, ConcurrentQueue<UDPData>> udpData = new ConcurrentDictionary<string, ConcurrentQueue<UDPData>>();
const int parallelTaskNum = 5;
Task[] ReceiveTasks = new Task[parallelTaskNum];
private int runningTaskNum = 0;
private int listenPort; private int listenPort;
private UdpClient listener; private List<UdpClient> listeners = new List<UdpClient>();
private IPEndPoint groupEP; private IPEndPoint groupEP;
private bool isRunning = false; private bool isRunning = false;
@ -105,15 +101,19 @@ public class UDPServer
/// Construct a udp server with fixed port /// Construct a udp server with fixed port
/// </summary> /// </summary>
/// <param name="port"> Device UDP Port </param> /// <param name="port"> Device UDP Port </param>
/// <param name="num"> UDP Client Num </param>
/// <returns> UDPServer class </returns> /// <returns> UDPServer class </returns>
public UDPServer(int port) public UDPServer(int port, int num)
{ {
// Construction // Construction
listenPort = port; this.listenPort = port;
try try
{ {
listener = new UdpClient(listenPort); for (int i = 0; i < num; i++)
groupEP = new IPEndPoint(IPAddress.Any, listenPort); {
listeners.Add(new UdpClient(this.listenPort + i));
}
this.groupEP = new IPEndPoint(IPAddress.Any, listenPort);
} }
catch (Exception e) catch (Exception e)
{ {
@ -346,54 +346,24 @@ public class UDPServer
return retPack.Value; return retPack.Value;
} }
private void ReceiveHandler(IAsyncResult res) private void ReceiveHandler(byte[] data, IPEndPoint endPoint)
{ {
var remoteEP = new IPEndPoint(IPAddress.Any, listenPort);
byte[] bytes = listener.EndReceive(res, ref remoteEP);
// 继续异步接收
if (isRunning)
{
listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
}
else
{
runningTaskNum--;
}
// Handle RemoteEP // Handle RemoteEP
if (remoteEP is null) if (endPoint is null)
{ {
logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:"); logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}"); logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
return; return;
} }
// 异步处理数据包 // 异步处理数据包
Task.Run(() => Task.Run(() =>
{ {
var udpData = RecordUDPData(bytes, remoteEP, Convert.ToInt32(bytes[1])); var udpData = RecordUDPData(data, endPoint, Convert.ToInt32(data[1]));
PrintData(udpData); PrintData(udpData);
}); });
} }
private bool SendBytes(IPEndPoint endPoint, byte[] buf)
{
var sendLen = listener.Send(buf, endPoint);
if (sendLen == buf.Length) { return true; }
else { return false; }
}
private bool SendString(IPEndPoint endPoint, string text)
{
byte[] buf = Encoding.ASCII.GetBytes(text);
var sendLen = listener.Send(buf, endPoint);
if (sendLen == buf.Length) { return true; }
else { return false; }
}
private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, int taskID) private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, int taskID)
{ {
var remoteAddress = remoteEP.Address.ToString(); var remoteAddress = remoteEP.Address.ToString();
@ -484,7 +454,7 @@ public class UDPServer
/// <param name="ipAddr">IP地址</param> /// <param name="ipAddr">IP地址</param>
/// <param name="taskID">[TODO:parameter]</param> /// <param name="taskID">[TODO:parameter]</param>
/// <returns>无</returns> /// <returns>无</returns>
public async Task ClearUDPData(string ipAddr, int taskID) public void ClearUDPData(string ipAddr, int taskID)
{ {
var key = $"{ipAddr}-{taskID}"; var key = $"{ipAddr}-{taskID}";
if (udpData.TryGetValue(key, out var dataQueue)) if (udpData.TryGetValue(key, out var dataQueue))
@ -503,20 +473,25 @@ public class UDPServer
this.isRunning = true; this.isRunning = true;
try try
{ {
Task.Run(() => foreach (var client in listeners)
{ {
while (isRunning) Task.Run(async () =>
{ {
if (runningTaskNum < parallelTaskNum) while (this.isRunning)
{ {
StartReceive(); try
logger.Debug($"Begin Receive Task, Now Running Num: {runningTaskNum + 1}"); {
runningTaskNum++; UdpReceiveResult result = await client.ReceiveAsync();
ReceiveHandler(result.Buffer, result.RemoteEndPoint);
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
} }
Task.Delay(100).Wait();
} }
}); });
} }
}
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine(e.ToString()); Console.WriteLine(e.ToString());
@ -524,30 +499,16 @@ public class UDPServer
} }
} }
private void StartReceive(int times = 5)
{
for (int i = 0; i < times; i++)
{
try
{
this.listener.BeginReceive(new AsyncCallback(ReceiveHandler), null);
break; // BeginReceive is async, break after scheduling
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
/// <summary> /// <summary>
/// Close UDP Server /// Close UDP Server
/// </summary> /// </summary>
/// <returns>None</returns> /// <returns>None</returns>
public void Stop() public void Stop()
{ {
this.listener.Close(); foreach (var item in listeners)
{
item.Close();
}
this.isRunning = false; this.isRunning = false;
} }
} }