Files
test_demo/MX-PD-盘古 - new/PanGu.DieBonderApp/Mw.WorkFlow/WorkflowEngine.cs
Shi.Ji e31d3560bb 添加 MX-PD-盘古 项目文件
将 MX-PD-盘古 - new 目录下的所有文件添加到主仓库
2026-05-18 11:43:09 +08:00

530 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using MW.WorkFlow;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace MW.WorkFlow
{
/// <summary>
/// 负责执行工作流活动,并支持暂停、恢复、停止、重启和跳转,基于工作流定义和取消功能。
/// </summary>
public class WorkflowEngine : IDisposable
{
// Keys under which this engine reads and writes well-known entries in the workflow context.
private const string WorkflowNameContextKey = "WorkflowName";
private const string ParentActivityNameContextKey = "ParentActivityName";
private const string CurrentStepIdContextKey = "CurrentStepId";
private const string NextStepIdContextKey = "NextStepId";
private const string WorkflowRuntimeTrackerContextKey = "WorkflowRuntimeTracker";
private const string WorkflowFailureMessageContextKey = "WorkflowFailureMessage";
private const string WorkflowFailureMessageKeyContextKey = "WorkflowFailureMessageKey";
private const string WorkflowFailureMessageArgumentsContextKey = "WorkflowFailureMessageArguments";
// Step graph being executed; supplied by the constructor.
private readonly WorkflowDefinition _workflowDefinition;
// Shared data bag passed to every activity; supplied by the constructor.
private readonly WorkflowContext _context;
// Id of the step to execute next; null means there is nothing (more) to execute.
private string _currentStepId;
private volatile bool _isPausedRequested; // Set when an external caller has requested a pause.
private TaskCompletionSource<bool> _currentPauseSignal; // Signal the execution loop awaits while paused; completed on resume.
// Cancellation source for the current run; replaced on every Start.
private CancellationTokenSource _workflowCancellationTokenSource;
// Task running the execution loop for the current run; null when no run is active.
private Task _workflowExecutionTask;
// Set once Dispose(bool) has completed, making later Dispose calls no-ops.
private bool disposedValue;
private readonly object _stateLock = new object(); // Lock guarding the shared mutable state above.
// Current lifecycle state of the workflow; starts as Created.
public WorkflowState CurrentState { get; private set; } = WorkflowState.Created;
// True while a pause request is outstanding (mirrors the request flag).
public bool IsWorkflowPaused => _isPausedRequested;
// True when there is no current step id.
public bool IsWorkflowFinished => _currentStepId == null;
// True while an execution task exists and has not completed.
public bool IsWorkflowRunning => _workflowExecutionTask != null && !_workflowExecutionTask.IsCompleted;
// Id of the step currently selected for execution, or null.
public string CurrentStepId => _currentStepId;
// Last exception recorded by the fault/failure handlers; reset to null by Start.
public Exception LastException { get; private set; }
/// <summary>
/// Creates an engine for the given definition and context, positioned at the
/// definition's initial step and not paused.
/// </summary>
/// <param name="workflowDefinition">Step graph to execute.</param>
/// <param name="context">Shared context handed to every activity.</param>
/// <exception cref="ArgumentNullException">Either argument is null.</exception>
public WorkflowEngine(WorkflowDefinition workflowDefinition, WorkflowContext context)
{
    if (workflowDefinition is null)
    {
        throw new ArgumentNullException(nameof(workflowDefinition));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    _workflowDefinition = workflowDefinition;
    _context = context;
    _currentStepId = _workflowDefinition.InitialStepId;
    _isPausedRequested = false;

    // The pause signal starts out already completed so nothing waits on it.
    var releasedSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    releasedSignal.SetResult(true);
    _currentPauseSignal = releasedSignal;
}
/// <summary>
/// Starts executing the workflow on a background task. Does nothing (other than
/// logging) when a run is already in progress.
/// </summary>
/// <param name="startStepId">
/// Optional step to start from. When null or empty, execution continues from the
/// currently selected step, or from the definition's initial step if none is selected.
/// </param>
/// <exception cref="ArgumentException">
/// <paramref name="startStepId"/> is non-empty but not part of the workflow definition.
/// </exception>
public void Start(string startStepId = null)
{
    lock (_stateLock)
    {
        if (IsWorkflowRunning)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流已经在运行中,无法重复启动。");
            return;
        }

        // Resolve the starting step: an explicit id must exist in the definition.
        bool hasExplicitStart = !string.IsNullOrEmpty(startStepId);
        if (hasExplicitStart && !_workflowDefinition.ContainsStep(startStepId))
        {
            throw new ArgumentException($"指定的起始步骤ID '{startStepId}' 不存在于工作流定义中。", nameof(startStepId));
        }

        // Without an explicit id, keep the selected step or fall back to the
        // initial step when none is selected (e.g. the previous run has ended).
        _currentStepId = hasExplicitStart
            ? startStepId
            : _currentStepId ?? _workflowDefinition.InitialStepId;

        LastException = null;
        UpdateWorkflowState(WorkflowState.Running);
        _isPausedRequested = false;

        _workflowCancellationTokenSource?.Dispose();
        _workflowCancellationTokenSource = new CancellationTokenSource();

        // Make sure the pause signal is completed so the run begins immediately
        // (TrySetResult is a no-op when it already is).
        _currentPauseSignal.TrySetResult(true);

        // Task.Run(Func<Task>) unwraps the inner task, avoiding a nested Task<Task>.
        Task executionTask = Task.Run(ExecuteWorkflowInternalAsync);
        _workflowExecutionTask = executionTask;

        // Fault-only continuation that logs the exception so it is always observed.
        executionTask.ContinueWith(faulted =>
        {
            Debug.WriteLine($"[WorkflowEngine] 工作流任务发生未处理异常: {faulted.Exception?.Flatten().InnerException}");
        }, TaskContinuationOptions.OnlyOnFaulted);

        Debug.WriteLine($"[WorkflowEngine] 工作流已启动,从步骤 '{_currentStepId}' 开始。");
    }
}
/// <summary>
/// Execution loop for one workflow run: executes the current step's activity,
/// resolves the following step, and repeats until no step remains, an activity
/// fails or throws, or the run is cancelled. On exit the per-run state
/// (task, current step, pause request, cancellation source) is cleared.
/// </summary>
private async Task ExecuteWorkflowInternalAsync()
{
    Debug.WriteLine("\n[WorkflowEngine] 工作流执行开始。");
    CancellationToken cancellationToken = _workflowCancellationTokenSource.Token;
    Func<Task> pauseCheck = () => WaitWhilePausedAsync(cancellationToken);
    ActivityControl activityControl = new ActivityControl(cancellationToken, pauseCheck);
    WorkflowStep currentStep = null;
    try
    {
        while (_currentStepId != null)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await pauseCheck().ConfigureAwait(false);

            try
            {
                currentStep = _workflowDefinition.GetStep(_currentStepId);
            }
            catch (ArgumentException ex)
            {
                Debug.WriteLine($"[WorkflowEngine] 错误无法找到步骤ID '{_currentStepId}'。工作流终止。原因: {ex.Message}");
                HandleWorkflowFault(ex, currentStep);
                _currentStepId = null;
                break;
            }

            IActivity currentActivity = currentStep.Activity;

            // Clear any failure details left in the context by a previous step.
            _context.SetData(WorkflowFailureMessageContextKey, string.Empty);
            _context.SetData<object>(WorkflowFailureMessageKeyContextKey, null);
            _context.SetData<object[]>(WorkflowFailureMessageArgumentsContextKey, null);
            UpdateExecutionContext(currentStep);
            TrackExecutionPointer(currentStep, currentStep.Id);
            Debug.WriteLine($"\n[WorkflowEngine] 执行步骤: {currentStep.Id}, 活动: {currentActivity.Name}");

            ActivityResult result;
            try
            {
                result = await currentActivity.ExecuteAsync(_context, activityControl).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                Debug.WriteLine($"[WorkflowEngine] 活动 '{currentActivity.Name}' 被取消。");
                // A requested stop keeps the Stopped state; anything else is reported as Canceled.
                if (CurrentState != WorkflowState.Stopped)
                {
                    UpdateWorkflowState(WorkflowState.Canceled);
                }
                break;
            }
            catch (Exception ex)
            {
                Debug.WriteLine($"[WorkflowEngine] 活动 '{currentActivity.Name}' 执行时发生未处理的异常: {ex.Message}");
                HandleWorkflowFault(ex, currentStep);
                break;
            }

            Debug.WriteLine($"[WorkflowEngine] 步骤 '{currentStep.Id}' 完成,结果: {result}");

            // Safety net: stop if the state became Faulted/Canceled while the activity ran.
            if (CurrentState == WorkflowState.Faulted || CurrentState == WorkflowState.Canceled)
            {
                break;
            }

            if (result == ActivityResult.Failure)
            {
                string failureMessage = ResolveWorkflowFailureMessage(currentActivity.Name);
                Debug.WriteLine($"[WorkflowEngine] {failureMessage}");
                HandleWorkflowFailure(failureMessage, currentStep);
                break;
            }

            string nextStepCandidateId = ResolveNextStepId(currentStep, result);
            TrackExecutionPointer(currentStep, nextStepCandidateId);

            lock (_stateLock)
            {
                // Only advance when nobody (e.g. JumpTo) changed the current step meanwhile.
                if (_currentStepId == currentStep.Id)
                {
                    _currentStepId = nextStepCandidateId;
                }
                else
                {
                    Debug.WriteLine($"[WorkflowEngine] 外部已修改当前步骤,跳过内部跳转逻辑。");
                }
            }

            if (_currentStepId == null)
            {
                UpdateWorkflowState(WorkflowState.Completed);
                Debug.WriteLine($"[WorkflowEngine] 工作流已完成。");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Debug.WriteLine("[WorkflowEngine] 工作流执行被取消。");
        if (CurrentState != WorkflowState.Stopped)
        {
            UpdateWorkflowState(WorkflowState.Canceled);
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine($"[WorkflowEngine] 工作流执行过程中发生未处理的异常: {ex.Message}");
        HandleWorkflowFault(ex, currentStep);
    }
    finally
    {
        Debug.WriteLine("[WorkflowEngine] 工作流执行结束。");
        lock (_stateLock)
        {
            _workflowExecutionTask = null;
            _currentStepId = null;
            // A pause request cannot outlive the run it was made for.
            _isPausedRequested = false;
            _workflowCancellationTokenSource?.Dispose();
            _workflowCancellationTokenSource = null;
        }
    }
}

/// <summary>
/// Returns immediately when no pause is requested; otherwise waits until the pause
/// signal completes or the token is cancelled.
/// </summary>
/// <exception cref="OperationCanceledException">
/// The token is cancelled on entry or by the time the wait ends.
/// </exception>
private async Task WaitWhilePausedAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    if (!_isPausedRequested)
    {
        return;
    }

    Debug.WriteLine("[WorkflowEngine] 工作流已暂停,等待恢复...");
    Task<bool> pauseTask = _currentPauseSignal.Task;

    // A registration scoped to this wait replaces an infinite Task.Delay, whose token
    // registration would otherwise stay alive for every pause until the run ends.
    var cancelSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    using (cancellationToken.Register(() => { cancelSignal.TrySetResult(true); }))
    {
        await Task.WhenAny(pauseTask, cancelSignal.Task).ConfigureAwait(false);
    }

    // Re-check regardless of which task finished first: a stop may release the pause
    // signal too, and the next activity must not start on a cancelled run.
    cancellationToken.ThrowIfCancellationRequested();

    // Await the pause signal itself so any exception it carries is propagated.
    await pauseTask.ConfigureAwait(false);
    Debug.WriteLine("[WorkflowEngine] 工作流已恢复。");
}

/// <summary>
/// Determines the step that follows <paramref name="step"/>: the first jump condition
/// that evaluates to true supplies the target; otherwise the step's non-empty
/// NextStepId is used. Returns null when neither yields a target.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The resolved, non-blank target is not part of the workflow definition.
/// </exception>
private string ResolveNextStepId(WorkflowStep step, ActivityResult result)
{
    string nextStepId = null;
    foreach (var condition in step.JumpConditions)
    {
        if (condition.Condition(_context, result))
        {
            nextStepId = condition.ResolveTargetStepId(_context, result);
            Debug.WriteLine($"[WorkflowEngine] 条件跳转满足,跳转到步骤 '{nextStepId}'。");
            break;
        }
    }

    if (nextStepId == null && !string.IsNullOrEmpty(step.NextStepId))
    {
        nextStepId = step.NextStepId;
        Debug.WriteLine($"[WorkflowEngine] 默认跳转,跳转到步骤 '{nextStepId}'。");
    }

    if (!string.IsNullOrWhiteSpace(nextStepId) && !_workflowDefinition.ContainsStep(nextStepId))
    {
        throw new InvalidOperationException($"步骤 '{step.Id}' 解析出的目标步骤 '{nextStepId}' 不存在于工作流定义中。");
    }

    return nextStepId;
}
/// <summary>
/// Moves the workflow to <paramref name="newState"/> and logs the change.
/// Does nothing when the workflow is already in that state.
/// </summary>
private void UpdateWorkflowState(WorkflowState newState)
{
    if (CurrentState == newState)
    {
        return;
    }

    CurrentState = newState;
    Debug.WriteLine($"[WorkflowEngine] 工作流状态已更改: {CurrentState}");
}
/// <summary>
/// Records <paramref name="ex"/> as the last exception, marks the workflow Faulted
/// and reports the fault to the runtime tracker, if one is present in the context.
/// </summary>
/// <param name="ex">Exception that caused the fault; its message is reported.</param>
/// <param name="currentStep">Step being executed when the fault occurred; may be null.</param>
private void HandleWorkflowFault(Exception ex, WorkflowStep currentStep)
{
    LastException = ex;
    UpdateWorkflowState(WorkflowState.Faulted);
    ReportFaultToTracker(ex?.Message, currentStep);
}

/// <summary>
/// Handles an activity that ended with a failure result: wraps
/// <paramref name="errorMessage"/> in an <see cref="InvalidOperationException"/> stored as
/// the last exception, marks the workflow Faulted and reports the fault to the tracker.
/// </summary>
/// <param name="errorMessage">Failure description to record and report.</param>
/// <param name="currentStep">Step whose activity failed; may be null.</param>
private void HandleWorkflowFailure(string errorMessage, WorkflowStep currentStep)
{
    LastException = new InvalidOperationException(errorMessage);
    UpdateWorkflowState(WorkflowState.Faulted);
    ReportFaultToTracker(errorMessage, currentStep);
}

/// <summary>
/// Sends a fault report to the runtime tracker stored in the context; does nothing when
/// no tracker is registered. Step details come from <paramref name="currentStep"/> when
/// available and otherwise fall back to values stored in the context.
/// </summary>
private void ReportFaultToTracker(string errorMessage, WorkflowStep currentStep)
{
    if (!_context.TryGetData<IWorkflowRuntimeTracker>(WorkflowRuntimeTrackerContextKey, out var tracker) || tracker == null)
    {
        return;
    }

    _context.TryGetData<string>(WorkflowNameContextKey, out var workflowName);
    _context.TryGetData<string>(ParentActivityNameContextKey, out var flowNameFromContext);
    _context.TryGetData<string>(CurrentStepIdContextKey, out var currentStepIdFromContext);
    tracker.ReportWorkflowFault(new WorkflowFaultInfo
    {
        WorkflowName = workflowName,
        FlowName = currentStep?.FlowName ?? flowNameFromContext,
        ActivityName = currentStep?.Activity?.Name,
        CurrentStepId = currentStep?.Id ?? currentStepIdFromContext ?? _currentStepId,
        ErrorMessage = errorMessage,
        OccurredAt = DateTime.Now
    });
}
/// <summary>
/// Returns the failure message stored in the context when it is present and not blank;
/// otherwise builds a default message naming the failed activity.
/// </summary>
/// <param name="activityName">Name of the activity that returned a failure result.</param>
private string ResolveWorkflowFailureMessage(string activityName)
{
    bool hasCustomMessage =
        _context.TryGetData<string>(WorkflowFailureMessageContextKey, out var customMessage)
        && !string.IsNullOrWhiteSpace(customMessage);

    return hasCustomMessage
        ? customMessage
        : $"活动 '{activityName}' 返回 Failure工作流按失败结束。";
}
/// <summary>
/// Publishes the step about to run into the context: both the current-step and
/// next-step entries are set to the step's id, and the parent activity name is set
/// to the step's flow name (empty string when the flow name is null).
/// </summary>
private void UpdateExecutionContext(WorkflowStep currentStep)
{
    var stepId = currentStep.Id;
    string flowName = currentStep.FlowName ?? string.Empty;

    _context.SetData(CurrentStepIdContextKey, stepId);
    _context.SetData(NextStepIdContextKey, stepId);
    _context.SetData(ParentActivityNameContextKey, flowName);
}
/// <summary>
/// Stores <paramref name="nextStepId"/> in the context and, when a runtime tracker is
/// registered there, sends it an execution pointer describing the current and next step.
/// </summary>
/// <param name="currentStep">Step the pointer refers to; may be null.</param>
/// <param name="nextStepId">Id of the step expected to run next; may be null.</param>
private void TrackExecutionPointer(WorkflowStep currentStep, string nextStepId)
{
    _context.SetData(NextStepIdContextKey, nextStepId);

    bool trackerFound = _context.TryGetData<IWorkflowRuntimeTracker>(WorkflowRuntimeTrackerContextKey, out var runtimeTracker);
    if (!trackerFound || runtimeTracker == null)
    {
        return;
    }

    _context.TryGetData<string>(WorkflowNameContextKey, out var workflowName);
    var pointer = new WorkflowExecutionPointer
    {
        WorkflowName = workflowName,
        FlowName = currentStep?.FlowName,
        ActivityName = currentStep?.Activity?.Name,
        CurrentStepId = currentStep?.Id,
        NextStepId = nextStepId,
        UpdatedAt = DateTime.Now
    };
    runtimeTracker.UpdateExecutionPointer(pointer);
}
/// <summary>
/// Requests that the running workflow pause at its next pause check. Does nothing
/// (other than logging) when the workflow is not running or a pause is already requested.
/// </summary>
public void Pause()
{
    lock (_stateLock)
    {
        if (!IsWorkflowRunning)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流未运行,无法暂停。");
            return;
        }

        if (_isPausedRequested)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流已处于暂停状态。");
            return;
        }

        // Install the un-signalled pause signal BEFORE publishing the request flag.
        // The execution loop reads the flag without taking the lock and then reads the
        // signal; with the opposite order it could see the flag, pick up the old
        // already-completed signal and skip the pause entirely.
        if (_currentPauseSignal.Task.IsCompleted)
        {
            _currentPauseSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        }

        UpdateWorkflowState(WorkflowState.Paused);
        _isPausedRequested = true; // volatile write: publishes the new signal along with the flag
        Debug.WriteLine("[WorkflowEngine] 工作流已请求暂停。");
    }
}
/// <summary>
/// Clears an outstanding pause request and releases the execution loop if it is
/// waiting on the pause signal. Does nothing (other than logging) when no pause
/// is requested.
/// </summary>
public void Resume()
{
    lock (_stateLock)
    {
        if (!_isPausedRequested)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流未暂停,无需恢复。");
            return;
        }

        _currentPauseSignal.TrySetResult(true);
        _isPausedRequested = false;

        // Only a Paused workflow goes back to Running. If the run was stopped, faulted
        // or finished while the pause request was pending, that state must not be
        // overwritten with Running.
        if (CurrentState == WorkflowState.Paused)
        {
            UpdateWorkflowState(WorkflowState.Running);
        }

        Debug.WriteLine("[WorkflowEngine] 工作流已恢复。");
    }
}
/// <summary>
/// Requests cancellation of the running workflow and waits for its execution task to
/// finish. Returns immediately when no workflow is running. Exceptions from the
/// execution task are logged, not rethrown.
/// </summary>
/// <returns>A task that completes once the stopped run has ended and been cleaned up.</returns>
public async Task StopAsync()
{
    Task taskToAwait;
    lock (_stateLock)
    {
        if (!IsWorkflowRunning)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流未运行,无法停止。");
            return;
        }

        // Capture the task before cancelling: Cancel() may run continuations inline,
        // and the run's own cleanup clears the field.
        taskToAwait = _workflowExecutionTask;

        if (_workflowCancellationTokenSource?.IsCancellationRequested != true)
        {
            Debug.WriteLine("[WorkflowEngine] 已请求停止工作流。");
            UpdateWorkflowState(WorkflowState.Stopped);
            _workflowCancellationTokenSource?.Cancel();

            if (_isPausedRequested)
            {
                // Release a paused run directly instead of calling Resume(): Resume()
                // sets the state to Running, which would discard the Stopped state and
                // make the run end as Canceled.
                _currentPauseSignal.TrySetResult(true);
                _isPausedRequested = false;
            }
        }
    }

    try
    {
        await taskToAwait.ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        Debug.WriteLine("[WorkflowEngine] 工作流任务被取消。");
    }
    catch (Exception ex)
    {
        Debug.WriteLine($"[WorkflowEngine] 停止工作流时发生异常: {ex.Message}");
    }

    lock (_stateLock)
    {
        // If another caller started a new run while we were awaiting, the fields now
        // belong to that run and must not be cleared or disposed here.
        bool supersededByNewRun = _workflowExecutionTask != null
            && !ReferenceEquals(_workflowExecutionTask, taskToAwait);
        if (!supersededByNewRun)
        {
            _workflowExecutionTask = null;
            _currentStepId = null;
            _workflowCancellationTokenSource?.Dispose();
            _workflowCancellationTokenSource = null;
            _isPausedRequested = false;
        }

        Debug.WriteLine("[WorkflowEngine] 工作流已停止。");
    }
}
/// <summary>
/// Stops the current run (if any) and starts a new one.
/// </summary>
/// <param name="startFromStepId">
/// Step to restart from. When null or empty, the definition's initial step is used.
/// </param>
/// <param name="clearContext">When true, the workflow context is cleared before restarting.</param>
/// <returns>A task that completes once the new run has been started.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="startFromStepId"/> is non-empty but not part of the workflow
/// definition. Validation happens before the current run is stopped, so an invalid
/// request leaves a running workflow untouched.
/// </exception>
public async Task RestartAsync(string startFromStepId = null, bool clearContext = true)
{
    // Treat an empty id like "not specified", as Start does, and reject unknown ids up
    // front instead of stopping the workflow only to fault on the first step lookup.
    bool hasExplicitStart = !string.IsNullOrEmpty(startFromStepId);
    if (hasExplicitStart && !_workflowDefinition.ContainsStep(startFromStepId))
    {
        throw new ArgumentException($"指定的起始步骤ID '{startFromStepId}' 不存在于工作流定义中。", nameof(startFromStepId));
    }

    await StopAsync().ConfigureAwait(false);

    lock (_stateLock)
    {
        if (clearContext)
        {
            _context.Clear();
        }

        _currentStepId = hasExplicitStart ? startFromStepId : _workflowDefinition.InitialStepId;

        // Start inside the same lock (it is re-entrant) so no other caller can change
        // the selected step between choosing it and starting the run.
        Start();
    }
}
/// <summary>
/// Selects <paramref name="stepId"/> as the current step, updates the execution context
/// and tracker to point at it, and resumes the workflow if a pause is pending.
/// </summary>
/// <param name="stepId">Id of the step to jump to; must exist in the workflow definition.</param>
/// <exception cref="ArgumentException">
/// <paramref name="stepId"/> is null, empty, or not part of the workflow definition.
/// </exception>
public void JumpTo(string stepId)
{
    lock (_stateLock)
    {
        // Validate with ContainsStep, the same check Start and the execution loop use,
        // and reject null/empty before it reaches the definition lookup.
        if (string.IsNullOrEmpty(stepId) || !_workflowDefinition.ContainsStep(stepId))
        {
            throw new ArgumentException($"步骤ID '{stepId}' 不存在。", nameof(stepId));
        }

        // Resolve the step before committing so a lookup failure leaves the state untouched.
        var step = _workflowDefinition.GetStep(stepId);
        _currentStepId = stepId;
        UpdateExecutionContext(step);
        TrackExecutionPointer(step, stepId);
        Debug.WriteLine($"[WorkflowEngine] 跳转到步骤 '{stepId}'。");

        if (IsWorkflowPaused)
        {
            Resume();
        }
    }
}
/// <summary>
/// Synchronous dispose implementation. When called with <paramref name="disposing"/>
/// set and a workflow is running, blocks until <see cref="StopAsync"/> has completed.
/// Subsequent calls are no-ops.
/// </summary>
/// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
protected virtual void Dispose(bool disposing)
{
    if (disposedValue)
    {
        return;
    }

    if (disposing && IsWorkflowRunning)
    {
        // Dispose stays synchronous, so block on StopAsync rather than leaving an
        // un-awaited task behind.
        StopAsync().GetAwaiter().GetResult();
    }

    disposedValue = true;
}

/// <summary>
/// Stops a running workflow and suppresses finalization of this instance.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}
}
}