112 lines
4.5 KiB
C#
112 lines
4.5 KiB
C#
using System;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace MW.WorkFlow
|
||
{
|
||
/// <summary>
|
||
/// 一个异步事件同步原语,类似于 ManualResetEvent,但能够传递数据。
|
||
/// 可以等待事件发生并获取数据,也可以将事件设置为已发生状态并附带数据。
|
||
/// 默认情况下,事件在 Set 后保持已设置状态,直到 Reset 被调用。
|
||
/// </summary>
|
||
/// <typeparam name="T">事件发生时要传递的数据类型。</typeparam>
|
||
public class AsyncEvent<T>
|
||
{
|
||
// 使用 volatile 确保 _tcs 的读写操作对所有线程可见
|
||
private volatile TaskCompletionSource<T> _tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||
private readonly object _lock = new object(); // 用于保护 _tcs 的替换操作
|
||
|
||
/// <summary>
|
||
/// 获取一个值,指示事件是否已设置。
|
||
/// </summary>
|
||
public bool IsSet => _tcs.Task.IsCompleted;
|
||
|
||
/// <summary>
|
||
/// 等待事件被设置为已发生状态,并获取传递的数据。
|
||
/// 如果事件已经设置,则立即返回数据。
|
||
/// </summary>
|
||
/// <param name="cancellationToken">用于取消等待的令牌。</param>
|
||
/// <returns>事件发生时传递的数据。</returns>
|
||
/// <exception cref="OperationCanceledException">如果在等待过程中取消令牌被请求。</exception>
|
||
public async Task<T> WaitAsync(CancellationToken cancellationToken = default)
|
||
{
|
||
Task<T> currentTask;
|
||
lock (_lock)
|
||
{
|
||
currentTask = _tcs.Task;
|
||
}
|
||
|
||
// 如果已经设置,直接返回结果
|
||
if (currentTask.IsCompleted)
|
||
{
|
||
return await currentTask;
|
||
}
|
||
|
||
// 注册取消回调,确保在取消时 tcs 被取消
|
||
CancellationTokenRegistration? registration = null;
|
||
if (cancellationToken.CanBeCanceled)
|
||
{
|
||
// 注意:这里需要确保在 TCS 完成或取消后,回调能被注销,避免资源泄露。
|
||
// 更好的做法是使用 CancellationToken.Register(Action) 返回的 IDisposable
|
||
// 但考虑到 TaskCompletionSource 的生命周期,通常当 Task 完成或取消时,
|
||
// 注册的回调也会在适当时候被清理。
|
||
registration = cancellationToken.Register(() =>
|
||
{
|
||
// 使用 TrySetCanceled 避免在 Task 已经完成时抛出异常
|
||
_tcs.TrySetCanceled(cancellationToken);
|
||
});
|
||
}
|
||
|
||
try
|
||
{
|
||
return await currentTask;
|
||
}
|
||
finally
|
||
{
|
||
// 确保在等待结束后注销取消回调
|
||
registration?.Dispose();
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 将事件设置为已发生状态,并传递指定的数据。
|
||
/// 如果事件已经设置,此调用将被忽略。
|
||
/// </summary>
|
||
/// <param name="data">要传递的数据。</param>
|
||
/// <returns>如果事件成功设置为已发生状态,则为 true;否则为 false。</returns>
|
||
public bool Set(T data)
|
||
{
|
||
// TrySetResult 是线程安全的,如果已经设置则返回 false
|
||
// RunContinuationsAsynchronously 选项确保回调在线程池上运行,不会阻塞Set的调用者
|
||
bool result = _tcs.TrySetResult(data);
|
||
if (result)
|
||
{
|
||
Console.WriteLine($"[AsyncEvent] Event set with data: {data}");
|
||
}
|
||
return result;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 将事件重置为未设置状态。
|
||
/// 允许事件再次被等待。
|
||
/// </summary>
|
||
public void Reset()
|
||
{
|
||
lock (_lock)
|
||
{
|
||
// 如果当前 TaskCompletionSource 尚未完成,则不应该替换它
|
||
// 因为这可能导致正在等待的 Task 永远不会完成
|
||
if (_tcs.Task.IsCompleted)
|
||
{
|
||
_tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||
Console.WriteLine("[AsyncEvent] Event reset.");
|
||
}
|
||
else
|
||
{
|
||
Console.WriteLine("[AsyncEvent] Event is not yet completed, cannot reset.");
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|