using MW.WorkFlow;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MW.WorkFlow
{
    /// <summary>
    /// Executes the activities of a workflow definition step by step and supports pause,
    /// resume, stop, restart and jump, based on the workflow definition and cooperative cancellation.
    /// </summary>
    public class WorkflowEngine : IDisposable
    {
        // Keys used to exchange runtime information through the workflow context.
        private const string WorkflowNameContextKey = "WorkflowName";
        private const string ParentActivityNameContextKey = "ParentActivityName";
        private const string CurrentStepIdContextKey = "CurrentStepId";
        private const string NextStepIdContextKey = "NextStepId";
        private const string WorkflowRuntimeTrackerContextKey = "WorkflowRuntimeTracker";
        private const string WorkflowFailureMessageContextKey = "WorkflowFailureMessage";
        private const string WorkflowFailureMessageKeyContextKey = "WorkflowFailureMessageKey";
        private const string WorkflowFailureMessageArgumentsContextKey = "WorkflowFailureMessageArguments";

        private readonly WorkflowDefinition _workflowDefinition;
        private readonly WorkflowContext _context;
        private string _currentStepId;
        private volatile bool _isPausedRequested; // set while an external caller has requested a pause
        private TaskCompletionSource<bool> _currentPauseSignal; // completed while running, pending while paused
        private CancellationTokenSource _workflowCancellationTokenSource;
        private Task _workflowExecutionTask;
        private bool disposedValue;
        private readonly object _stateLock = new object(); // guards the shared mutable state

        /// <summary>Current state of the workflow.</summary>
        public WorkflowState CurrentState { get; private set; } = WorkflowState.Created;

        /// <summary>True while a pause has been requested and not yet resumed.</summary>
        public bool IsWorkflowPaused => _isPausedRequested;

        /// <summary>True when there is no current step.</summary>
        public bool IsWorkflowFinished => _currentStepId == null;

        /// <summary>True while the background execution task exists and has not completed.</summary>
        public bool IsWorkflowRunning => _workflowExecutionTask != null && !_workflowExecutionTask.IsCompleted;

        /// <summary>Id of the step the engine currently points at, or null.</summary>
        public string CurrentStepId => _currentStepId;

        /// <summary>The exception recorded by the most recent fault or failure, if any.</summary>
        public Exception LastException { get; private set; }

        /// <summary>
        /// Creates an engine positioned at the definition's initial step.
        /// </summary>
        /// <param name="workflowDefinition">The workflow definition to execute.</param>
        /// <param name="context">The context shared with the activities.</param>
        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
        public WorkflowEngine(WorkflowDefinition workflowDefinition, WorkflowContext context)
        {
            _workflowDefinition = workflowDefinition ?? throw new ArgumentNullException(nameof(workflowDefinition));
            _context = context ?? throw new ArgumentNullException(nameof(context));
            _currentStepId = workflowDefinition.InitialStepId;
            _isPausedRequested = false;

            // The pause signal starts out completed so that nothing waits on it.
            _currentPauseSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _currentPauseSignal.SetResult(true);
        }

        /// <summary>
        /// Starts executing the workflow on a background task. Does nothing if it is already running.
        /// </summary>
        /// <param name="startStepId">
        /// Optional step to start from. When null or empty, execution continues from the current
        /// step, or from the initial step if there is no current step.
        /// </param>
        /// <exception cref="ArgumentException"><paramref name="startStepId"/> is not part of the definition.</exception>
        public void Start(string startStepId = null)
        {
            lock (_stateLock)
            {
                if (IsWorkflowRunning)
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流已经在运行中,无法重复启动。");
                    return;
                }

                if (!string.IsNullOrEmpty(startStepId))
                {
                    // An explicit start step must exist in the definition.
                    if (!_workflowDefinition.ContainsStep(startStepId))
                    {
                        throw new ArgumentException($"指定的起始步骤ID '{startStepId}' 不存在于工作流定义中。", nameof(startStepId));
                    }

                    _currentStepId = startStepId;
                }
                else if (_currentStepId == null)
                {
                    // Nothing specified and no current step (e.g. the previous run ended): start over.
                    _currentStepId = _workflowDefinition.InitialStepId;
                }

                LastException = null;
                UpdateWorkflowState(WorkflowState.Running);
                _isPausedRequested = false;

                _workflowCancellationTokenSource?.Dispose();
                _workflowCancellationTokenSource = new CancellationTokenSource();

                // Make sure the pause signal is completed so the workflow starts immediately.
                if (!_currentPauseSignal.Task.IsCompleted)
                {
                    _currentPauseSignal.TrySetResult(true);
                }

                // Passing the method group lets Task.Run unwrap the async method (no nested Task).
                _workflowExecutionTask = Task.Run(ExecuteWorkflowInternalAsync);

                // Observe a faulted task so its exception never goes unobserved.
                _workflowExecutionTask.ContinueWith(t =>
                {
                    Debug.WriteLine($"[WorkflowEngine] 工作流任务发生未处理异常: {t.Exception?.Flatten().InnerException}");
                }, TaskContinuationOptions.OnlyOnFaulted);

                Debug.WriteLine($"[WorkflowEngine] 工作流已启动,从步骤 '{_currentStepId}' 开始。");
            }
        }

        /// <summary>
        /// Runs the step loop: executes the current step's activity, resolves the next step from
        /// the jump conditions or the default next step, and repeats until there is no next step,
        /// an activity fails, an exception occurs, or cancellation is requested.
        /// </summary>
        private async Task ExecuteWorkflowInternalAsync()
        {
            Debug.WriteLine("\n[WorkflowEngine] 工作流执行开始。");
            CancellationToken cancellationToken = _workflowCancellationTokenSource.Token;

            // Shared with activities through ActivityControl: waits while a pause is requested
            // and throws OperationCanceledException once cancellation is requested.
            Func<Task> pauseCheck = async () =>
            {
                cancellationToken.ThrowIfCancellationRequested();
                if (!_isPausedRequested)
                {
                    return;
                }

                Debug.WriteLine("[WorkflowEngine] 工作流已暂停,等待恢复...");
                Task pauseTask = _currentPauseSignal.Task;
                Task cancelTask = Task.Delay(Timeout.Infinite, cancellationToken);
                await Task.WhenAny(pauseTask, cancelTask).ConfigureAwait(false);

                // Always re-check: when a stop releases the pause signal and cancels at the same
                // time, either task may win the race, and a cancelled workflow must not continue.
                cancellationToken.ThrowIfCancellationRequested();

                // Await the pause task itself so that its exception/cancellation is propagated.
                await pauseTask.ConfigureAwait(false);
                Debug.WriteLine("[WorkflowEngine] 工作流已恢复。");
            };

            ActivityControl activityControl = new ActivityControl(cancellationToken, pauseCheck);
            WorkflowStep currentStep = null;

            try
            {
                while (_currentStepId != null)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await pauseCheck().ConfigureAwait(false);

                    try
                    {
                        currentStep = _workflowDefinition.GetStep(_currentStepId);
                    }
                    catch (ArgumentException ex)
                    {
                        Debug.WriteLine($"[WorkflowEngine] 错误:无法找到步骤ID '{_currentStepId}'。工作流终止。原因: {ex.Message}");
                        HandleWorkflowFault(ex, currentStep);
                        _currentStepId = null;
                        break;
                    }

                    IActivity currentActivity = currentStep.Activity;

                    // Reset the failure information left behind by a previous step.
                    // NOTE(review): if SetData is generic, the explicit type arguments of the two
                    // null calls appear to have been lost from this file - confirm and restore.
                    _context.SetData(WorkflowFailureMessageContextKey, string.Empty);
                    _context.SetData(WorkflowFailureMessageKeyContextKey, null);
                    _context.SetData(WorkflowFailureMessageArgumentsContextKey, null);

                    UpdateExecutionContext(currentStep);
                    TrackExecutionPointer(currentStep, currentStep.Id);
                    Debug.WriteLine($"\n[WorkflowEngine] 执行步骤: {currentStep.Id}, 活动: {currentActivity.Name}");

                    ActivityResult result = ActivityResult.Failure;
                    try
                    {
                        result = await currentActivity.ExecuteAsync(_context, activityControl).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        Debug.WriteLine($"[WorkflowEngine] 活动 '{currentActivity.Name}' 被取消。");

                        // A requested stop keeps the Stopped state; anything else is a cancellation.
                        if (CurrentState != WorkflowState.Stopped)
                        {
                            UpdateWorkflowState(WorkflowState.Canceled);
                        }

                        break;
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine($"[WorkflowEngine] 活动 '{currentActivity.Name}' 执行时发生未处理的异常: {ex.Message}");
                        HandleWorkflowFault(ex, currentStep);
                        break;
                    }

                    Debug.WriteLine($"[WorkflowEngine] 步骤 '{currentStep.Id}' 完成,结果: {result}");

                    // Safety net: stop if the state became Faulted or Canceled by any other path.
                    if (CurrentState == WorkflowState.Faulted || CurrentState == WorkflowState.Canceled)
                    {
                        break;
                    }

                    if (result == ActivityResult.Failure)
                    {
                        string failureMessage = ResolveWorkflowFailureMessage(currentActivity.Name);
                        Debug.WriteLine($"[WorkflowEngine] {failureMessage}");
                        HandleWorkflowFailure(failureMessage, currentStep);
                        break;
                    }

                    // The first satisfied jump condition wins; otherwise use the default next step.
                    string nextStepCandidateId = null;
                    foreach (var condition in currentStep.JumpConditions)
                    {
                        if (condition.Condition(_context, result))
                        {
                            nextStepCandidateId = condition.ResolveTargetStepId(_context, result);
                            Debug.WriteLine($"[WorkflowEngine] 条件跳转满足,跳转到步骤 '{nextStepCandidateId}'。");
                            break;
                        }
                    }

                    if (nextStepCandidateId == null && !string.IsNullOrEmpty(currentStep.NextStepId))
                    {
                        nextStepCandidateId = currentStep.NextStepId;
                        Debug.WriteLine($"[WorkflowEngine] 默认跳转,跳转到步骤 '{nextStepCandidateId}'。");
                    }

                    if (!string.IsNullOrWhiteSpace(nextStepCandidateId) && !_workflowDefinition.ContainsStep(nextStepCandidateId))
                    {
                        throw new InvalidOperationException($"步骤 '{currentStep.Id}' 解析出的目标步骤 '{nextStepCandidateId}' 不存在于工作流定义中。");
                    }

                    TrackExecutionPointer(currentStep, nextStepCandidateId);

                    lock (_stateLock)
                    {
                        // Only advance if nobody (e.g. JumpTo) moved the pointer while the step ran.
                        if (_currentStepId == currentStep.Id)
                        {
                            _currentStepId = nextStepCandidateId;
                        }
                        else
                        {
                            Debug.WriteLine($"[WorkflowEngine] 外部已修改当前步骤,跳过内部跳转逻辑。");
                        }
                    }

                    if (_currentStepId == null)
                    {
                        UpdateWorkflowState(WorkflowState.Completed);
                        Debug.WriteLine($"[WorkflowEngine] 工作流已完成。");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Debug.WriteLine("[WorkflowEngine] 工作流执行被取消。");
                if (CurrentState != WorkflowState.Stopped)
                {
                    UpdateWorkflowState(WorkflowState.Canceled);
                }
            }
            catch (Exception ex)
            {
                Debug.WriteLine($"[WorkflowEngine] 工作流执行过程中发生未处理的异常: {ex.Message}");
                HandleWorkflowFault(ex, currentStep);
            }
            finally
            {
                Debug.WriteLine("[WorkflowEngine] 工作流执行结束。");
                lock (_stateLock)
                {
                    _workflowExecutionTask = null;
                    _currentStepId = null;
                    _workflowCancellationTokenSource?.Dispose();
                    _workflowCancellationTokenSource = null;
                }
            }
        }

        /// <summary>Sets the workflow state and logs the change when it actually changes.</summary>
        private void UpdateWorkflowState(WorkflowState newState)
        {
            if (CurrentState != newState)
            {
                CurrentState = newState;
                Debug.WriteLine($"[WorkflowEngine] 工作流状态已更改: {CurrentState}");
            }
        }

        /// <summary>
        /// Records <paramref name="ex"/> as the last exception, moves the workflow to Faulted and
        /// reports the fault to the runtime tracker stored in the context, if there is one.
        /// </summary>
        private void HandleWorkflowFault(Exception ex, WorkflowStep currentStep)
        {
            LastException = ex;
            UpdateWorkflowState(WorkflowState.Faulted);
            ReportFaultToTracker(ex?.Message, currentStep);
        }

        /// <summary>
        /// Records an activity failure (wrapped in an <see cref="InvalidOperationException"/>),
        /// moves the workflow to Faulted and reports it to the runtime tracker, if there is one.
        /// </summary>
        private void HandleWorkflowFailure(string errorMessage, WorkflowStep currentStep)
        {
            LastException = new InvalidOperationException(errorMessage);
            UpdateWorkflowState(WorkflowState.Faulted);
            ReportFaultToTracker(errorMessage, currentStep);
        }

        /// <summary>
        /// Sends a fault report to the runtime tracker found in the context. Step details come
        /// from <paramref name="currentStep"/> when available, otherwise from the context values.
        /// </summary>
        private void ReportFaultToTracker(string errorMessage, WorkflowStep currentStep)
        {
            // NOTE(review): the explicit type argument of this TryGetData call (the tracker type)
            // appears to have been lost from this file and cannot be inferred from 'out var' -
            // restore it from the project's tracker type.
            if (_context.TryGetData(WorkflowRuntimeTrackerContextKey, out var tracker) && tracker != null)
            {
                _context.TryGetData(WorkflowNameContextKey, out string workflowName);
                _context.TryGetData(ParentActivityNameContextKey, out string flowNameFromContext);
                _context.TryGetData(CurrentStepIdContextKey, out string currentStepIdFromContext);

                tracker.ReportWorkflowFault(new WorkflowFaultInfo
                {
                    WorkflowName = workflowName,
                    FlowName = currentStep?.FlowName ?? flowNameFromContext,
                    ActivityName = currentStep?.Activity?.Name,
                    CurrentStepId = currentStep?.Id ?? currentStepIdFromContext ?? _currentStepId,
                    ErrorMessage = errorMessage,
                    OccurredAt = DateTime.Now
                });
            }
        }

        /// <summary>
        /// Returns the failure message stored in the context by the activity, or a default
        /// message naming the activity when none (or only whitespace) was stored.
        /// </summary>
        private string ResolveWorkflowFailureMessage(string activityName)
        {
            if (_context.TryGetData(WorkflowFailureMessageContextKey, out string failureMessage)
                && !string.IsNullOrWhiteSpace(failureMessage))
            {
                return failureMessage;
            }

            return $"活动 '{activityName}' 返回 Failure,工作流按失败结束。";
        }

        /// <summary>Publishes the step that is about to run into the workflow context.</summary>
        private void UpdateExecutionContext(WorkflowStep currentStep)
        {
            _context.SetData(CurrentStepIdContextKey, currentStep.Id);
            _context.SetData(NextStepIdContextKey, currentStep.Id);
            _context.SetData(ParentActivityNameContextKey, currentStep.FlowName ?? string.Empty);
        }

        /// <summary>
        /// Stores the next step id in the context and forwards the execution pointer to the
        /// runtime tracker found in the context, if there is one.
        /// </summary>
        private void TrackExecutionPointer(WorkflowStep currentStep, string nextStepId)
        {
            _context.SetData(NextStepIdContextKey, nextStepId);

            // NOTE(review): the explicit type argument of this TryGetData call (the tracker type)
            // appears to have been lost from this file - restore it from the project's tracker type.
            if (_context.TryGetData(WorkflowRuntimeTrackerContextKey, out var tracker) && tracker != null)
            {
                _context.TryGetData(WorkflowNameContextKey, out string workflowName);
                tracker.UpdateExecutionPointer(new WorkflowExecutionPointer
                {
                    WorkflowName = workflowName,
                    FlowName = currentStep?.FlowName,
                    ActivityName = currentStep?.Activity?.Name,
                    CurrentStepId = currentStep?.Id,
                    NextStepId = nextStepId,
                    UpdatedAt = DateTime.Now
                });
            }
        }

        /// <summary>
        /// Requests a pause. The workflow waits at its next pause check (between steps, or
        /// wherever an activity invokes the pause check it was given).
        /// </summary>
        public void Pause()
        {
            lock (_stateLock)
            {
                if (!IsWorkflowRunning)
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流未运行,无法暂停。");
                    return;
                }

                if (_isPausedRequested)
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流已处于暂停状态。");
                    return;
                }

                // Install the pending signal BEFORE publishing the (volatile) flag: the pause
                // check reads the flag without taking the lock and must never pair
                // "pause requested" with the previous, already completed signal.
                if (_currentPauseSignal.Task.IsCompleted)
                {
                    _currentPauseSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                }

                _isPausedRequested = true;
                UpdateWorkflowState(WorkflowState.Paused);
                Debug.WriteLine("[WorkflowEngine] 工作流已请求暂停。");
            }
        }

        /// <summary>Resumes a paused workflow. Does nothing if no pause is pending.</summary>
        public void Resume()
        {
            lock (_stateLock)
            {
                if (_isPausedRequested)
                {
                    _currentPauseSignal.TrySetResult(true);
                    _isPausedRequested = false;
                    UpdateWorkflowState(WorkflowState.Running);
                    Debug.WriteLine("[WorkflowEngine] 工作流已恢复。");
                }
                else
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流未暂停,无需恢复。");
                }
            }
        }

        /// <summary>
        /// Requests cancellation of the running workflow and waits for the execution task to end.
        /// Exceptions thrown by the execution task are logged, not rethrown.
        /// </summary>
        /// <returns>A task that completes once the workflow has stopped.</returns>
        public async Task StopAsync()
        {
            Task taskToAwait;
            lock (_stateLock)
            {
                if (!IsWorkflowRunning)
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流未运行,无法停止。");
                    return;
                }

                // Capture the task before cancelling: cancellation callbacks run synchronously
                // inside Cancel(), so the workflow may already have cleared the field afterwards.
                taskToAwait = _workflowExecutionTask;

                if (_workflowCancellationTokenSource?.IsCancellationRequested != true)
                {
                    Debug.WriteLine("[WorkflowEngine] 已请求停止工作流。");
                    UpdateWorkflowState(WorkflowState.Stopped);
                    _workflowCancellationTokenSource?.Cancel();

                    if (_isPausedRequested)
                    {
                        // Release the pause wait directly. Calling Resume() here would overwrite
                        // the Stopped state with Running, and the run would then end as Canceled.
                        _isPausedRequested = false;
                        _currentPauseSignal.TrySetResult(true);
                    }
                }
            }

            if (taskToAwait != null)
            {
                try
                {
                    await taskToAwait.ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    Debug.WriteLine("[WorkflowEngine] 工作流任务被取消。");
                }
                catch (Exception ex)
                {
                    Debug.WriteLine($"[WorkflowEngine] 停止工作流时发生异常: {ex.Message}");
                }
            }

            lock (_stateLock)
            {
                _workflowExecutionTask = null;
                _currentStepId = null;
                _workflowCancellationTokenSource?.Dispose();
                _workflowCancellationTokenSource = null;
                _isPausedRequested = false;
                Debug.WriteLine("[WorkflowEngine] 工作流已停止。");
            }
        }

        /// <summary>
        /// Stops the workflow (if running) and starts it again.
        /// </summary>
        /// <param name="startFromStepId">Step to restart from; null or empty means the initial step.</param>
        /// <param name="clearContext">When true, the workflow context is cleared before restarting.</param>
        /// <returns>A task that completes once the workflow has been started again.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="startFromStepId"/> is not part of the definition. Surfaced through the
        /// returned task, before the running workflow is stopped or the context is cleared.
        /// </exception>
        public async Task RestartAsync(string startFromStepId = null, bool clearContext = true)
        {
            bool hasExplicitStep = !string.IsNullOrEmpty(startFromStepId);

            // Validate first so that a bad id does not tear down a running workflow.
            if (hasExplicitStep && !_workflowDefinition.ContainsStep(startFromStepId))
            {
                throw new ArgumentException($"指定的起始步骤ID '{startFromStepId}' 不存在于工作流定义中。", nameof(startFromStepId));
            }

            await StopAsync().ConfigureAwait(false);

            lock (_stateLock)
            {
                if (clearContext)
                {
                    _context.Clear();
                }

                _currentStepId = hasExplicitStep ? startFromStepId : _workflowDefinition.InitialStepId;
            }

            Start();
        }

        /// <summary>
        /// Moves the execution pointer to the given step and resumes the workflow if it is paused.
        /// </summary>
        /// <param name="stepId">Id of the target step.</param>
        /// <exception cref="ArgumentException"><paramref name="stepId"/> is not part of the definition.</exception>
        public void JumpTo(string stepId)
        {
            lock (_stateLock)
            {
                if (!_workflowDefinition.GetAllStepIds().Contains(stepId))
                {
                    throw new ArgumentException($"步骤ID '{stepId}' 不存在。", nameof(stepId));
                }

                _currentStepId = stepId;
                var step = _workflowDefinition.GetStep(stepId);
                UpdateExecutionContext(step);
                TrackExecutionPointer(step, stepId);
                Debug.WriteLine($"[WorkflowEngine] 跳转到步骤 '{stepId}'。");

                if (IsWorkflowPaused)
                {
                    Resume();
                }
            }
        }

        /// <summary>
        /// Stops a running workflow when disposing. Kept synchronous on purpose: Dispose must
        /// not be async void or leave an un-awaited task behind (relevant on .NET Framework).
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    if (IsWorkflowRunning)
                    {
                        // Block until StopAsync has finished so that resources are cleaned up.
                        StopAsync().GetAwaiter().GetResult();
                    }
                }

                disposedValue = true;
            }
        }

        /// <summary>Stops the workflow if it is running and releases the engine's resources.</summary>
        public void Dispose()
        {
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }
}