This repository has been archived on 2025-10-29. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
FPGA_WebLab/server/src/UdpServer.cs

881 lines
26 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Collections.Concurrent;
using System.Net;
using System.Net.NetworkInformation; // 添加这个引用
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using DotNext;
using Newtonsoft.Json;
using WebProtocol;
/// <summary>A datagram received by the UDP server together with its bookkeeping metadata.</summary>
public class UDPData
{
    /// <summary>Time at which the datagram was received.</summary>
    public required DateTime DateTime { get; set; }

    /// <summary>IP address of the sender.</summary>
    public required string Address { get; set; }

    /// <summary>Port number of the sender.</summary>
    public required int Port { get; set; }

    /// <summary>Task ID the datagram belongs to.</summary>
    public required int TaskID { get; set; }

    /// <summary>Raw payload bytes of the datagram.</summary>
    public required byte[] Data { get; set; }

    /// <summary>Whether this datagram has already been read.</summary>
    public required bool HasRead { get; set; }

    /// <summary>
    /// Creates a deep copy of this object; the payload array is duplicated so the
    /// copy does not share a buffer with the original.
    /// </summary>
    /// <returns>A new <see cref="UDPData"/> carrying the same values.</returns>
    public UDPData DeepClone()
    {
        var payloadCopy = (byte[])Data.Clone();
        var addressCopy = new string(Address);

        return new UDPData
        {
            DateTime = DateTime,
            Address = addressCopy,
            Port = Port,
            TaskID = TaskID,
            Data = payloadCopy,
            HasRead = HasRead,
        };
    }

    /// <summary>Serializes this object to a JSON string.</summary>
    /// <returns>The JSON representation produced by <c>JsonConvert.SerializeObject</c>.</returns>
    public override string ToString() => JsonConvert.SerializeObject(this);
}
/// <summary>
/// UDP 服务器
/// </summary>
public class UDPServer : IDisposable
{
/// <summary>NLog logger shared by all instances of this class.</summary>
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();

/// <summary>
/// Received datagrams, keyed by "{ip}-{taskID}"; each list is ordered by receive time.
/// </summary>
private ConcurrentDictionary<string, SortedList<DateTime, UDPData>> udpData = new();

/// <summary>Guards the sorted lists stored in <see cref="udpData"/>.</summary>
private readonly ReaderWriterLockSlim udpDataLock = new();

/// <summary>First UDP port the server listens on.</summary>
private int listenPort;

/// <summary>The UDP sockets owned by this server, one per listening port.</summary>
private List<UdpClient> listeners = new();

/// <summary>Wildcard endpoint created by the constructor.</summary>
private IPEndPoint groupEP;

/// <summary>Set by <see cref="Start"/>, cleared by <see cref="Stop"/>; polled by the receive loops.</summary>
private bool isRunning;

/// <summary>Whether <see cref="Dispose()"/> has already released the resources.</summary>
private bool disposed;

/// <summary>
/// Whether the server is currently running.
/// </summary>
public bool IsRunning => isRunning;
/// <summary> Error codes of the UDP server. </summary>
public enum ErrorCode
{
/// <summary> The operation succeeded. </summary>
Success = 0,
/// <summary> Presumably: nothing was received before the timeout elapsed (TODO confirm; not referenced in the code visible here). </summary>
GetNoneAfterTimeout,
/// <summary> Presumably: a response arrived but was not the expected one, e.g. from the wrong sender (TODO confirm). </summary>
ResponseWrong,
/// <summary> Presumably: the received datagram could not be parsed as a RecvDataPackage (TODO confirm). </summary>
NotRecvDataPackage,
}
/// <summary>
/// Creates a UDP server that binds <paramref name="num"/> sockets to consecutive
/// ports starting at <paramref name="port"/>.
/// </summary>
/// <param name="port">First UDP port to bind; listener <c>i</c> binds to <c>port + i</c>.</param>
/// <param name="num">Number of UDP listeners (and consecutive ports) to open.</param>
/// <exception cref="ArgumentException">
/// Thrown when any of the listeners cannot be created. The original failure is
/// available through <see cref="Exception.InnerException"/>.
/// </exception>
public UDPServer(int port, int num)
{
    this.listenPort = port;
    try
    {
        for (int i = 0; i < num; i++)
        {
            listeners.Add(new UdpClient(this.listenPort + i));
        }
        this.groupEP = new IPEndPoint(IPAddress.Any, listenPort);
    }
    catch (Exception e)
    {
        // Release the sockets that were bound before the failure; otherwise their
        // ports stay occupied until the garbage collector finalizes them.
        foreach (var listener in listeners)
        {
            listener.Dispose();
        }
        listeners.Clear();

        logger.Error($"Failed to set up UDP server on port {port}: {e}");
        throw new ArgumentException(
            $"Failed to set up server with this port: {port}",
            nameof(port),
            e
        );
    }
}
/// <summary>
/// Polls the receive buffer until a datagram from the given sender and task is
/// available, then removes and returns the earliest one.
/// </summary>
/// <param name="ipAddr">IP address of the sender to look for.</param>
/// <param name="taskID">Task ID to look for.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <param name="cycle">
/// Delay between two polls, in milliseconds. When it is zero or negative the
/// method yields to the scheduler between polls instead of sleeping.
/// </param>
/// <param name="callerName">Name of the calling member (filled in by the compiler).</param>
/// <param name="callerLineNum">Source line of the call (filled in by the compiler).</param>
/// <returns>
/// An empty Optional when nothing arrived before the timeout; otherwise a deep
/// copy of the earliest datagram, which is removed from the buffer.
/// </returns>
public async ValueTask<Optional<UDPData>> FindDataAsync(
    string ipAddr, int taskID, int timeout = 1000, int cycle = 0,
    [CallerMemberName] string callerName = "",
    [CallerLineNumber] int callerLineNum = 0
)
{
    var key = $"{ipAddr}-{taskID}";
    var limit = TimeSpan.FromMilliseconds(timeout);
    // A monotonic clock: unlike DateTime.Now it is unaffected by clock adjustments.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    UDPData? data = null;

    while (stopwatch.Elapsed < limit)
    {
        // The lock is released before any await: ReaderWriterLockSlim must be
        // exited on the thread that entered it.
        udpDataLock.EnterWriteLock();
        try
        {
            if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
            {
                // The list is sorted by receive time, so index 0 is the earliest datagram.
                data = sortedList.Values[0];
                sortedList.RemoveAt(0);
                break;
            }
        }
        finally
        {
            udpDataLock.ExitWriteLock();
        }

        if (cycle > 0)
        {
            await Task.Delay(cycle);
        }
        else
        {
            // Task.Delay(0) completes synchronously, which would turn this loop into
            // a blocking spin on the caller's thread; yielding keeps it asynchronous.
            await Task.Yield();
        }
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data.DeepClone());
}
/// <summary>
/// Polls the receive buffer until at least one datagram from the given sender and
/// task is available, then returns all of them and empties that queue.
/// </summary>
/// <param name="ipAddr">IP address of the sender to look for.</param>
/// <param name="taskID">Task ID to look for.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// An empty Optional when nothing arrived before the timeout; otherwise the queued
/// datagrams in receive order (the stored objects themselves, not copies).
/// </returns>
public async ValueTask<Optional<List<UDPData>>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    var limit = TimeSpan.FromMilliseconds(timeout);
    // A monotonic clock: unlike DateTime.Now it is unaffected by clock adjustments.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    List<UDPData>? data = null;

    while (stopwatch.Elapsed < limit)
    {
        // The lock is released before the await below: ReaderWriterLockSlim must be
        // exited on the thread that entered it.
        udpDataLock.EnterWriteLock();
        try
        {
            if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
            {
                data = new List<UDPData>(sortedList.Values);
                sortedList.Clear();
                break;
            }
        }
        finally
        {
            udpDataLock.ExitWriteLock();
        }

        // Without an await this method would busy-wait on the caller's thread and
        // keep re-taking the write lock that the receiver also needs.
        await Task.Yield();
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data);
}
/// <summary>
/// Polls the receive buffer until at least one datagram from the given sender and
/// task is available, then returns all of them without removing them.
/// </summary>
/// <param name="ipAddr">IP address of the sender to look for.</param>
/// <param name="taskID">Task ID to look for.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// An empty Optional when nothing arrived before the timeout; otherwise a new list
/// holding the queued datagrams in receive order. The list is a snapshot, but its
/// elements are the same objects that remain in the buffer.
/// </returns>
public async ValueTask<Optional<List<UDPData>>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    var limit = TimeSpan.FromMilliseconds(timeout);
    // A monotonic clock: unlike DateTime.Now it is unaffected by clock adjustments.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    List<UDPData>? data = null;

    while (stopwatch.Elapsed < limit)
    {
        // The lock is released before the await below: ReaderWriterLockSlim must be
        // exited on the thread that entered it.
        udpDataLock.EnterReadLock();
        try
        {
            if (udpData.TryGetValue(key, out var sortedList) && sortedList.Count > 0)
            {
                data = new List<UDPData>(sortedList.Values);
                break;
            }
        }
        finally
        {
            udpDataLock.ExitReadLock();
        }

        // Without an await this method would busy-wait on the caller's thread.
        await Task.Yield();
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data);
}
/// <summary>
/// Polls the receive buffer until a queue exists for the given sender and task,
/// then returns the number of datagrams currently in it.
/// </summary>
/// <param name="ipAddr">IP address of the sender to look for.</param>
/// <param name="taskID">Task ID to look for.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// An empty Optional when no queue for the key appeared before the timeout;
/// otherwise the queue length, which may be zero for an existing but empty queue.
/// </returns>
public async ValueTask<Optional<int>> GetDataCountAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    var limit = TimeSpan.FromMilliseconds(timeout);
    // A monotonic clock: unlike DateTime.Now it is unaffected by clock adjustments.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    int? count = null;

    while (stopwatch.Elapsed < limit)
    {
        // The lock is released before the await below: ReaderWriterLockSlim must be
        // exited on the thread that entered it.
        udpDataLock.EnterReadLock();
        try
        {
            if (udpData.TryGetValue(key, out var sortedList))
            {
                count = sortedList.Count;
                break;
            }
        }
        finally
        {
            udpDataLock.ExitReadLock();
        }

        // Without an await this method would busy-wait on the caller's thread.
        await Task.Yield();
    }

    if (count is null)
    {
        logger.Trace("Get nothing even after time out");
        return Optional<int>.None;
    }

    return new(count.Value);
}
/// <summary>
/// Waits for the next datagram from the given sender and task and parses it as a
/// write-response (acknowledgement) package.
/// </summary>
/// <param name="address">IP address the response must come from.</param>
/// <param name="taskID">Task ID the response must belong to.</param>
/// <param name="port">Expected source UDP port; the port is not checked unless it is greater than zero.</param>
/// <param name="timeout">Maximum time to wait, in milliseconds.</param>
/// <returns>
/// The parsed response package, or a failed result when nothing arrived in time,
/// the sender did not match, or the payload could not be parsed.
/// </returns>
public async ValueTask<Result<WebProtocol.RecvRespPackage>> WaitForAckAsync
    (string address, int taskID, int port = -1, int timeout = 1000)
{
    var data = await FindDataAsync(address, taskID, timeout);
    if (!data.HasValue)
    {
        return new(new Exception("Get None even after time out!"));
    }

    var recvData = data.Value;
    if (recvData.Address != address || (port > 0 && recvData.Port != port))
    {
        return new(new Exception("Receive Data From Wrong Board!"));
    }

    var retPack = WebProtocol.RecvRespPackage.FromBytes(recvData.Data);
    if (!retPack.IsSuccessful)
    {
        // This method parses a response package, so name that type in the error
        // (the message used to say "RecvDataPackage").
        return new(new Exception("Not RecvRespPackage!", retPack.Error));
    }

    return retPack.Value;
}
/// <summary>
/// Waits for the next datagram from the given sender and task and parses it as a
/// data package.
/// </summary>
/// <param name="address">IP address the data must come from.</param>
/// <param name="taskID">Task ID the data must belong to.</param>
/// <param name="port">Expected source UDP port; a negative value disables the port check.</param>
/// <param name="timeout">Maximum time to wait, in milliseconds.</param>
/// <returns>
/// The parsed data package, or a failed result when nothing arrived in time,
/// the sender did not match, or the payload could not be parsed.
/// </returns>
public async ValueTask<Result<RecvDataPackage>> WaitForDataAsync
    (string address, int taskID, int port = -1, int timeout = 1000)
{
    var found = await FindDataAsync(address, taskID, timeout);
    if (!found.HasValue)
    {
        return new(new Exception("Get None even after time out!"));
    }

    var packet = found.Value;
    bool addressMatches = packet.Address == address;
    bool portMatches = port < 0 || packet.Port == port;
    if (!(addressMatches && portMatches))
    {
        return new(new Exception("Receive Data From Wrong Board!"));
    }

    var parsed = WebProtocol.RecvDataPackage.FromBytes(packet.Data);
    if (parsed.IsSuccessful)
    {
        return parsed.Value;
    }

    return new(new Exception("Not RecvDataPackage!", parsed.Error));
}
/// <summary>
/// Records one received datagram in the buffer on a thread-pool thread.
/// The second payload byte is used as the task ID.
/// </summary>
/// <param name="data">Raw payload of the datagram.</param>
/// <param name="endPoint">Endpoint the datagram was received from.</param>
/// <param name="time">Time at which the datagram was received.</param>
/// <returns>A task that completes once the datagram has been handled; failures are logged, not thrown.</returns>
private async Task ReceiveHandler(byte[] data, IPEndPoint endPoint, DateTime time)
{
    await Task.Run(() =>
    {
        try
        {
            if (endPoint is null)
            {
                logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
                logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
                return;
            }

            // data[1] carries the task ID, so anything shorter than two bytes cannot
            // be routed. Reject it explicitly instead of relying on an
            // IndexOutOfRangeException for arbitrary network input.
            if (data is null || data.Length < 2)
            {
                logger.Warn($"Drop datagram from {endPoint}: too short to carry a task ID ({data?.Length ?? 0} bytes)");
                return;
            }

            RecordUDPData(data, endPoint, time, Convert.ToInt32(data[1]));
        }
        catch (Exception e)
        {
            logger.Error($"Got Error when handle receive:{e}");
        }
    });
}
/// <summary>
/// Wraps a received payload in a <see cref="UDPData"/> and stores it in the buffer
/// under the key "{remote address}-{taskID}", ordered by receive time.
/// </summary>
/// <param name="bytes">Raw payload of the datagram.</param>
/// <param name="remoteEP">Endpoint the datagram was received from.</param>
/// <param name="time">Time at which the datagram was received.</param>
/// <param name="taskID">Task ID the datagram belongs to.</param>
/// <returns>The stored <see cref="UDPData"/> object.</returns>
private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, DateTime time, int taskID)
{
    var remoteAddress = remoteEP.Address.ToString();
    var remotePort = remoteEP.Port;
    var data = new UDPData()
    {
        Address = remoteAddress,
        Port = remotePort,
        TaskID = taskID,
        Data = bytes,
        DateTime = time,
        HasRead = false,
    };
    var key = $"{remoteAddress}-{taskID}";
    var uniqueTime = time;

    udpDataLock.EnterWriteLock();
    try
    {
        var sortedList = udpData.GetOrAdd(key, _ => new SortedList<DateTime, UDPData>());

        // SortedList keys must be unique: nudge the key forward one tick at a time
        // until it no longer collides with an already stored datagram.
        while (sortedList.ContainsKey(uniqueTime))
        {
            uniqueTime = uniqueTime.AddTicks(1);
        }
        sortedList.Add(uniqueTime, data);
    }
    finally
    {
        udpDataLock.ExitWriteLock();
    }

    // Logging and packet decoding happen outside the write lock so that readers
    // polling the buffer are not blocked by them. The duplicate warning is emitted
    // once per datagram rather than once per probed tick.
    if (uniqueTime != time)
    {
        logger.Warn(
            $"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {time}.");
    }
    PrintData(data);

    return data;
}
/// <summary>
/// Writes a datagram to the debug log, raw and decoded, and returns the same
/// information as a multi-line string.
/// </summary>
/// <param name="data">The datagram to describe; its first payload byte selects the decoder.</param>
/// <returns>A textual description of the datagram.</returns>
public string PrintData(UDPData data)
{
    var payload = data.Data;
    var packSign = payload[0];
    var hexDump = BitConverter.ToString(payload).Replace("-", " ");
    var stamp = data.DateTime.ToString();
    string decoded = "";

    if (packSign == (byte)WebProtocol.PackSign.SendAddr)
    {
        var parsed = WebProtocol.SendAddrPackage.FromBytes(payload);
        decoded = parsed.IsSuccessful ? parsed.Value.ToString() : parsed.Error.ToString();
    }
    else if (packSign == (byte)WebProtocol.PackSign.SendData)
    {
        // Send-data packages are not decoded; only the raw bytes are reported.
    }
    else if (packSign == (byte)WebProtocol.PackSign.RecvData)
    {
        var parsed = WebProtocol.RecvDataPackage.FromBytes(payload);
        decoded = parsed.IsSuccessful ? parsed.Value.Options.ToString() : parsed.Error.ToString();
    }
    else if (packSign == (byte)WebProtocol.PackSign.RecvResp)
    {
        var parsed = WebProtocol.RecvRespPackage.FromBytes(payload);
        decoded = parsed.IsSuccessful ? parsed.Value.Options.ToString() : parsed.Error.ToString();
    }
    else
    {
        // Unknown package sign: show the payload as ASCII text.
        decoded = Encoding.ASCII.GetString(payload, 0, payload.Length);
    }

    logger.Debug($"Receive Data from {data.Address}:{data.Port} at {stamp}:");
    logger.Debug($" Original Data : {hexDump}");
    if (decoded.Length != 0)
    {
        logger.Debug($" Decoded Data : {decoded}");
    }

    return $@"
Receive Data from {data.Address}:{data.Port} at {stamp}:
Original Data : {hexDump}
Decoded Data : {decoded}
";
}
/// <summary>
/// Writes every datagram of a sequence to the debug log.
/// </summary>
/// <param name="dataArray">The datagrams to log, in enumeration order.</param>
public void PrintDataArray(IEnumerable<UDPData> dataArray)
{
    using var cursor = dataArray.GetEnumerator();
    while (cursor.MoveNext())
    {
        var description = PrintData(cursor.Current);
        logger.Debug(description);
    }
}
/// <summary>
/// Writes every buffered datagram, for all senders and tasks, to the debug log
/// while holding the read lock on the buffer.
/// </summary>
public void PrintAllData()
{
    logger.Debug("Ready Data:");

    udpDataLock.EnterReadLock();
    try
    {
        foreach (var (_, packets) in udpData)
        {
            PrintDataArray(packets.Values);
        }
    }
    finally
    {
        udpDataLock.ExitReadLock();
    }
}
/// <summary>
/// Discards all buffered datagrams of one sender and task, then triggers an ARP
/// refresh for that sender.
/// </summary>
/// <param name="ipAddr">IP address of the sender whose data is cleared.</param>
/// <param name="taskID">Task ID whose data is cleared.</param>
public void ClearUDPData(string ipAddr, int taskID)
{
    var bufferKey = $"{ipAddr}-{taskID}";

    udpDataLock.EnterWriteLock();
    try
    {
        var found = udpData.TryGetValue(bufferKey, out var pending);
        if (found)
        {
            pending!.Clear();
        }
    }
    finally
    {
        udpDataLock.ExitWriteLock();
    }

    // Force an ARP refresh so that a stale entry cannot disturb later transfers.
    FlushArpEntry(ipAddr);
}
/// <summary>
/// Starts a background ARP cache refresh for an address, on any supported platform.
/// Addresses that do not parse as an IP address are rejected with a warning.
/// </summary>
/// <param name="ipAddr">Target IP address.</param>
private void FlushArpEntry(string ipAddr)
{
    try
    {
        if (IPAddress.TryParse(ipAddr, out _))
        {
            // Run the refresh in the background so the caller is not blocked.
            Task.Run(() => ExecuteArpFlush(ipAddr));
            return;
        }

        logger.Warn($"Invalid IP address format: {ipAddr}");
    }
    catch (Exception ex)
    {
        logger.Error($"Error during ARP cache flush for {ipAddr}: {ex.Message}");
    }
}
/// <summary>
/// Sends a single ping to the address so that its ARP entry is re-learned.
/// Runs fire-and-forget; every failure is caught and logged here.
/// </summary>
/// <param name="ipAddr">Target IP address.</param>
private async void RefreshArpWithPing(string ipAddr)
{
    try
    {
        // 32-byte payload filled with the values 0..31.
        var payload = new byte[32];
        for (int index = 0; index < payload.Length; index++)
        {
            payload[index] = (byte)(index % 256);
        }

        var pingOptions = new PingOptions { DontFragment = true, Ttl = 32 };
        using var pinger = new Ping();

        // Asynchronous ping with a 100 ms timeout.
        var reply = await pinger.SendPingAsync(ipAddr, 100, payload, pingOptions);
        if (reply.Status != IPStatus.Success)
        {
            logger.Warn($"Ping to {ipAddr} failed with status: {reply.Status}, but ARP entry should still be updated");
            return;
        }

        logger.Debug($"ARP cache refreshed successfully for {ipAddr} (RTT: {reply.RoundtripTime}ms)");
    }
    catch (Exception ex)
    {
        logger.Error($"Error refreshing ARP with ping for {ipAddr}: {ex.Message}");
    }
}
/// <summary>
/// Runs the ARP refresh sequence: delete the ARP entry, then ping the address so
/// the entry is re-learned. The ping is sent even when the deletion fails.
/// </summary>
/// <param name="ipAddr">Target IP address.</param>
private void ExecuteArpFlush(string ipAddr)
{
    // This method contains no await, so it is a plain synchronous method rather
    // than "async void"; all failures are handled by the catch below.
    try
    {
        if (DeleteArpEntry(ipAddr))
        {
            logger.Debug($"ARP entry deleted successfully for {ipAddr}");
        }
        else
        {
            logger.Warn($"Failed to delete ARP entry for {ipAddr}, but continuing with ping refresh");
        }

        // Refresh through ping in both cases.
        RefreshArpWithPing(ipAddr);
    }
    catch (Exception ex)
    {
        logger.Error($"Failed to execute ARP flush for {ipAddr}: {ex.Message}");
    }
}
/// <summary>
/// Deletes the ARP entry of an address with the platform's command-line tool:
/// <c>arp -d</c> on Windows; on Linux/macOS <c>ip neigh del</c> when <c>ip</c> is
/// available, otherwise <c>arp -d</c>.
/// </summary>
/// <param name="ipAddr">Target IP address.</param>
/// <returns><c>true</c> when the command ran and exited successfully; otherwise <c>false</c>.</returns>
private bool DeleteArpEntry(string ipAddr)
{
    try
    {
        string tool;
        string toolArguments;

        if (OperatingSystem.IsWindows())
        {
            tool = "arp";
            toolArguments = $"-d {ipAddr}";
        }
        else if (!(OperatingSystem.IsLinux() || OperatingSystem.IsMacOS()))
        {
            logger.Warn("Unsupported operating system for ARP entry deletion");
            return false;
        }
        else if (IsCommandAvailable("ip"))
        {
            // Prefer the ip command when it is installed.
            tool = "ip";
            toolArguments = $"neigh del {ipAddr}";
        }
        else if (IsCommandAvailable("arp"))
        {
            tool = "arp";
            toolArguments = $"-d {ipAddr}";
        }
        else
        {
            logger.Warn("Neither 'ip' nor 'arp' command is available for ARP entry deletion");
            return false;
        }

        return ExecuteCommand(tool, toolArguments, $"delete ARP entry for {ipAddr}");
    }
    catch (Exception ex)
    {
        logger.Error($"Error deleting ARP entry for {ipAddr}: {ex.Message}");
        return false;
    }
}
/// <summary>
/// Checks whether a command can be found on the system by running <c>where</c>
/// (Windows) or <c>which</c> (other platforms) with a one-second timeout.
/// </summary>
/// <param name="command">Name of the command to look up.</param>
/// <returns>
/// <c>true</c> when the lookup exited with code 0; <c>false</c> when it failed,
/// timed out, or could not be started.
/// </returns>
private bool IsCommandAvailable(string command)
{
    try
    {
        // "using" releases the process handle and the redirected pipe.
        using var process = new System.Diagnostics.Process
        {
            StartInfo = new System.Diagnostics.ProcessStartInfo
            {
                FileName = OperatingSystem.IsWindows() ? "where" : "which",
                Arguments = command,
                UseShellExecute = false,
                RedirectStandardOutput = true,
                CreateNoWindow = true
            }
        };
        process.Start();

        if (!process.WaitForExit(1000))
        {
            // The lookup did not finish in time: ExitCode is not available for a
            // running process, so stop it and report the command as unavailable.
            process.Kill();
            return false;
        }

        return process.ExitCode == 0;
    }
    catch
    {
        // Best effort: any failure to run the lookup counts as "not available".
        return false;
    }
}
/// <summary>
/// Runs a system command without a shell and waits up to five seconds for it to
/// finish.
/// </summary>
/// <param name="command">Executable to run.</param>
/// <param name="arguments">Command-line arguments.</param>
/// <param name="operation">Human-readable description of the operation, used in log messages.</param>
/// <returns>
/// <c>true</c> when the command exited with code 0; <c>false</c> when it failed,
/// timed out, or could not be started.
/// </returns>
private bool ExecuteCommand(string command, string arguments, string operation)
{
    try
    {
        // "using" releases the process handle and the redirected pipes.
        using var process = new System.Diagnostics.Process
        {
            StartInfo = new System.Diagnostics.ProcessStartInfo
            {
                FileName = command,
                Arguments = arguments,
                UseShellExecute = false,
                RedirectStandardOutput = true,
                RedirectStandardError = true,
                CreateNoWindow = true
            }
        };
        process.Start();

        // Drain both pipes while the process runs. Reading only after WaitForExit
        // lets a child that fills a pipe buffer block until the timeout kills it.
        var stdoutTask = process.StandardOutput.ReadToEndAsync();
        var stderrTask = process.StandardError.ReadToEndAsync();

        // Bounded wait so a hung command cannot block this thread forever.
        if (!process.WaitForExit(5000))
        {
            process.Kill();
            logger.Warn($"Command timed out: {operation}");
            return false;
        }

        if (process.ExitCode == 0)
        {
            logger.Debug($"Command executed successfully: {operation}");
            return true;
        }

        // The process has exited, so the pipe normally reaches end-of-stream at
        // once; the bounded wait only protects against a descendant keeping it open.
        var error = stderrTask.Wait(1000) ? stderrTask.Result : string.Empty;
        logger.Warn($"Command failed: {operation}. Exit code: {process.ExitCode}, Error: {error}");
        return false;
    }
    catch (Exception ex)
    {
        logger.Error($"Failed to execute command for {operation}: {ex.Message}");
        return false;
    }
}
/// <summary>
/// Starts the UDP server: one background receive loop is launched per listener.
/// Calling it while the server is already running does nothing. The sockets are
/// closed by <see cref="Stop"/>, so a stopped server cannot be started again.
/// </summary>
public void Start()
{
    if (this.isRunning)
    {
        // A second set of loops would compete for the same sockets.
        return;
    }

    this.isRunning = true;
    try
    {
        foreach (var client in listeners)
        {
            Task.Run(() =>
            {
                while (this.isRunning)
                {
                    try
                    {
                        var ep = new IPEndPoint(IPAddress.Any, listenPort);
                        var result = client.Receive(ref ep);
                        _ = ReceiveHandler(result, ep, DateTime.Now);
                    }
                    catch (ObjectDisposedException)
                    {
                        // The socket has been closed and can never receive again;
                        // retrying would spin forever, so end this loop.
                        this.isRunning = false;
                        break;
                    }
                    catch (Exception ex)
                    {
                        if (!this.isRunning)
                        {
                            // Closing the socket interrupts the blocked Receive;
                            // that is the normal shutdown path, not an error.
                            break;
                        }
                        logger.Error($"Error: {ex.Message}");
                    }
                }
            });
        }
    }
    catch (Exception e)
    {
        logger.Error(e.ToString());
        this.isRunning = false;
    }
}
/// <summary>
/// Stops the UDP server and closes all of its sockets.
/// </summary>
public void Stop()
{
    // Clear the flag first: the receive loops check it when the closing socket
    // interrupts their blocked Receive call, and should see a shutdown, not an error.
    this.isRunning = false;

    foreach (var item in listeners)
    {
        item.Close();
    }
}
/// <summary>
/// Releases the resources held by the server.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Releases the resources once; later calls do nothing.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>, in which case the server is
/// stopped and the lock is disposed; <c>false</c> when called from the finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposed)
    {
        return;
    }

    if (disposing)
    {
        Stop();
        udpDataLock?.Dispose();
    }

    disposed = true;
}

/// <summary>
/// Finalizer; forwards to <see cref="Dispose(bool)"/> without touching managed resources.
/// </summary>
~UDPServer() => Dispose(disposing: false);
}