using MW.WorkFlow;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace MW.WorkFlow
{
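/// <summary>
/// Executes workflow activities and supports pause, resume, stop, restart and jump operations, driven by a workflow definition and cancellation support.
/// </summary>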
public class WorkflowEngine : IDisposable
{
// String keys for entries kept in the WorkflowContext.
// NOTE(review): within the visible code only WorkflowFailureMessageContextKey is used
// (set to string.Empty via _context.SetData inside the execution loop); confirm how the
// remaining keys are used against the rest of the file.
private const string WorkflowNameContextKey = "WorkflowName";
private const string ParentActivityNameContextKey = "ParentActivityName";
private const string CurrentStepIdContextKey = "CurrentStepId";
private const string NextStepIdContextKey = "NextStepId";
private const string WorkflowRuntimeTrackerContextKey = "WorkflowRuntimeTracker";
private const string WorkflowFailureMessageContextKey = "WorkflowFailureMessage";
private const string WorkflowFailureMessageKeyContextKey = "WorkflowFailureMessageKey";
private const string WorkflowFailureMessageArgumentsContextKey = "WorkflowFailureMessageArguments";
// Definition queried for step existence (ContainsStep), step lookup (GetStep) and the initial step ID.
private readonly WorkflowDefinition _workflowDefinition;
// Context object that receives per-run data through SetData.
private readonly WorkflowContext _context;
// ID of the step the execution loop works on; the loop ends once this becomes null.
private string _currentStepId;
private volatile bool _isPausedRequested; // Flag indicating that a pause has been requested externally
private TaskCompletionSource _currentPauseSignal; // Current pause signal; the execution loop awaits its task while paused
// Recreated on every Start; its token is observed by the execution loop.
private CancellationTokenSource _workflowCancellationTokenSource;
// Task produced by Task.Run in Start; used to decide whether a run is in progress.
private Task _workflowExecutionTask;
// NOTE(review): presumably the usual Dispose-pattern flag; its use is not visible in this part of the file.
private bool disposedValue;
private readonly object _stateLock = new object(); // Lock protecting shared state (taken by Start)
// Current state of the workflow; starts as Created.
public WorkflowState CurrentState { get; private set; } = WorkflowState.Created;
// Mirrors the pause-request flag; it does not by itself show that execution has reached a pause point.
public bool IsWorkflowPaused => _isPausedRequested;
// True when there is no current step ID.
public bool IsWorkflowFinished => _currentStepId == null;
// True when an execution task exists and has not completed yet.
public bool IsWorkflowRunning => _workflowExecutionTask != null && !_workflowExecutionTask.IsCompleted;
// ID of the current step, or null when there is none.
public string CurrentStepId => _currentStepId;
// Cleared to null by Start. NOTE(review): where it gets assigned is outside the visible code.
public Exception LastException { get; private set; }
/// <summary>
/// Creates an engine bound to the given definition and context, positioned at the
/// definition's initial step and not paused.
/// </summary>
/// <param name="workflowDefinition">Workflow definition that supplies the steps; must not be null.</param>
/// <param name="context">Context shared with the workflow run; must not be null.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="workflowDefinition"/> or <paramref name="context"/> is null.
/// </exception>
public WorkflowEngine(WorkflowDefinition workflowDefinition, WorkflowContext context)
{
    // Validate in parameter order so the first offending argument is the one reported.
    if (workflowDefinition is null)
    {
        throw new ArgumentNullException(nameof(workflowDefinition));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    _workflowDefinition = workflowDefinition;
    _context = context;
    _isPausedRequested = false;
    _currentStepId = _workflowDefinition.InitialStepId;

    // The pause signal starts out already completed, so nothing waits on it until a pause is requested.
    var completedSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    completedSignal.SetResult(true);
    _currentPauseSignal = completedSignal;
}
/// <summary>
/// Starts executing the workflow on a thread-pool task, optionally from a specific step.
/// </summary>
/// <param name="startStepId">
/// ID of the step to begin from. When null or empty, execution continues from the current
/// step, or from the definition's initial step if there is no current step.
/// </param>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="startStepId"/> is not contained in the workflow definition.
/// </exception>
/// <remarks>
/// If a run is already in progress the call only logs a message and returns.
/// </remarks>
public void Start(string startStepId = null)
{
    lock (_stateLock)
    {
        if (IsWorkflowRunning)
        {
            Debug.WriteLine("[WorkflowEngine] 工作流已经在运行中,无法重复启动。");
            return;
        }

        // Resolve the starting step before touching any other state, so an invalid
        // ID leaves the engine exactly as it was.
        if (!string.IsNullOrEmpty(startStepId))
        {
            if (!_workflowDefinition.ContainsStep(startStepId))
            {
                throw new ArgumentException($"指定的起始步骤ID '{startStepId}' 不存在于工作流定义中。", nameof(startStepId));
            }

            _currentStepId = startStepId;
        }
        else if (_currentStepId == null)
        {
            // No explicit step and no current step (e.g. the previous run finished):
            // fall back to the definition's initial step.
            _currentStepId = _workflowDefinition.InitialStepId;
        }

        LastException = null;
        UpdateWorkflowState(WorkflowState.Running);
        _isPausedRequested = false;

        // Every run gets a fresh cancellation source; the previous one is no longer in use here
        // because no run is in progress.
        _workflowCancellationTokenSource?.Dispose();
        _workflowCancellationTokenSource = new CancellationTokenSource();

        // Make sure the pause signal is completed so the run is not blocked immediately.
        // TrySetResult is a no-op when the signal is already completed.
        _currentPauseSignal.TrySetResult(true);

        // Passing the method group to Task.Run yields the unwrapped task rather than a nested Task.
        _workflowExecutionTask = Task.Run(ExecuteWorkflowInternalAsync);

        // Log (and thereby observe) any unhandled exception of the run. The scheduler is given
        // explicitly so the continuation never lands on an ambient scheduler such as a UI one.
        _workflowExecutionTask.ContinueWith(
            t =>
            {
                Debug.WriteLine($"[WorkflowEngine] 工作流任务发生未处理异常: {t.Exception?.Flatten().InnerException}");
            },
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);

        Debug.WriteLine($"[WorkflowEngine] 工作流已启动,从步骤 '{_currentStepId}' 开始。");
    }
}
// 异步执行工作流中的活动
private async Task ExecuteWorkflowInternalAsync()
{
Debug.WriteLine("\n[WorkflowEngine] 工作流执行开始。");
CancellationToken cancellationToken = _workflowCancellationTokenSource.Token;
Func pauseCheck = async () =>
{
cancellationToken.ThrowIfCancellationRequested();
if (_isPausedRequested)
{
Debug.WriteLine("[WorkflowEngine] 工作流已暂停,等待恢复...");
var pauseTask = _currentPauseSignal.Task;
var cancelTask = Task.Delay(Timeout.Infinite, cancellationToken);
var finished = await Task.WhenAny(pauseTask, cancelTask).ConfigureAwait(false);
if (finished == cancelTask)
{
// 如果取消任务先完成,则抛出取消
cancellationToken.ThrowIfCancellationRequested();
}
// 等待 pauseTask 以传播其可能的异常/取消
await pauseTask.ConfigureAwait(false);
Debug.WriteLine("[WorkflowEngine] 工作流已恢复。");
}
};
ActivityControl activityControl = new ActivityControl(cancellationToken, pauseCheck);
WorkflowStep currentStep = null;
try
{
while (_currentStepId != null)
{
cancellationToken.ThrowIfCancellationRequested();
await pauseCheck().ConfigureAwait(false);
try
{
currentStep = _workflowDefinition.GetStep(_currentStepId);
}
catch (ArgumentException ex)
{
Debug.WriteLine($"[WorkflowEngine] 错误:无法找到步骤ID '{_currentStepId}'。工作流终止。原因: {ex.Message}");
HandleWorkflowFault(ex, currentStep);
_currentStepId = null;
break;
}
IActivity currentActivity = currentStep.Activity;
_context.SetData(WorkflowFailureMessageContextKey, string.Empty);
_context.SetData