// FPGA_WebLab/server/src/UdpServer.cs
// (listing metadata: 539 lines, 16 KiB, C#)

using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using DotNext;
using Newtonsoft.Json;
using WebProtocol;
/// <summary>
/// One datagram received over UDP, together with where it came from and when.
/// </summary>
public class UDPData
{
    /// <summary>Time at which the datagram was received.</summary>
    public required DateTime DateTime { get; set; }

    /// <summary>IP address of the sender.</summary>
    public required string Address { get; set; }

    /// <summary>Port number of the sender.</summary>
    public required int Port { get; set; }

    /// <summary>Task ID associated with the datagram.</summary>
    public required int TaskID { get; set; }

    /// <summary>Raw bytes of the received datagram.</summary>
    public required byte[] Data { get; set; }

    /// <summary>Whether the datagram has already been read.</summary>
    public required bool HasRead { get; set; }

    /// <summary>
    /// Creates a deep copy of this object: the payload array and the address
    /// string are duplicated, so the copy shares no mutable state with the source.
    /// </summary>
    /// <returns>A new, independent <see cref="UDPData"/> with identical content.</returns>
    public UDPData DeepClone()
    {
        // Array.Clone() on a byte[] yields a fresh array holding the same bytes.
        var payloadCopy = (byte[])this.Data.Clone();
        var addressCopy = new string(this.Address);

        var copy = new UDPData
        {
            DateTime = this.DateTime,
            Address = addressCopy,
            Port = this.Port,
            TaskID = this.TaskID,
            Data = payloadCopy,
            HasRead = this.HasRead,
        };
        return copy;
    }

    /// <summary>
    /// Serializes this object to a JSON string.
    /// </summary>
    /// <returns>The JSON representation produced by <c>JsonConvert.SerializeObject</c>.</returns>
    public override string ToString() => JsonConvert.SerializeObject(this);
}
/// <summary>
/// UDP 服务器
/// </summary>
public class UDPServer
{
// NLog logger shared by all UDPServer instances.
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();

// Received datagrams, one FIFO queue per "{ip}-{taskID}" key.
private ConcurrentDictionary<string, ConcurrentQueue<UDPData>> udpData = new();

// First port the server listens on; further listeners use listenPort + i.
private int listenPort;

// One UdpClient per listened port, created in the constructor.
private List<UdpClient> listeners = new();

// Wildcard endpoint on the base port; assigned in the constructor.
private IPEndPoint groupEP;

// Set by Start(), cleared by Stop(); checked by the receive loops.
private bool isRunning = false;

/// <summary>
/// Whether the server is currently running.
/// </summary>
public bool IsRunning => isRunning;
/// <summary>Error codes of the UDP server.</summary>
/// <remarks>
/// NOTE(review): no code visible in this file reads these values, so the
/// per-member descriptions below are inferred from the member names only.
/// </remarks>
public enum ErrorCode
{
    /// <summary>The operation succeeded.</summary>
    Success = 0,

    /// <summary>Presumably: nothing was received before the timeout expired — TODO confirm.</summary>
    GetNoneAfterTimeout = 1,

    /// <summary>Presumably: a response arrived but was not the expected one — TODO confirm.</summary>
    ResponseWrong = 2,

    /// <summary>Presumably: the received bytes could not be parsed as a receive-data package — TODO confirm.</summary>
    NotRecvDataPackage = 3,
}
/// <summary>
/// Constructs a UDP server that listens on <paramref name="num"/> consecutive
/// ports starting at <paramref name="port"/> (one <see cref="UdpClient"/> per port).
/// </summary>
/// <param name="port">First UDP port to bind.</param>
/// <param name="num">Number of consecutive ports / UDP clients to create.</param>
/// <exception cref="ArgumentException">
/// Thrown when any socket cannot be created. Sockets that were already bound are
/// released first, and the original failure is attached as the inner exception.
/// </exception>
public UDPServer(int port, int num)
{
    this.listenPort = port;
    try
    {
        for (int i = 0; i < num; i++)
        {
            listeners.Add(new UdpClient(this.listenPort + i));
        }
        this.groupEP = new IPEndPoint(IPAddress.Any, listenPort);
    }
    catch (Exception e)
    {
        // A failure part-way through would otherwise leave the earlier ports
        // bound by clients that nobody can reach (the constructor is throwing).
        foreach (var listener in listeners)
        {
            listener.Dispose();
        }
        listeners.Clear();

        Console.WriteLine(e.ToString());
        throw new ArgumentException(
            $"Failed to set up server with this port: {port}",
            nameof(port),
            e
        );
    }
}
/// <summary>
/// Polls the receive queue of the given sender and task until one datagram can
/// be dequeued or the timeout expires.
/// </summary>
/// <param name="ipAddr">IP address of the sender.</param>
/// <param name="taskID">Task ID; together with <paramref name="ipAddr"/> it forms the queue key.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <param name="cycle">
/// Delay between two polls, in milliseconds. For values &lt;= 0 the method
/// yields to the scheduler between polls instead of delaying.
/// </param>
/// <param name="callerName">Name of the calling member (compiler supplied; not used by this method).</param>
/// <param name="callerLineNum">Line number of the call site (compiler supplied; not used by this method).</param>
/// <returns>
/// An empty Optional when nothing was dequeued within the timeout; otherwise a
/// deep copy of the datagram taken from the head of the queue.
/// </returns>
public async ValueTask<Optional<UDPData>> FindDataAsync(
    string ipAddr, int taskID, int timeout = 1000, int cycle = 0,
    [CallerMemberName] string callerName = "",
    [CallerLineNumber] int callerLineNum = 0
)
{
    var key = $"{ipAddr}-{taskID}";
    UDPData? data = null;

    // Monotonic tick counter: unlike DateTime.Now it is not affected by
    // clock adjustments or daylight-saving changes.
    var startTick = Environment.TickCount64;
    while (Environment.TickCount64 - startTick < timeout)
    {
        lock (udpData)
        {
            if (udpData.TryGetValue(key, out var dataQueue) &&
                dataQueue.TryDequeue(out data))
            {
                break;
            }
        }

        // Task.Delay(0) completes synchronously, which would turn this loop
        // into a blocking busy-wait on the caller's thread; yield instead so
        // the method is genuinely asynchronous. Negative values are treated
        // the same way rather than throwing or waiting forever.
        if (cycle > 0)
        {
            await Task.Delay(cycle);
        }
        else
        {
            await Task.Yield();
        }
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data.DeepClone());
}
/// <summary>
/// Polls the receive queue of the given sender and task until it is non-empty,
/// then drains it completely and returns everything that was queued.
/// </summary>
/// <param name="ipAddr">IP address of the sender.</param>
/// <param name="taskID">Task ID; together with <paramref name="ipAddr"/> it forms the queue key.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// An empty Optional when the queue stayed empty (or absent) for the whole
/// timeout; otherwise the dequeued datagrams in queue order. The list holds the
/// queued objects themselves, not copies.
/// </returns>
public async ValueTask<Optional<List<UDPData>>> FindDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    List<UDPData>? data = null;

    // Monotonic tick counter: not affected by wall-clock adjustments.
    var startTick = Environment.TickCount64;
    while (Environment.TickCount64 - startTick < timeout)
    {
        lock (udpData)
        {
            if (udpData.TryGetValue(key, out var dataQueue) &&
                !dataQueue.IsEmpty)
            {
                data = new List<UDPData>();
                while (dataQueue.TryDequeue(out var item))
                {
                    data.Add(item);
                }
                break;
            }
        }

        // Without an await the method would run synchronously and block the
        // caller in a hot spin for up to `timeout` ms.
        await Task.Yield();
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data);
}
/// <summary>
/// Polls the receive queue of the given sender and task until it is non-empty
/// and returns a snapshot of its content. The queue itself is left untouched.
/// </summary>
/// <param name="ipAddr">IP address of the sender.</param>
/// <param name="taskID">Task ID; together with <paramref name="ipAddr"/> it forms the queue key.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// An empty Optional when the queue stayed empty (or absent) for the whole
/// timeout; otherwise a list with the queued datagrams in queue order. The
/// list holds the queued objects themselves, not copies.
/// </returns>
public async ValueTask<Optional<List<UDPData>>> GetDataArrayAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    List<UDPData>? data = null;

    // Monotonic tick counter: not affected by wall-clock adjustments.
    var startTick = Environment.TickCount64;
    while (Environment.TickCount64 - startTick < timeout)
    {
        if (udpData.TryGetValue(key, out var dataQueue) &&
            !dataQueue.IsEmpty)
        {
            data = dataQueue.ToArray().ToList();
            break;
        }

        // Without an await the method would run synchronously and block the
        // caller in a hot spin for up to `timeout` ms.
        await Task.Yield();
    }

    if (data is null)
    {
        logger.Trace("Get nothing even after time out");
        return new(null);
    }

    return new(data);
}
/// <summary>
/// Polls until a receive queue exists for the given sender and task, then
/// returns the number of datagrams currently queued in it.
/// </summary>
/// <param name="ipAddr">IP address of the sender.</param>
/// <param name="taskID">Task ID; together with <paramref name="ipAddr"/> it forms the queue key.</param>
/// <param name="timeout">Maximum time to keep polling, in milliseconds.</param>
/// <returns>
/// <c>Optional&lt;int&gt;.None</c> when no queue for the key appeared within the
/// timeout; otherwise the queue length (which may be 0 for an existing but
/// empty queue).
/// </returns>
public async ValueTask<Optional<int>> GetDataCountAsync(string ipAddr, int taskID, int timeout = 1000)
{
    var key = $"{ipAddr}-{taskID}";
    int? count = null;

    // Monotonic tick counter: not affected by wall-clock adjustments.
    var startTick = Environment.TickCount64;
    while (Environment.TickCount64 - startTick < timeout)
    {
        if (udpData.TryGetValue(key, out var dataQueue))
        {
            count = dataQueue.Count;
            break;
        }

        // Without an await the method would run synchronously and block the
        // caller in a hot spin for up to `timeout` ms.
        await Task.Yield();
    }

    if (count is null)
    {
        logger.Trace("Get nothing even after time out");
        return Optional<int>.None;
    }

    return new(count.Value);
}
/// <summary>
/// Waits for the next datagram of the given sender and task and parses it as a
/// response (acknowledge) package.
/// </summary>
/// <param name="address">IP address the response must come from.</param>
/// <param name="taskID">Task ID of the expected response.</param>
/// <param name="port">
/// Expected source port. The port is only checked when this value is greater
/// than 0; the default of -1 disables the check.
/// </param>
/// <param name="timeout">Maximum time to wait, in milliseconds.</param>
/// <returns>
/// The parsed response package, or a failed result carrying an exception when
/// nothing arrived in time, the datagram came from a different address/port,
/// or the bytes could not be parsed.
/// </returns>
public async ValueTask<Result<WebProtocol.RecvRespPackage>> WaitForAckAsync
    (string address, int taskID, int port = -1, int timeout = 1000)
{
    var data = await FindDataAsync(address, taskID, timeout);
    if (!data.HasValue)
        return new(new Exception("Get None even after time out!"));

    var recvData = data.Value;
    if (recvData.Address != address || (port > 0 && recvData.Port != port))
        return new(new Exception("Receive Data From Wrong Board!"));

    var retPack = WebProtocol.RecvRespPackage.FromBytes(recvData.Data);
    if (!retPack.IsSuccessful)
    {
        // This method parses a RecvRespPackage; the previous message named
        // RecvDataPackage, which pointed at the wrong package type.
        return new(new Exception("Not RecvRespPackage!", retPack.Error));
    }

    return retPack.Value;
}
/// <summary>
/// Waits for the next datagram of the given sender and task and parses it as a
/// data package.
/// </summary>
/// <param name="address">IP address the data must come from.</param>
/// <param name="taskID">Task ID of the expected data.</param>
/// <param name="port">
/// Expected source port. The port is checked whenever this value is 0 or
/// greater; the default of -1 disables the check.
/// </param>
/// <param name="timeout">Maximum time to wait, in milliseconds.</param>
/// <returns>
/// The parsed data package, or a failed result carrying an exception when
/// nothing arrived in time, the datagram came from a different address/port,
/// or the bytes could not be parsed.
/// </returns>
public async ValueTask<Result<RecvDataPackage>> WaitForDataAsync
    (string address, int taskID, int port = -1, int timeout = 1000)
{
    var found = await FindDataAsync(address, taskID, timeout);
    if (!found.HasValue)
    {
        return new(new Exception("Get None even after time out!"));
    }

    var packet = found.Value;
    var addressMatches = packet.Address == address;
    var portAccepted = port < 0 || packet.Port == port;
    if (!(addressMatches && portAccepted))
    {
        return new(new Exception("Receive Data From Wrong Board!"));
    }

    var parsed = WebProtocol.RecvDataPackage.FromBytes(packet.Data);
    if (parsed.IsSuccessful)
    {
        return parsed.Value;
    }

    return new(new Exception("Not RecvDataPackage!", parsed.Error));
}
/// <summary>
/// Handles one received datagram on a thread-pool thread: validates it and
/// records it in the per-sender/per-task queue.
/// </summary>
/// <param name="data">Raw bytes of the datagram; the byte at index 1 is used as the task ID.</param>
/// <param name="endPoint">Remote endpoint the datagram came from.</param>
/// <param name="time">Receive timestamp to store with the datagram.</param>
private async Task ReceiveHandler(byte[] data, IPEndPoint endPoint, DateTime time)
{
    await Task.Run(() =>
    {
        // A datagram without a known sender cannot be keyed; log and drop it.
        if (endPoint is null)
        {
            logger.Debug($"Receive Data from Unknown at {DateTime.Now.ToString()}:");
            logger.Debug($" Original Data : {BitConverter.ToString(data).Replace("-", " ")}");
            return;
        }

        // The task ID lives in byte 1, so anything shorter than 2 bytes cannot
        // be attributed to a task. Drop it instead of failing with an
        // IndexOutOfRangeException inside a task that nobody observes.
        if (data.Length < 2)
        {
            logger.Warn($"Drop datagram from {endPoint}: {data.Length} byte(s) is too short to carry a task ID");
            return;
        }

        RecordUDPData(data, endPoint, time, data[1]);
    });
}
/// <summary>
/// Wraps a received datagram in a <see cref="UDPData"/> and appends it to the
/// queue keyed by "{remote address}-{taskID}", keeping that queue ordered by
/// receive time.
/// </summary>
/// <param name="bytes">Raw bytes of the datagram (stored as-is, not copied).</param>
/// <param name="remoteEP">Remote endpoint the datagram came from.</param>
/// <param name="time">Receive timestamp.</param>
/// <param name="taskID">Task ID used as part of the queue key.</param>
/// <returns>The newly created record.</returns>
private UDPData RecordUDPData(byte[] bytes, IPEndPoint remoteEP, DateTime time, int taskID)
{
    var remoteAddress = remoteEP.Address.ToString();
    var remotePort = remoteEP.Port;
    var data = new UDPData()
    {
        Address = remoteAddress,
        Port = remotePort,
        TaskID = taskID,
        Data = bytes,
        DateTime = time,
        HasRead = false,
    };

    lock (udpData)
    {
        var key = $"{remoteAddress}-{taskID}";
        var dataQueue = udpData.GetOrAdd(key, _ => new ConcurrentQueue<UDPData>());
        dataQueue.Enqueue(data);

        // Only rebuild the queue when the new item really arrived out of
        // order. OrderBy is stable, so an already ordered queue would come out
        // unchanged; skipping the rebuild avoids a sort, an allocation and a
        // swap of the queue object on every single datagram.
        var snapshot = dataQueue.ToArray();
        if (!IsChronological(snapshot))
        {
            var sorted = snapshot.OrderBy(d => d.DateTime);
            udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(sorted), dataQueue);
        }

        PrintAllData();
    }
    return data;

    // True when every item is at least as recent as the one before it.
    static bool IsChronological(UDPData[] items)
    {
        for (int i = 1; i < items.Length; i++)
        {
            if (items[i - 1].DateTime > items[i].DateTime)
            {
                return false;
            }
        }
        return true;
    }
}
/// <summary>
/// Builds a human-readable description of a datagram: sender, receive time, a
/// hex dump of the payload and, when the first byte identifies a known package
/// type, the decoded content. The text is returned, not logged.
/// </summary>
/// <param name="data">The datagram to describe.</param>
/// <returns>The formatted multi-line description.</returns>
public string PrintData(UDPData data)
{
    var bytes = data.Data;
    string recvData = "";

    // An empty payload has no sign byte; leave the decoded text empty instead
    // of failing with an IndexOutOfRangeException on bytes[0].
    if (bytes.Length > 0)
    {
        var sign = bytes[0];
        if (sign == (byte)WebProtocol.PackSign.SendAddr)
        {
            var resData = WebProtocol.SendAddrPackage.FromBytes(bytes);
            if (resData.IsSuccessful)
                recvData = resData.Value.ToString();
            else
                recvData = resData.Error.ToString();
        }
        else if (sign == (byte)WebProtocol.PackSign.SendData)
        {
            // Nothing is decoded for this package type.
        }
        else if (sign == (byte)WebProtocol.PackSign.RecvData)
        {
            var resData = WebProtocol.RecvDataPackage.FromBytes(bytes);
            if (resData.IsSuccessful)
                recvData = resData.Value.Options.ToString();
            else
                recvData = resData.Error.ToString();
        }
        else if (sign == (byte)WebProtocol.PackSign.RecvResp)
        {
            var resData = WebProtocol.RecvRespPackage.FromBytes(bytes);
            if (resData.IsSuccessful)
                recvData = resData.Value.Options.ToString();
            else
                recvData = resData.Error.ToString();
        }
        else
        {
            // Unknown sign byte: show the payload as ASCII text.
            recvData = Encoding.ASCII.GetString(bytes, 0, bytes.Length);
        }
    }

    return $@"
Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:
Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}
Decoded Data : {recvData}
";
}
/// <summary>
/// Writes every queued datagram of every sender/task to the log at Debug level.
/// </summary>
public void PrintAllData()
{
    // Formatting every queued datagram is wasted work when Debug output is
    // switched off, and this method runs for each received packet.
    if (!logger.IsDebugEnabled)
    {
        return;
    }

    logger.Debug("Ready Data:");
    foreach (var kvp in udpData)
    {
        foreach (var data in kvp.Value)
        {
            logger.Debug(PrintData(data));
        }
    }
}
/// <summary>
/// Discards all queued datagrams of the given sender and task.
/// </summary>
/// <param name="ipAddr">IP address of the sender.</param>
/// <param name="taskID">Task ID; together with <paramref name="ipAddr"/> it forms the queue key.</param>
public void ClearUDPData(string ipAddr, int taskID)
{
    var key = $"{ipAddr}-{taskID}";

    // RecordUDPData swaps the queue object while holding this same lock.
    // Without the lock, such a swap between TryGetValue and TryUpdate makes
    // TryUpdate fail silently and the data would survive the clear.
    lock (udpData)
    {
        if (udpData.TryGetValue(key, out var dataQueue))
        {
            // Replacing the queue with a fresh one empties it in a single step.
            udpData.TryUpdate(key, new ConcurrentQueue<UDPData>(), dataQueue);
        }
    }
}
/// <summary>
/// Starts the UDP server: marks it as running and launches one background
/// receive loop per listener. Each received datagram is handed to
/// <c>ReceiveHandler</c> without waiting for it to finish.
/// </summary>
public void Start()
{
    this.isRunning = true;
    try
    {
        foreach (var client in listeners)
        {
            Task.Run(async () =>
            {
                while (this.isRunning)
                {
                    try
                    {
                        UdpReceiveResult result = await client.ReceiveAsync();

                        // Still fire-and-forget so receiving is not held up by
                        // processing, but a failing handler is now reported
                        // instead of ending as an unobserved task exception.
                        _ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now)
                            .ContinueWith(
                                t => Console.WriteLine($"Error: {t.Exception?.GetBaseException().Message}"),
                                TaskContinuationOptions.OnlyOnFaulted);
                    }
                    catch (ObjectDisposedException)
                    {
                        // The socket was closed (see Stop()) and can never
                        // receive again; retrying would spin forever.
                        break;
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Error: {ex.Message}");
                    }
                }
            });
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
        this.isRunning = false;
    }
}
/// <summary>
/// Stops the UDP server: marks it as not running and closes every listener.
/// </summary>
public void Stop()
{
    // Clear the flag first so the receive loops already see "stopped" when the
    // closing sockets make their pending receives fail.
    this.isRunning = false;

    foreach (var item in listeners)
    {
        item.Close();
    }
}
}