diff --git a/server/src/MsgBus.cs b/server/src/MsgBus.cs index f087f03..0cb0c00 100644 --- a/server/src/MsgBus.cs +++ b/server/src/MsgBus.cs @@ -3,7 +3,7 @@ /// public static class MsgBus { - private static readonly UDPServer udpServer = new UDPServer(1234); + private static readonly UDPServer udpServer = new UDPServer(1234, 10); /// /// 获取UDP服务器 /// diff --git a/server/src/Peripherals/CameraClient.cs b/server/src/Peripherals/CameraClient.cs index e4a7d5f..d2e4408 100644 --- a/server/src/Peripherals/CameraClient.cs +++ b/server/src/Peripherals/CameraClient.cs @@ -224,7 +224,7 @@ class Camera // 读取失败时清除缓冲区,为下次读取做准备 try { - await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); + MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); } catch (Exception ex) { diff --git a/server/src/Peripherals/DDSClient.cs b/server/src/Peripherals/DDSClient.cs index 1dc4d68..e7d1fd9 100644 --- a/server/src/Peripherals/DDSClient.cs +++ b/server/src/Peripherals/DDSClient.cs @@ -108,7 +108,7 @@ public class DDS if (waveNum < 0 || waveNum > 3) return new(new ArgumentException( $"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum))); - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); logger.Trace("Clear udp data finished"); var ret = await UDPClientPool.WriteAddr( @@ -132,7 +132,7 @@ public class DDS if (waveNum < 0 || waveNum > 3) return new(new ArgumentException( $"Wave number should be 0 ~ 3 instead of {waveNum}", nameof(waveNum))); - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); logger.Trace("Clear udp data finished"); var ret = await UDPClientPool.WriteAddr( @@ -158,7 +158,7 @@ public class DDS if (phase < 0 || phase > 4096) return new(new ArgumentException( $"Phase should be 0 ~ 4096 instead of {phase}", nameof(phase))); - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); logger.Trace("Clear udp data finished"); var ret = await UDPClientPool.WriteAddr( diff --git a/server/src/Peripherals/I2cClient.cs b/server/src/Peripherals/I2cClient.cs index 66ca4d4..025b8bc 100644 --- a/server/src/Peripherals/I2cClient.cs +++ b/server/src/Peripherals/I2cClient.cs @@ -82,7 +82,7 @@ public class I2c /// [TODO:parameter] /// [TODO:parameter] /// [TODO:return] - public I2c(string address, int port, int taskID,int timeout = 2000) + public I2c(string address, int port, int taskID, int timeout = 2000) { if (timeout < 0) throw new ArgumentException("Timeout couldn't be negative", nameof(timeout)); @@ -109,7 +109,7 @@ public class I2c } // 清除UDP服务器接收缓冲区 - await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); + MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); logger.Trace($"Clear up udp server {this.address} receive data"); @@ -207,7 +207,7 @@ public class I2c } // 清除UDP服务器接收缓冲区 - await MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); + MsgBus.UDPServer.ClearUDPData(this.address, this.taskID); logger.Trace($"Clear up udp server {this.address} receive data"); diff --git a/server/src/Peripherals/JtagClient.cs b/server/src/Peripherals/JtagClient.cs index 813455e..69d807a 100644 --- a/server/src/Peripherals/JtagClient.cs +++ b/server/src/Peripherals/JtagClient.cs @@ -627,7 +627,7 @@ public class Jtag public async ValueTask> ReadIDCode() { // Clear Data - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); @@ -665,7 +665,7 @@ public class Jtag public async ValueTask> ReadStatusReg() { // Clear Data - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); @@ -702,7 +702,7 @@ public class Jtag public async ValueTask> DownloadBitstream(byte[] bitstream) { // Clear Data - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); @@ -786,7 +786,7 @@ public class Jtag logger.Debug($"Get boundar scan registers number: {portNum}"); // Clear Data - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); @@ -853,7 +853,7 @@ public class Jtag public async ValueTask> SetSpeed(UInt32 speed) { // Clear Data - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace($"Clear up udp server {this.address,0} receive data"); diff --git a/server/src/Peripherals/MatrixKeyClient.cs b/server/src/Peripherals/MatrixKeyClient.cs index 99a035f..d952f47 100644 --- a/server/src/Peripherals/MatrixKeyClient.cs +++ b/server/src/Peripherals/MatrixKeyClient.cs @@ -44,7 +44,7 @@ public class MatrixKey public async ValueTask> EnableControl() { if (MsgBus.IsRunning) - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); else return new(new Exception("Message Bus not work!")); var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 1, this.timeout); @@ -59,7 +59,7 @@ public class MatrixKey public async ValueTask> DisableControl() { if (MsgBus.IsRunning) - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); else return new(new Exception("Message Bus not work!")); var ret = await UDPClientPool.WriteAddr(this.ep, 1, MatrixKeyAddr.KEY_ENABLE, 0, this.timeout); @@ -75,7 +75,7 @@ public class MatrixKey public async ValueTask> ControlKey(BitArray keyStates) { if (MsgBus.IsRunning) - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); else return new(new Exception("Message Bus not work!")); if (keyStates.Length != 16) return new(new ArgumentException( diff --git a/server/src/Peripherals/PowerClient.cs b/server/src/Peripherals/PowerClient.cs index d7d6aea..6cb349a 100644 --- a/server/src/Peripherals/PowerClient.cs +++ b/server/src/Peripherals/PowerClient.cs @@ -45,7 +45,7 @@ public class Power public async ValueTask> SetPowerOnOff(bool enable) { if (MsgBus.IsRunning) - await MsgBus.UDPServer.ClearUDPData(this.address, 1); + MsgBus.UDPServer.ClearUDPData(this.address, 1); else return new(new Exception("Message Bus not work!")); var ret = await UDPClientPool.WriteAddr(this.ep, 1, PowerAddr.PowerCtrl, Convert.ToUInt32(enable), this.timeout); diff --git a/server/src/Peripherals/RemoteUpdateClient.cs b/server/src/Peripherals/RemoteUpdateClient.cs index ccfd8dd..e17cad9 100644 --- a/server/src/Peripherals/RemoteUpdateClient.cs +++ b/server/src/Peripherals/RemoteUpdateClient.cs @@ -382,7 +382,7 @@ public class RemoteUpdater /// [TODO:return] public async ValueTask> HotResetBitstream(int bitstreamNum) { - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace("Clear udp data finished"); { @@ -412,7 +412,7 @@ public class RemoteUpdater byte[]? bitstream2, byte[]? bitstream3) { - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace("Clear udp data finished"); for (int bitstreamNum = 0; bitstreamNum < 4; bitstreamNum++) @@ -463,7 +463,7 @@ public class RemoteUpdater $"The length of data should be divided by 4096, bug given {bytesData.Length}", nameof(bytesData))); var bitstreamBlockNum = bytesData.Length / (4 * 1024); - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace("Clear udp data finished"); { @@ -539,7 +539,7 @@ public class RemoteUpdater /// [TODO:return] public async ValueTask> GetVersion() { - await MsgBus.UDPServer.ClearUDPData(this.address, 0); + MsgBus.UDPServer.ClearUDPData(this.address, 0); logger.Trace("Clear udp data finished"); { diff --git a/server/src/UdpServer.cs b/server/src/UdpServer.cs index a1921b2..1c89e01 100644 --- a/server/src/UdpServer.cs +++ b/server/src/UdpServer.cs @@ -74,12 +74,8 @@ public class UDPServer private ConcurrentDictionary> udpData = new ConcurrentDictionary>(); - const int parallelTaskNum = 5; - Task[] ReceiveTasks = new Task[parallelTaskNum]; - private int runningTaskNum = 0; - private int listenPort; - private UdpClient listener; + private List listeners = new List(); private IPEndPoint groupEP; private bool isRunning = false; @@ -105,15 +101,19 @@ public class UDPServer /// Construct a udp server with fixed port /// /// Device UDP Port + /// UDP Client Num /// UDPServer class - public UDPServer(int port) + public UDPServer(int port, int num) { // Construction - listenPort = port; + this.listenPort = port; try { - listener = new UdpClient(listenPort); - groupEP = new IPEndPoint(IPAddress.Any, listenPort); + for (int i = 0; i < num; i++) + { + listeners.Add(new UdpClient(this.listenPort + i)); + } + this.groupEP = new IPEndPoint(IPAddress.Any, listenPort); } catch (Exception e) { @@ -346,54 +346,24 @@ public class UDPServer return retPack.Value; } - private void ReceiveHandler(IAsyncResult res) + private void ReceiveHandler(byte[] data, IPEndPoint endPoint) { - var remoteEP = new IPEndPoint(IPAddress.Any, listenPort); - byte[] bytes = listener.EndReceive(res, ref remoteEP); - - // 继续异步接收 - if (isRunning) - { - listener.BeginReceive(new AsyncCallback(ReceiveHandler), null); - } - else - { - runningTaskNum--; - } - // Handle RemoteEP - if (remoteEP is null) + if (endPoint is null) { logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:"); - logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}"); + logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}"); return; } // 异步处理数据包 Task.Run(() => { - var udpData = RecordUDPData(bytes, remoteEP, Convert.ToInt32(bytes[1])); + var udpData = RecordUDPData(data, endPoint, Convert.ToInt32(data[1])); PrintData(udpData); }); } - private bool SendBytes(IPEndPoint endPoint, byte[] buf) - { - var sendLen = listener.Send(buf, endPoint); - - if (sendLen == buf.Length) { return true; } - else { return false; } - } - - private bool SendString(IPEndPoint endPoint, string text) - { - byte[] buf = Encoding.ASCII.GetBytes(text); - var sendLen = listener.Send(buf, endPoint); - - if (sendLen == buf.Length) { return true; } - else { return false; } - } - private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, int taskID) { var remoteAddress = remoteEP.Address.ToString(); @@ -484,7 +454,7 @@ public class UDPServer /// IP地址 /// [TODO:parameter] /// - public async Task ClearUDPData(string ipAddr, int taskID) + public void ClearUDPData(string ipAddr, int taskID) { var key = $"{ipAddr}-{taskID}"; if (udpData.TryGetValue(key, out var dataQueue)) @@ -503,19 +473,24 @@ public class UDPServer this.isRunning = true; try { - Task.Run(() => + foreach (var client in listeners) { - while (isRunning) + Task.Run(async () => { - if (runningTaskNum < parallelTaskNum) + while (this.isRunning) { - StartReceive(); - logger.Debug($"Begin Receive Task, Now Running Num: {runningTaskNum + 1}"); - runningTaskNum++; + try + { + UdpReceiveResult result = await client.ReceiveAsync(); + ReceiveHandler(result.Buffer, result.RemoteEndPoint); + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } } - Task.Delay(100).Wait(); - } - }); + }); + } } catch (Exception e) { @@ -524,30 +499,16 @@ public class UDPServer } } - private void StartReceive(int times = 5) - { - - for (int i = 0; i < times; i++) - { - try - { - this.listener.BeginReceive(new AsyncCallback(ReceiveHandler), null); - break; // BeginReceive is async, break after scheduling - } - catch (Exception ex) - { - Console.WriteLine(ex.ToString()); - } - } - } - /// /// Close UDP Server /// /// None public void Stop() { - this.listener.Close(); + foreach (var item in listeners) + { + item.Close(); + } this.isRunning = false; } }