using Microsoft.AspNetCore.SignalR; using System.Collections.Concurrent; using DotNext; using Common; using server.Hubs; namespace server.Services; public class ProgressReporter : ProgressInfo, IProgress { private int _progress = 0; private int _stepProgress = 1; private int _expectedSteps = 100; private int _parentProportion = 100; public int Progress => _progress; public int MaxProgress { get; set; } = 100; public int StepProgress { get => _stepProgress; set { _stepProgress = value; _expectedSteps = MaxProgress / value; } } public int ExpectedSteps { get => _expectedSteps; set { _expectedSteps = value; MaxProgress = Number.IntPow(10, Number.GetLength(value)); _stepProgress = MaxProgress / value; } } public Func? ReporterFunc { get; set; } = null; public ProgressReporter? Parent { get; set; } public ProgressReporter? Child { get; set; } private ProgressStatus _status = ProgressStatus.Pending; private string _errorMessage; public string TaskId { get; set; } = Guid.NewGuid().ToString(); public int ProgressPercent => _progress * 100 / MaxProgress; public ProgressStatus Status => _status; public string ErrorMessage => _errorMessage; public ProgressReporter(Func? reporter = null, int initProgress = 0, int maxProgress = 100, int step = 1) { _progress = initProgress; MaxProgress = maxProgress; StepProgress = step; ReporterFunc = reporter; } public ProgressReporter(int parentProportion, int expectedSteps = 100, Func? reporter = null) { this._parentProportion = parentProportion; MaxProgress = Number.IntPow(10, Number.GetLength(expectedSteps)); StepProgress = MaxProgress / expectedSteps; ReporterFunc = reporter; } private async void ForceReport(int value) { try { if (ReporterFunc != null) await ReporterFunc(value); if (Parent != null) Parent.Increase((value - _progress) / StepProgress * _parentProportion / (MaxProgress / StepProgress)); _progress = value; } catch (OperationCanceledException ex) { _errorMessage = ex.Message; this._status = ProgressStatus.Canceled; } catch (Exception ex) { _errorMessage = ex.Message; this._status = ProgressStatus.Failed; } } public async void Report(int value) { if (this._status == ProgressStatus.Pending) this._status = ProgressStatus.InProgress; else if (this.Status != ProgressStatus.InProgress) return; if (value > MaxProgress) return; ForceReport(value); } public void Increase(int? value = null) { if (this._status == ProgressStatus.Pending) this._status = ProgressStatus.InProgress; else if (this.Status != ProgressStatus.InProgress) return; if (value.HasValue) { if (_progress + value.Value >= MaxProgress) return; this.Report(_progress + value.Value); } else { if (_progress + StepProgress >= MaxProgress) return; this.Report(_progress + StepProgress); } } public void Finish() { this._status = ProgressStatus.Completed; this.ForceReport(MaxProgress); } public void Cancel() { this._status = ProgressStatus.Canceled; this._errorMessage = "User Cancelled"; this.ForceReport(_progress); } public void Error(string message) { this._status = ProgressStatus.Failed; this._errorMessage = message; this.ForceReport(_progress); } public ProgressReporter CreateChild(int proportion, int expectedSteps = 100) { var child = new ProgressReporter(proportion, expectedSteps); child.Parent = this; this.Child = child; return child; } } public class ProgressTrackerService : BackgroundService { private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger(); private readonly ConcurrentDictionary _taskMap = new(); private readonly IHubContext _hubContext; private class TaskProgressInfo { public ProgressReporter Reporter { get; set; } public string? ConnectionId { get; set; } public required CancellationToken CancellationToken { get; set; } public required CancellationTokenSource CancellationTokenSource { get; set; } public required DateTime UpdatedAt { get; set; } } public ProgressTrackerService(IHubContext hubContext) { _hubContext = hubContext; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { var now = DateTime.UtcNow; foreach (var kvp in _taskMap) { var info = kvp.Value; // 超过 1 分钟且任务已完成/失败/取消 if ((now - info.UpdatedAt).TotalMinutes > 1 && (info.Reporter.Status == ProgressStatus.Completed || info.Reporter.Status == ProgressStatus.Failed || info.Reporter.Status == ProgressStatus.Canceled)) { _taskMap.TryRemove(kvp.Key, out _); logger.Info($"Cleaned up task {kvp.Key}"); } } } catch (Exception ex) { logger.Error(ex, "Error during ProgressTracker cleanup"); } await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); } } public (string, ProgressReporter) CreateTask(CancellationToken? cancellationToken = null) { CancellationTokenSource? cancellationTokenSource; if (cancellationToken.HasValue) { cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken.Value); } else { cancellationTokenSource = new CancellationTokenSource(); } var progressInfo = new TaskProgressInfo { ConnectionId = null, UpdatedAt = DateTime.UtcNow, CancellationToken = cancellationTokenSource.Token, CancellationTokenSource = cancellationTokenSource, }; var progress = new ProgressReporter(async value => { cancellationTokenSource.Token.ThrowIfCancellationRequested(); // 通过 SignalR 推送进度 if (progressInfo.ConnectionId != null) await _hubContext.Clients.Client(progressInfo.ConnectionId).OnReceiveProgress(progressInfo.Reporter); }); progressInfo.Reporter = progress; _taskMap.TryAdd(progressInfo.Reporter.TaskId, progressInfo); return (progressInfo.Reporter.TaskId, progress); } public Optional GetReporter(string taskId) { if (_taskMap.TryGetValue(taskId, out var info)) { return info.Reporter; } return Optional.None; } public Optional GetProgressStatus(string taskId) { if (_taskMap.TryGetValue(taskId, out var info)) { return info.Reporter.Status; } return Optional.None; } public bool BindTask(string taskId, string connectionId) { if (_taskMap.TryGetValue(taskId, out var info) && info != null) { lock (info) { info.ConnectionId = connectionId; } return true; } return false; } public bool CancelTask(string taskId) { try { if (_taskMap.TryGetValue(taskId, out var info) && info != null) { lock (info) { info.CancellationTokenSource.Cancel(); info.Reporter.Cancel(); info.UpdatedAt = DateTime.UtcNow; } return true; } return false; } catch (Exception ex) { logger.Error(ex, $"Failed to cancel task {taskId}"); return false; } } }