feat: 更新通信协议
This commit is contained in:
@@ -4,6 +4,7 @@ using System.Net.NetworkInformation; // 添加这个引用
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using Common;
|
||||
using DotNext;
|
||||
using Newtonsoft.Json;
|
||||
using WebProtocol;
|
||||
@@ -16,6 +17,10 @@ public class UDPData
|
||||
/// </summary>
|
||||
public required DateTime DateTime { get; set; }
|
||||
/// <summary>
|
||||
/// 数据包时间戳
|
||||
/// </summary>
|
||||
public required UInt32 Timestamp { get; set; }
|
||||
/// <summary>
|
||||
/// 发送来源的IP地址
|
||||
/// </summary>
|
||||
public required string Address { get; set; }
|
||||
@@ -48,6 +53,7 @@ public class UDPData
|
||||
return new UDPData()
|
||||
{
|
||||
DateTime = this.DateTime,
|
||||
Timestamp = this.Timestamp,
|
||||
Address = new string(this.Address),
|
||||
Port = this.Port,
|
||||
TaskID = this.TaskID,
|
||||
@@ -69,24 +75,26 @@ public class UDPData
|
||||
/// <summary>
|
||||
/// UDP 服务器
|
||||
/// </summary>
|
||||
public class UDPServer : IDisposable
|
||||
public class UDPServer
|
||||
{
|
||||
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||||
|
||||
private ConcurrentDictionary<string, SortedList<DateTime, UDPData>> udpData = new ConcurrentDictionary<string, SortedList<DateTime, UDPData>>();
|
||||
private ConcurrentDictionary<string, SortedList<UInt32, UDPData>> udpData
|
||||
= new ConcurrentDictionary<string, SortedList<UInt32, UDPData>>();
|
||||
private readonly ReaderWriterLockSlim udpDataLock = new ReaderWriterLockSlim();
|
||||
|
||||
private int listenPort;
|
||||
private List<UdpClient> listeners = new List<UdpClient>();
|
||||
private List<Task> tasks = new List<Task>();
|
||||
private IPEndPoint groupEP;
|
||||
|
||||
private bool isRunning = false;
|
||||
private CancellationTokenSource? cancellationTokenSource;
|
||||
private bool disposed = false;
|
||||
|
||||
/// <summary>
|
||||
/// 是否正在工作
|
||||
/// </summary>
|
||||
public bool IsRunning { get { return isRunning; } }
|
||||
public bool IsRunning => cancellationTokenSource?.Token.IsCancellationRequested == false;
|
||||
|
||||
/// <summary> UDP 服务器的错误代码 </summary>
|
||||
public enum ErrorCode
|
||||
@@ -415,27 +423,29 @@ public class UDPServer : IDisposable
|
||||
var remotePort = remoteEP.Port;
|
||||
var data = new UDPData()
|
||||
{
|
||||
DateTime = time,
|
||||
Timestamp = Number.BytesToUInt32(bytes[..4]).Value,
|
||||
Address = remoteAddress,
|
||||
Port = remotePort,
|
||||
TaskID = taskID,
|
||||
Data = bytes,
|
||||
DateTime = time,
|
||||
HasRead = false,
|
||||
};
|
||||
|
||||
var key = $"{remoteAddress}-{taskID}";
|
||||
|
||||
udpDataLock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var key = $"{remoteAddress}-{taskID}";
|
||||
var sortedList = udpData.GetOrAdd(key, _ => new SortedList<DateTime, UDPData>());
|
||||
var sortedList = udpData.GetOrAdd(key, _ => new SortedList<UInt32, UDPData>());
|
||||
|
||||
// 处理相同时间戳的情况,添加微小的时间差
|
||||
var uniqueTime = time;
|
||||
var uniqueTime = data.Timestamp;
|
||||
while (sortedList.ContainsKey(uniqueTime))
|
||||
{
|
||||
logger.Warn(
|
||||
$"Duplicate timestamp detected for {remoteAddress}:{remotePort} at {uniqueTime}.");
|
||||
uniqueTime = uniqueTime.AddTicks(1);
|
||||
uniqueTime += 1;
|
||||
}
|
||||
|
||||
sortedList.Add(uniqueTime, data);
|
||||
@@ -491,7 +501,7 @@ public class UDPServer : IDisposable
|
||||
recvData = Encoding.ASCII.GetString(bytes, 0, bytes.Length);
|
||||
}
|
||||
|
||||
logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()}:");
|
||||
logger.Debug($"Receive Data from {data.Address}:{data.Port} at {data.DateTime.ToString()} - {data.Timestamp}:");
|
||||
logger.Debug($" Original Data : {BitConverter.ToString(bytes).Replace("-", " ")}");
|
||||
if (recvData.Length != 0) logger.Debug($" Decoded Data : {recvData}");
|
||||
return $@"
|
||||
@@ -580,8 +590,7 @@ public class UDPServer : IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
// 异步执行ARP刷新,避免阻塞主线程
|
||||
Task.Run(() => ExecuteArpFlush(ipAddr));
|
||||
ExecuteArpFlush(ipAddr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -633,7 +642,7 @@ public class UDPServer : IDisposable
|
||||
/// 执行ARP刷新流程:先删除ARP条目,再用ping重新刷新
|
||||
/// </summary>
|
||||
/// <param name="ipAddr">目标IP地址</param>
|
||||
private async void ExecuteArpFlush(string ipAddr)
|
||||
private void ExecuteArpFlush(string ipAddr)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -806,33 +815,57 @@ public class UDPServer : IDisposable
|
||||
/// <returns>None</returns>
|
||||
public void Start()
|
||||
{
|
||||
this.isRunning = true;
|
||||
if (cancellationTokenSource != null && !cancellationTokenSource.Token.IsCancellationRequested)
|
||||
{
|
||||
logger.Warn("UDP Server is already running");
|
||||
return;
|
||||
}
|
||||
|
||||
cancellationTokenSource = new CancellationTokenSource();
|
||||
var cancellationToken = cancellationTokenSource.Token;
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var client in listeners)
|
||||
{
|
||||
Task.Run(() =>
|
||||
tasks.Add(Task.Run(async () =>
|
||||
{
|
||||
while (this.isRunning)
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var ep = new IPEndPoint(IPAddress.Any, listenPort);
|
||||
var result = client.Receive(ref ep);
|
||||
_ = ReceiveHandler(result, ep, DateTime.Now);
|
||||
// 使用 CancellationToken 来取消接收操作
|
||||
var result = await client.ReceiveAsync(cancellationToken);
|
||||
_ = ReceiveHandler(result.Buffer, result.RemoteEndPoint, DateTime.Now);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
logger.Debug("UDP receive operation was cancelled");
|
||||
break;
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
logger.Debug("UDP client was disposed");
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Error: {ex.Message}");
|
||||
if (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
logger.Error($"Error in UDP receive: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}, cancellationToken));
|
||||
}
|
||||
|
||||
logger.Info("UDP Server started successfully");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.WriteLine(e.ToString());
|
||||
this.isRunning = false;
|
||||
logger.Error($"Failed to start UDP server: {e}");
|
||||
cancellationTokenSource?.Cancel();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -842,39 +875,71 @@ public class UDPServer : IDisposable
|
||||
/// <returns>None</returns>
|
||||
public void Stop()
|
||||
{
|
||||
foreach (var item in listeners)
|
||||
if (cancellationTokenSource == null || cancellationTokenSource.Token.IsCancellationRequested)
|
||||
{
|
||||
item.Close();
|
||||
logger.Warn("UDP Server is not running or already stopped");
|
||||
return;
|
||||
}
|
||||
this.isRunning = false;
|
||||
|
||||
try
|
||||
{
|
||||
logger.Info("Stopping UDP Server...");
|
||||
|
||||
// 取消所有操作
|
||||
cancellationTokenSource.Cancel();
|
||||
|
||||
// 等待所有任务完成,设置超时时间
|
||||
var waitTasks = Task.WhenAll(tasks);
|
||||
if (!waitTasks.Wait(TimeSpan.FromSeconds(5)))
|
||||
{
|
||||
logger.Warn("Some tasks did not complete within timeout period");
|
||||
}
|
||||
|
||||
// 关闭所有UDP客户端
|
||||
foreach (var client in listeners)
|
||||
{
|
||||
try
|
||||
{
|
||||
client.Close();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Warn($"Error closing UDP client: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
// 清理任务列表
|
||||
tasks.Clear();
|
||||
|
||||
logger.Info("UDP Server stopped successfully");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error($"Error stopping UDP server: {ex.Message}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
cancellationTokenSource?.Dispose();
|
||||
cancellationTokenSource = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 释放资源
|
||||
/// 实现IDisposable接口,确保资源正确释放
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (!disposed)
|
||||
{
|
||||
if (disposing)
|
||||
Stop();
|
||||
|
||||
foreach (var client in listeners)
|
||||
{
|
||||
Stop();
|
||||
udpDataLock?.Dispose();
|
||||
client?.Dispose();
|
||||
}
|
||||
|
||||
udpDataLock?.Dispose();
|
||||
disposed = true;
|
||||
}
|
||||
}
|
||||
|
||||
~UDPServer()
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user