using System.Net;
using System.Net.Sockets;
using System.Text;
using DotNext;
using WebProtocol;
using server.Services;

/// <summary>
/// Static helpers that send UDP datagrams to a device and, for the
/// register-style read/write operations, wait for the matching reply
/// through <c>MsgBus.UDPServer</c>.
/// </summary>
/// <remarks>
/// Every send opens a fresh UDP socket and disposes it when the call ends.
/// The *Async send variants simply run the synchronous method on the thread pool.
/// </remarks>
public class UDPClientPool
{
    private static readonly NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
    private static readonly IPAddress localhost = IPAddress.Parse("127.0.0.1");

    /// <summary>Maximum number of 32-bit words requested or written per package (128 words = 512 bytes).</summary>
    private const int MaxWordsPerPackage = 0x80;

    /// <summary>Number of bytes in one 32-bit word.</summary>
    private const int BytesPerWord = 32 / 8;

    /// <summary>
    /// Sends one datagram through a short-lived UDP socket.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="payload">Bytes to send as a single datagram.</param>
    /// <returns>True when the number of bytes sent equals the payload length.</returns>
    private static bool SendDatagram(IPEndPoint endPoint, byte[] payload)
    {
        // The using declaration releases the socket even when SendTo throws.
        using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        return socket.SendTo(payload, endPoint) == payload.Length;
    }

    /// <summary>
    /// Sends each string as its own ASCII-encoded datagram.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="stringArray">Strings to send, one datagram per element.</param>
    /// <returns>True when every datagram was sent completely.</returns>
    public static bool SendString(IPEndPoint endPoint, string[] stringArray)
    {
        var isSuccessful = true;
        using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

        foreach (var str in stringArray)
        {
            byte[] sendbuf = Encoding.ASCII.GetBytes(str);
            var sendLen = socket.SendTo(sendbuf, endPoint);

            // Compare with the encoded byte count rather than the character count.
            if (sendLen != sendbuf.Length)
                isSuccessful = false;
        }

        return isSuccessful;
    }

    /// <summary>
    /// Runs <see cref="SendString"/> on the thread pool.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="stringArray">Strings to send, one datagram per element.</param>
    /// <returns>True when every datagram was sent completely.</returns>
    public static async ValueTask<bool> SendStringAsync(IPEndPoint endPoint, string[] stringArray)
    {
        return await Task.Run(() => SendString(endPoint, stringArray));
    }

    /// <summary>
    /// Sends a raw byte buffer as a single datagram.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="buf">Bytes to send.</param>
    /// <returns>True when the whole buffer was sent.</returns>
    public static bool SendBytes(IPEndPoint endPoint, byte[] buf)
    {
        return SendDatagram(endPoint, buf);
    }

    /// <summary>
    /// Runs <see cref="SendBytes"/> on the thread pool.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="buf">Bytes to send.</param>
    /// <returns>True when the whole buffer was sent.</returns>
    public static async ValueTask<bool> SendBytesAsync(IPEndPoint endPoint, byte[] buf)
    {
        return await Task.Run(() => SendBytes(endPoint, buf));
    }

    /// <summary>
    /// Serializes an address package with <c>ToBytes()</c> and sends it as a single datagram.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkg">Address package to send.</param>
    /// <returns>True when all serialized bytes were sent.</returns>
    public static bool SendAddrPack(IPEndPoint endPoint, WebProtocol.SendAddrPackage pkg)
    {
        return SendDatagram(endPoint, pkg.ToBytes());
    }

    /// <summary>
    /// Runs <see cref="SendAddrPack"/> on the thread pool.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkg">Address package to send.</param>
    /// <returns>True when all serialized bytes were sent.</returns>
    public static async ValueTask<bool> SendAddrPackAsync(IPEndPoint endPoint, WebProtocol.SendAddrPackage pkg)
    {
        return await Task.Run(() => SendAddrPack(endPoint, pkg));
    }

    /// <summary>
    /// Concatenates several address packages into one buffer and sends them as a single datagram.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkgs">Address packages; only the first 512 / 8 = 64 are sent, the rest are ignored.</param>
    /// <returns>True when the combined buffer was sent completely; false when <paramref name="pkgs"/> is empty.</returns>
    public static bool SendMultiAddrPack(IPEndPoint endPoint, IEnumerable<SendAddrPackage> pkgs)
    {
        const int maxPkgs = 512 / 8;

        // Serialize every package exactly once.
        var chunks = pkgs.Take(maxPkgs).Select(pkg => pkg.ToBytes()).ToList();
        if (chunks.Count == 0)
            return false;

        // Merge all serialized packages into one contiguous buffer.
        var buffer = new byte[chunks.Sum(chunk => chunk.Length)];
        var offset = 0;
        foreach (var chunk in chunks)
        {
            Buffer.BlockCopy(chunk, 0, buffer, offset, chunk.Length);
            offset += chunk.Length;
        }

        return SendDatagram(endPoint, buffer);
    }

    /// <summary>
    /// Runs <see cref="SendMultiAddrPack"/> on the thread pool.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkgs">Address packages; only the first 512 / 8 = 64 are sent.</param>
    /// <returns>True when the combined buffer was sent completely.</returns>
    public static async ValueTask<bool> SendMultiAddrPackAsync(IPEndPoint endPoint, IEnumerable<SendAddrPackage> pkgs)
    {
        return await Task.Run(() => SendMultiAddrPack(endPoint, pkgs));
    }

    /// <summary>
    /// Serializes a data package with <c>ToBytes()</c> and sends it as a single datagram.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkg">Data package to send.</param>
    /// <returns>True when all serialized bytes were sent.</returns>
    public static bool SendDataPack(IPEndPoint endPoint, WebProtocol.SendDataPackage pkg)
    {
        return SendDatagram(endPoint, pkg.ToBytes());
    }

    /// <summary>
    /// Runs <see cref="SendDataPack"/> on the thread pool.
    /// </summary>
    /// <param name="endPoint">Destination IP end point (address and port).</param>
    /// <param name="pkg">Data package to send.</param>
    /// <returns>True when all serialized bytes were sent.</returns>
    public static async ValueTask<bool> SendDataPackAsync(IPEndPoint endPoint, WebProtocol.SendDataPackage pkg)
    {
        return await Task.Run(() => SendDataPack(endPoint, pkg));
    }

    /// <summary>
    /// Sends strings to 127.0.0.1 on the given port.
    /// </summary>
    /// <param name="port">Destination port on the local host.</param>
    /// <param name="stringArray">Strings to send, one datagram per element.</param>
    /// <returns>True when every datagram was sent completely.</returns>
    public static bool SendStringLocalHost(int port, string[] stringArray)
    {
        return SendString(new IPEndPoint(localhost, port), stringArray);
    }

    /// <summary>
    /// Repeatedly sends strings to 127.0.0.1, sleeping after each round.
    /// </summary>
    /// <param name="times">Total number of send rounds; zero or negative sends nothing.</param>
    /// <param name="sleepMilliSeconds">Blocking delay after each round, in milliseconds.</param>
    /// <param name="port">Destination port on the local host.</param>
    /// <param name="stringArray">Strings to send in every round.</param>
    /// <returns>False as soon as one round fails; otherwise true.</returns>
    public static bool CycleSendStringLocalHost(int times, int sleepMilliSeconds, int port, string[] stringArray)
    {
        var isSuccessful = true;

        // "> 0" so that exactly `times` rounds are sent (">= 0" sent one extra round).
        while (times-- > 0)
        {
            isSuccessful = SendStringLocalHost(port, stringArray);
            if (!isSuccessful)
                break;

            Thread.Sleep(sleepMilliSeconds);
        }

        return isSuccessful;
    }

    /// <summary>
    /// Sends a read request for a device address and waits for the data reply.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID; converted with <c>Convert.ToByte</c> into the package CommandID.</param>
    /// <param name="devAddr">Device address to read.</param>
    /// <param name="dataLength">Requested length, 1 to 255; the package BurstLength is set to <c>dataLength - 1</c>.</param>
    /// <param name="timeout">Timeout in milliseconds passed to the reply wait.</param>
    /// <returns>The received data package, or an error when validation, sending or receiving fails.</returns>
    public static async ValueTask<Result<RecvDataPackage>> ReadAddr(
        IPEndPoint endPoint, int taskID, uint devAddr, int dataLength, int timeout = 1000)
    {
        if (dataLength <= 0)
            return new(new ArgumentException("Data length must be greater than 0"));
        if (dataLength > 255)
            return new(new ArgumentException("Data length must be less than or equal to 255"));

        var opts = new SendAddrPackOptions()
        {
            BurstType = BurstType.FixedBurst,
            BurstLength = (byte)(dataLength - 1),
            CommandID = Convert.ToByte(taskID),
            Address = devAddr,
            IsWrite = false,
        };

        // Send the read request
        var sent = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
        if (!sent)
            return new(new Exception("Send Address Package Failed!"));

        // Wait for the read reply
        if (!MsgBus.IsRunning)
            return new(new Exception("Message Bus not Working!"));

        var retPack = await MsgBus.UDPServer.WaitForDataAsync(
            endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
        if (!retPack.IsSuccessful)
            return new(retPack.Error);
        if (!retPack.Value.IsSuccessful)
            return new(new Exception("Send address package failed"));

        var retPackOpts = retPack.Value.Options;
        if (retPackOpts.Data is null)
            return new(new Exception($"Data is Null, package: {retPackOpts.ToString()}"));

        return retPack;
    }

    /// <summary>
    /// Reads a single 32-bit word from a device address (BurstLength 0).
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID.</param>
    /// <param name="devAddr">Device address to read.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <returns>The received data package, or an error.</returns>
    public static async ValueTask<Result<RecvDataPackage>> ReadAddrByte(
        IPEndPoint endPoint, int taskID, uint devAddr, int timeout = 1000)
    {
        // A length of 1 yields BurstLength 0. Passing 0 here was rejected by the
        // length validation in ReadAddr, so every call used to fail.
        return await ReadAddr(endPoint, taskID, devAddr, 1, timeout);
    }

    /// <summary>
    /// Reads one word from a device address and checks it against an expected value under a mask.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID.</param>
    /// <param name="devAddr">Device address to read.</param>
    /// <param name="result">Expected value.</param>
    /// <param name="resultMask">Mask handed to <c>Common.Number.BitsCheck</c> together with the expected value.</param>
    /// <param name="timeout">Timeout in milliseconds.</param>
    /// <returns>The outcome of the bit check, or an error when reading or decoding fails.</returns>
    public static async ValueTask<Result<bool>> ReadAddr(
        IPEndPoint endPoint, int taskID, uint devAddr, UInt32 result, UInt32 resultMask, int timeout = 1000)
    {
        var address = endPoint.Address.ToString();

        var ret = await ReadAddrByte(endPoint, taskID, devAddr, timeout);
        if (!ret.IsSuccessful)
            return new(ret.Error);
        if (!ret.Value.IsSuccessful)
            return new(new Exception($"Read device {address} address {devAddr} failed"));

        var retData = ret.Value.Options.Data;
        if (retData is null)
            return new(new Exception($"Device {address} receive none"));
        if (retData.Length != 4)
            return new(new Exception(
                $"Device {address} receive data is {retData.Length} bytes instead of 4 bytes"));

        // Check result; conversion failures are reported as an error result.
        try
        {
            var retCode = Convert.ToUInt32(Common.Number.BytesToUInt64(retData).Value);
            return Common.Number.BitsCheck(retCode, result, resultMask);
        }
        catch (Exception error)
        {
            return new(error);
        }
    }

    /// <summary>
    /// Polls a device address until the masked value matches the expected value or the timeout expires.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID.</param>
    /// <param name="devAddr">Device address to read.</param>
    /// <param name="result">Expected value.</param>
    /// <param name="resultMask">Mask handed to <c>Common.Number.BitsCheck</c> together with the expected value.</param>
    /// <param name="waittime">Delay before every read attempt, in milliseconds.</param>
    /// <param name="timeout">Overall timeout in milliseconds.</param>
    /// <returns>
    /// True when a read matched, false when the timeout expired without a match,
    /// or an error as soon as a single read fails.
    /// </returns>
    public static async ValueTask<Result<bool>> ReadAddrWithWait(
        IPEndPoint endPoint, int taskID, uint devAddr, UInt32 result, UInt32 resultMask, int waittime = 100, int timeout = 1000)
    {
        var address = endPoint.Address.ToString();
        var limit = TimeSpan.FromMilliseconds(timeout);

        // UTC avoids jumps caused by local time-zone or daylight-saving changes.
        var startTime = DateTime.UtcNow;

        while (true)
        {
            var elapsed = DateTime.UtcNow - startTime;
            if (elapsed >= limit)
                break;
            var timeleft = limit - elapsed;

            await Task.Delay(waittime);

            try
            {
                var ret = await ReadAddrByte(endPoint, taskID, devAddr, Convert.ToInt32(timeleft.TotalMilliseconds));
                if (!ret.IsSuccessful)
                    return new(ret.Error);
                if (!ret.Value.IsSuccessful)
                    return new(new Exception($"Read device {address} address {devAddr} failed"));

                var retData = ret.Value.Options.Data;
                if (retData is null)
                    return new(new Exception($"Device {address} receive none"));
                if (retData.Length != 4)
                    return new(new Exception(
                        $"Device {address} receive data is {retData.Length} bytes instead of 4 bytes"));

                // Check result
                var retCode = Convert.ToUInt32(Common.Number.BytesToUInt32(retData).Value);
                if (Common.Number.BitsCheck(retCode, result, resultMask))
                    return true;
            }
            catch (Exception error)
            {
                return new(error);
            }
        }

        return false;
    }

    /// <summary>
    /// Reads a block of 32-bit words from a device, one request/reply round trip per segment
    /// of at most 128 words.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID; converted with <c>Convert.ToByte</c> into the package CommandID.</param>
    /// <param name="devAddr">Start address; each segment's address advances by 128 per segment index.</param>
    /// <param name="dataLength">Number of 32-bit words to read; must not be negative.</param>
    /// <param name="timeout">Timeout in milliseconds for each segment's reply.</param>
    /// <returns>The concatenated payload (<c>dataLength * 4</c> bytes), or an error.</returns>
    public static async ValueTask<Result<byte[]>> ReadAddr4Bytes(
        IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, int timeout = 1000)
    {
        // A negative length would otherwise produce a bogus segment with a wrapped burst length.
        if (dataLength < 0)
            return new(new ArgumentException("Data length must not be negative"));

        // Check Msg Bus
        if (!MsgBus.IsRunning)
            return new(new Exception("Message bus not working!"));

        var resultData = new List<byte>();

        // Calculate read times and segments
        var rest4Bytes = dataLength % MaxWordsPerPackage;
        var readTimes = dataLength / MaxWordsPerPackage + (rest4Bytes != 0 ? 1 : 0);

        for (var i = 0; i < readTimes; i++)
        {
            // Only the last segment may be shorter than a full package.
            var isLastSegment = i == readTimes - 1;
            var currentSegmentSize = (isLastSegment && rest4Bytes != 0) ? rest4Bytes : MaxWordsPerPackage;

            var opts = new SendAddrPackOptions()
            {
                // Burst length is expressed in 32-bit words, minus one.
                BurstLength = (byte)(currentSegmentSize - 1),
                Address = devAddr + (uint)(i * MaxWordsPerPackage),
                BurstType = BurstType.FixedBurst,
                CommandID = Convert.ToByte(taskID),
                IsWrite = false,
            };

            // Send read address package
            var sent = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
            if (!sent)
                return new(new Exception($"Send address package failed at segment {i}!"));

            // Wait for data response
            var retPack = await MsgBus.UDPServer.WaitForDataAsync(
                endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
            if (!retPack.IsSuccessful)
                return new(retPack.Error);
            if (!retPack.Value.IsSuccessful)
                return new(new Exception($"Read address package failed at segment {i}"));

            var retPackOpts = retPack.Value.Options;
            if (retPackOpts.Data is null)
                return new(new Exception($"Data is null at segment {i}, package: {retPackOpts.ToString()}"));

            // Validate received data length
            if (retPackOpts.Data.Length != currentSegmentSize * 4)
                return new(new Exception($"Expected {currentSegmentSize * 4} bytes but received {retPackOpts.Data.Length} bytes at segment {i}"));

            // Add received data to result
            resultData.AddRange(retPackOpts.Data);
        }

        // Validate total data length
        if (resultData.Count != dataLength * 4)
            return new(new Exception($"Expected total {dataLength * 4} bytes but received {resultData.Count} bytes"));

        return resultData.ToArray();
    }

    /// <summary>
    /// Reads a block of 32-bit words by first sending all address packages in batches and
    /// then collecting all data packets.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID; converted with <c>Convert.ToByte</c> into the package CommandID.</param>
    /// <param name="devAddr">
    /// Start address. With <c>BurstType.ExtendBurst</c> each segment's address advances by 128
    /// per segment index; with any other burst type every segment uses <paramref name="devAddr"/>.
    /// </param>
    /// <param name="dataLength">Number of 32-bit words to read; must not be negative.</param>
    /// <param name="burstType">Burst type placed in every address package.</param>
    /// <param name="timeout">Timeout in milliseconds, applied separately to the send phase and the receive phase.</param>
    /// <returns>The concatenated payload (<c>dataLength * 4</c> bytes), or an error.</returns>
    public static async ValueTask<Result<byte[]>> ReadAddr4BytesAsync(
        IPEndPoint endPoint, int taskID, UInt32 devAddr, int dataLength, BurstType burstType, int timeout = 1000)
    {
        // A negative length would otherwise produce a bogus segment with a wrapped burst length.
        if (dataLength < 0)
            return new(new ArgumentException("Data length must not be negative"));

        // Check Msg Bus
        if (!MsgBus.IsRunning)
            return new(new Exception("Message bus not working!"));

        var address = endPoint.Address.ToString();
        var limit = TimeSpan.FromMilliseconds(timeout);
        var pkgList = new List<SendAddrPackage>();
        var resultData = new List<byte>();

        // Prepare one address package per segment
        var rest4Bytes = dataLength % MaxWordsPerPackage;
        var readTimes = dataLength / MaxWordsPerPackage + (rest4Bytes != 0 ? 1 : 0);
        for (var i = 0; i < readTimes; i++)
        {
            var isLastSegment = i == readTimes - 1;
            var currentSegmentSize = (isLastSegment && rest4Bytes != 0) ? rest4Bytes : MaxWordsPerPackage;

            var opts = new SendAddrPackOptions
            {
                BurstType = burstType,
                CommandID = Convert.ToByte(taskID),
                IsWrite = false,
                BurstLength = (byte)(currentSegmentSize - 1),
                Address = (burstType == BurstType.ExtendBurst)
                    ? devAddr + (uint)(i * MaxWordsPerPackage)
                    : devAddr,
            };
            pkgList.Add(new SendAddrPackage(opts));
        }

        // Send address packages in batches of 32 while keeping the number of
        // requests without a counted reply below the window of 128.
        const int batchSize = 32;
        const int maxOutstanding = 128;
        var sentCount = 0;
        var startTime = DateTime.UtcNow;
        while (sentCount < pkgList.Count)
        {
            var elapsed = DateTime.UtcNow - startTime;
            if (elapsed >= limit)
                break;
            var timeleft = timeout - (int)elapsed.TotalMilliseconds;

            var found = await MsgBus.UDPServer.GetDataCountAsync(address, taskID, timeleft);
            var outstanding = sentCount - (found.HasValue ? found.Value : 0);

            // No room for another batch yet: poll again until replies are counted.
            if (outstanding >= maxOutstanding - batchSize)
                continue;

            // Send the next batch (at most batchSize packages)
            var batchSend = Math.Min(batchSize, pkgList.Count - sentCount);
            var batchPkgs = pkgList.GetRange(sentCount, batchSend);
            var ret = await UDPClientPool.SendMultiAddrPackAsync(endPoint, batchPkgs);
            if (!ret)
                return new(new Exception($"Send address package batch failed at segment {sentCount}!"));

            sentCount += batchSend;
        }

        // Wait until enough data is received or timeout
        startTime = DateTime.UtcNow;
        var udpDatas = new List<UDPData>();
        while (true)
        {
            var elapsed = DateTime.UtcNow - startTime;
            if (elapsed >= limit)
                break;
            var timeleft = timeout - (int)elapsed.TotalMilliseconds;

            var found = await MsgBus.UDPServer.GetDataCountAsync(address, taskID, timeleft);
            if (found.HasValue && found.Value >= readTimes)
            {
                var dataArr = await MsgBus.UDPServer.FindDataArrayAsync(address, taskID, timeleft);
                if (dataArr.HasValue)
                {
                    udpDatas.AddRange(dataArr.Value);
                    break;
                }
            }
        }

        // Exactly one packet per request is required. Surplus packets used to run
        // past the end of pkgList below and throw instead of returning an error.
        if (udpDatas.Count != readTimes)
            return new(new Exception($"Expected {readTimes} UDP data packets but received {udpDatas.Count}"));

        // Collect and validate all received data
        for (var i = 0; i < udpDatas.Count; i++)
        {
            var bytes = udpDatas[i].Data;
            var expectedLen = (pkgList[i].Options.BurstLength + 1) * 4;

            // The first 8 bytes of each packet are skipped (presumably a header - confirm against the protocol).
            if ((bytes.Length - 8) != expectedLen)
                return new(new Exception($"Expected {expectedLen} bytes but received {bytes.Length - 8} bytes at segment {i}"));

            resultData.AddRange(bytes[8..]);
        }

        // Validate total data length
        if (resultData.Count != dataLength * 4)
            return new(new Exception($"Expected total {dataLength * 4} bytes but received {resultData.Count} bytes"));

        return resultData.ToArray();
    }

    /// <summary>
    /// Reads several addresses one after another and concatenates the returned data.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID.</param>
    /// <param name="addr">Addresses to read, in order.</param>
    /// <param name="timeout">Timeout in milliseconds for each read.</param>
    /// <returns>The concatenated data of all reads, or the first error encountered.</returns>
    public static async ValueTask<Result<byte[]>> ReadAddrSeq(IPEndPoint endPoint, int taskID, UInt32[] addr, int timeout = 1000)
    {
        var length = addr.Length;
        var resultData = new List<byte>();

        for (int i = 0; i < length; i++)
        {
            var ret = await ReadAddrByte(endPoint, taskID, addr[i], timeout);
            if (!ret.IsSuccessful)
            {
                logger.Error($"ReadAddrSeq failed at index {i}: {ret.Error}");
                return new(ret.Error);
            }
            if (!ret.Value.IsSuccessful)
            {
                logger.Error($"ReadAddrSeq failed at index {i}: Read not successful");
                return new(new Exception($"ReadAddrSeq failed at index {i}"));
            }

            var data = ret.Value.Options.Data;
            if (data is null)
            {
                logger.Error($"ReadAddrSeq got null data at index {i}");
                return new(new Exception($"ReadAddrSeq got null data at index {i}"));
            }

            resultData.AddRange(data);
        }

        return resultData.ToArray();
    }

    /// <summary>
    /// Writes one 32-bit value to a device address and waits for the write acknowledgement.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID; converted with <c>Convert.ToByte</c> into the package CommandID.</param>
    /// <param name="devAddr">Device address to write.</param>
    /// <param name="data">32-bit value to write.</param>
    /// <param name="timeout">Timeout in milliseconds for the acknowledgement.</param>
    /// <param name="progress">Optional progress reporter; receives 20, 40 and 60 and is finished on acknowledgement.</param>
    /// <returns>The acknowledgement's success flag, or an error when sending or waiting fails.</returns>
    public static async ValueTask<Result<bool>> WriteAddr(
        IPEndPoint endPoint, int taskID, UInt32 devAddr,
        UInt32 data, int timeout = 1000, ProgressReporter? progress = null)
    {
        var opts = new SendAddrPackOptions()
        {
            BurstType = BurstType.FixedBurst,
            BurstLength = 0,
            CommandID = Convert.ToByte(taskID),
            Address = devAddr,
            IsWrite = true,
        };
        progress?.Report(20);

        // Send the write address package
        var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
        if (!ret)
            return new(new Exception("Send 1st address package failed!"));
        progress?.Report(40);

        // Send Data Package
        ret = await UDPClientPool.SendDataPackAsync(endPoint,
            new SendDataPackage(Common.Number.NumberToBytes(data, 4).Value));
        if (!ret)
            return new(new Exception("Send data package failed!"));
        progress?.Report(60);

        // Check Msg Bus
        if (!MsgBus.IsRunning)
            return new(new Exception("Message bus not working!"));

        // Wait for Write Ack
        var udpWriteAck = await MsgBus.UDPServer.WaitForAckAsync(
            endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
        if (!udpWriteAck.IsSuccessful)
            return new(udpWriteAck.Error);

        progress?.Finish();
        return udpWriteAck.Value.IsSuccessful;
    }

    /// <summary>
    /// Writes a byte array to a device address in segments of at most 512 bytes,
    /// waiting for an acknowledgement after every segment.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID; converted with <c>Convert.ToByte</c> into the package CommandID.</param>
    /// <param name="devAddr">Device address; the same address is used for every segment.</param>
    /// <param name="dataArray">Bytes to write; an empty array writes nothing and returns true.</param>
    /// <param name="timeout">Timeout in milliseconds for each acknowledgement.</param>
    /// <param name="progress">Optional progress reporter; its ExpectedSteps is set to the segment count.</param>
    /// <returns>
    /// True when every segment was acknowledged as successful, false when an acknowledgement
    /// reports failure, or an error when sending or waiting fails.
    /// </returns>
    public static async ValueTask<Result<bool>> WriteAddr(
        IPEndPoint endPoint, int taskID, UInt32 devAddr,
        byte[] dataArray, int timeout = 1000, ProgressReporter? progress = null)
    {
        var opts = new SendAddrPackOptions()
        {
            BurstType = BurstType.FixedBurst,
            CommandID = Convert.ToByte(taskID),
            Address = devAddr,
            BurstLength = 0,
            IsWrite = true,
        };

        // 128 words * 4 bytes = 512 bytes per write
        const int segmentBytes = MaxWordsPerPackage * BytesPerWord;

        // Check Msg Bus
        if (!MsgBus.IsRunning)
            return new(new Exception("Message bus not working!"));

        var hasRest = dataArray.Length % segmentBytes != 0;
        var writeTimes = dataArray.Length / segmentBytes + (hasRest ? 1 : 0);
        if (progress != null)
            progress.ExpectedSteps = writeTimes;

        for (var i = 0; i < writeTimes; i++)
        {
            // Separate the data array; the last segment takes whatever remains.
            var isLastData = i == writeTimes - 1;
            var start = i * segmentBytes;
            var sendDataArray = isLastData
                ? dataArray[start..]
                : dataArray[start..(start + segmentBytes)];

            // Burst length = number of 32-bit words (rounded up) minus one.
            opts.BurstLength = (byte)((sendDataArray.Length + 3) / 4 - 1);

            var ret = await UDPClientPool.SendAddrPackAsync(endPoint, new SendAddrPackage(opts));
            if (!ret)
                return new(new Exception("Send 1st address package failed!"));

            // Send Data Package
            ret = await UDPClientPool.SendDataPackAsync(endPoint, new SendDataPackage(sendDataArray));
            if (!ret)
                return new(new Exception("Send data package failed!"));

            // Wait for Write Ack
            var udpWriteAck = await MsgBus.UDPServer.WaitForAckAsync(
                endPoint.Address.ToString(), taskID, endPoint.Port, timeout);
            if (!udpWriteAck.IsSuccessful)
                return new(udpWriteAck.Error);
            if (!udpWriteAck.Value.IsSuccessful)
                return false;

            progress?.Increase();
        }

        progress?.Finish();
        return true;
    }

    /// <summary>
    /// Writes several addresses one after another; each data byte is widened to a
    /// 32-bit value and written to the address at the same index.
    /// </summary>
    /// <param name="endPoint">Device IP end point (address and port).</param>
    /// <param name="taskID">Task ID.</param>
    /// <param name="addr">Addresses to write, in order.</param>
    /// <param name="data">One byte per address; must have the same length as <paramref name="addr"/>.</param>
    /// <param name="timeout">Timeout in milliseconds for each write.</param>
    /// <returns>
    /// True when every write succeeded, false when a write was acknowledged as unsuccessful,
    /// or an error when the lengths differ or a write fails.
    /// </returns>
    public static async ValueTask<Result<bool>> WriteAddrSeq(IPEndPoint endPoint, int taskID, UInt32[] addr, byte[] data, int timeout = 1000)
    {
        var length = addr.Length;
        if (length != data.Length)
        {
            var message = $"WriteAddrSeq length mismatch: {length} addresses but {data.Length} data bytes";
            logger.Error(message);
            return new(new ArgumentException(message));
        }

        for (int i = 0; i < length; i++)
        {
            var ret = await WriteAddr(endPoint, taskID, addr[i], (UInt32)data[i], timeout);
            if (!ret.IsSuccessful)
            {
                logger.Error($"WriteAddrSeq failed at index {i}: {ret.Error}");
                return new(ret.Error);
            }
            if (!ret.Value)
            {
                logger.Error($"WriteAddrSeq failed at index {i}: Write not successful");
                return false;
            }
        }

        return true;
    }
}