EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{

}

public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
    Task HandleAsync(IEvent @event);

    void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;

    void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{
    void Publish(TEvent @event);
    Task PublishAsync(TEvent @event) ;
    
    void AutoHandle();
}

public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
    where TEvent: IEvent
{
    readonly IServiceProvider _servicesProvider = serviceProvider;

    private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();

    public void Publish(TEvent @event)
    {
        Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
        _eventChannel.Writer.WriteAsync(@event);
    }

    private CancellationTokenSource Cts { get; } = new();

    public void Cancel()
    {
        Cts.Cancel();
    }
    
    public async Task PublishAsync(TEvent @event)
    {
        await _eventChannel.Writer.WriteAsync(@event);
    }

    public void AutoHandle()
    {
        // 确保只启动一次
        if (!Cts.IsCancellationRequested) return;

        Task.Run(async () =>
        {
            while (!Cts.IsCancellationRequested)
            {
                var reader = await _eventChannel.Reader.ReadAsync();
                await HandleAsync(reader);
            }
        }, Cts.Token);
    }

    async Task HandleAsync(TEvent @event)
    {
        var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();

        if (handler is null)
        {
            throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
        }
        try
        {
            await handler.HandleAsync(@event);
        }
        catch (Exception ex)
        {
            handler.HandleException( @event, ex);
        }
    }
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
    private readonly IServiceProvider _serviceProvider = serviceProvider;

    private class ChannelKey
    {
        public required string Key { get; init; }
        public int Subscribers { get; set; }

        public override bool Equals(object? obj)
        {
            if (obj is ChannelKey key)
            {
                return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
            }

            return false;
        }

        public override int GetHashCode()
        {
            return 0;
        }
    }

    private Channel<IEvent> Rent(string channel)
    {
        _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);

        if (value != null) return value;
        value = Channel.CreateUnbounded<IEvent>();
        _channels.TryAdd(new ChannelKey() { Key = channel }, value);
        return value;
    }

    private Channel<IEvent> Rent(ChannelKey channelKey)
    {
        _channels.TryGetValue(channelKey, out var value);
        if (value != null) return value;
        value = Channel.CreateUnbounded<IEvent>();
        _channels.TryAdd(channelKey, value);
        return value;
    }

    private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();

    private CancellationTokenSource Cts { get; } = new();

    public void Cancel()
    {
        Cts.Cancel();
        _channels.Clear();
        Cts.TryReset();
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
    }

    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
                         new ChannelKey() { Key = typeof(TEvent).Name };
        channelKey.Subscribers++;

        Task.Run(async () =>
        {
            try
            {
                while (!Cts.IsCancellationRequested)
                {
                    var @event = await ReadAsync(channelKey);

                    var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
                    if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
                    try
                    {
                        await handler.HandleAsync((TEvent)@event);
                    }
                    catch (Exception ex)
                    {
                        handler.HandleException((TEvent)@event, ex);
                    }
                }
            }
            catch (Exception e)
            {
                throw new InvalidOperationException("Error on onSubscribe handler", e);
            }
        }, Cts.Token);
    }

    private async Task<IEvent> ReadAsync(string channel)
    {
        return await Rent(channel).Reader.ReadAsync(Cts.Token);
    }

    private async Task<IEvent> ReadAsync(ChannelKey channel)
    {
        return await Rent(channel).Reader.ReadAsync(Cts.Token);
    }
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{

}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
    private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
    
    
    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.Publish(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            manager.Publish(@event);
        }
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            await EventBusPool.PublishAsync(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            await manager.PublishAsync(@event);
        }
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.OnSubscribe<TEvent>();
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
            manager.AutoHandle();
        }
    }
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

相关文章

C# this关键字的作用

关键字在C#中主要用于引用当前对象,区分字段与局部变量,调用其他构造函数以及传递当前对象给其他方法或构造函数。

C# winfrom中excel文件导入导出

在C#交流群里,看到很多小伙伴在excel数据导入导出到C#界面上存在疑惑,所以今天专门做了这个主题,希望大家有所收获!环境:win10+vs2017界面:主要以演示为主,所以没有做优化,然后主界面上添加两个按钮,分别命名为ExportExcel和ImportExcel,添加两个dataGridView,分别是dataGridView1和dataGridView2然后在窗体加载程序中给dataGr...

C#中的浅度和深度复制(C#如何复制一个对象)

接着,我们修改了复制得到的对象及其引用类型字段的属性值,最后输出原始对象和复制对象的属性值。这意味着如果一个类包含引用类型成员,在执行深度复制时,不仅复制这些引用,还会递归地复制引用所指向的对象,直到所有的引用都指向全新的对象实例。当进行浅复制时,系统会创建一个新的对象实例,但这个新对象的字段将与原始对象中的值类型字段具有相同的值,而对于引用类型字段,则仅仅是复制了。也就是说,如果一个类中有引用类型的成员变量(比如数组、其他自定义类的对象等),那么浅复制后,新对象和原对象的这些引用类型成员仍然指向。

C# 实现微信自定义分享

在实际的应用中,我们可能不是简单的将该网页的链接直接分享出去,而是生成符合实际需要的URL,微信称其为自定义分享。

C# 常用排序算法(冒泡排序 插入排序 选择排序 快速排序 归并排序 堆排序)

建堆阶段将无序列表转换为堆,排序阶段将堆的根节点依次取出,并调整堆,完成排序。它使用分治法的思想,通过选择一个基准元素,将列表分成两个子列表,并对每个子列表递归地进行排序。它重复地遍历要排序的列表,比较相邻的两个元素,并交换它们的位置,直到列表排序完成为止。每次遍历都会将最大的元素移动到列表的末尾。每次选择未排序部分的最小元素,并将其放到已排序部分的末尾,逐步构建有序序列。它将列表分成较小的子列表,对每个子列表进行排序,然后再将子列表合并成较大的有序列表,直到整个列表排序完成。

.Net Core Policy 基于策略授权

在ASP.NET Core中,重新设计了一种更加灵活的授权方式:基于策略的授权, 它是授权的核心.在使用基于策略的授权时,首先要定义授权策略,而授权策略本质上就是对Claims的一系列断言。基于角色的授权和基于Scheme的授权,只是一种语法上的便捷,最终都会生成授权策略。除了OperationAuthorizationRequirement外,都有对应的快捷添加方法,比如RequireClaim,RequireRole,RequireUserName等。新建类 PermissionHandler。

【.NET Core】深入理解任务并行库 (TPL)

是和空间中的一组公共类型和API。TPL的目的是通过简化将并行和并发添加到应用程序的过程来提高开发人员的工作效率。TPL动态缩放并发的程度以最有效地使用所有可用的处理器。此外,TPL还处理工作分区,ThreadPool上的线程调度、取消支持、状态管理以及其他低级别的细节操作。通过使用TPL,你可以在将精力集中于程序要完成的工作,同时最大程度地提高代码的性能。在.NET Framework4中,首选TPL编写多线程代码和并行代码。但是,并不是所有代码都适合并行化。

FluentValidation在C# WPF中的应用

1. 引言在.NET开发领域,FluentValidation以其优雅、易扩展的特性成为开发者进行属性验证的首选工具。它不仅适用于Web开发,如MVC、Web API和ASP.NET CORE,同样也能完美集成在WPF应用程序中,提供强大的数据验证功能。本文将深入探讨如何在C# WPF项目中运用FluentValidation进行属性验证,并展示如何通过MVVM模式实现这一功能。2. 功能概览我们的目标是构建一个WPF应用程序,它能够通过FluentValidation实现以下验证功能:验证Vie

【.NET Core】Lazy<T> 实现延迟加载详解

延迟初始化是一种将对象的创建延迟到第一次需要用时的技术。简而言之,就是对象的初始化发生在第一次需要调用的时候执行。通常所说的延迟初始化和延迟实例化的意思是相同。通过使用延迟基础,可以避免应用程序不必要的计算和内存消耗。从.NET 4.0开始,可以使用Lazy来实现对象的延迟初始化,从而优化系统的性能。延迟初始化就是将对象的初始化延迟到第一次使用该对象时。延迟初始化是我们优化程序性能的一种方式。如创建一个对象时需要花费很大的开销,而这一对象在系统运行过程中不一定会用到。

各版本 操作系统 对 .NET Framework 与 .NET Core 支持

有两种类型的受支持版本:长期支持 (LTS) 版本和标准期限支持 (STS) 版本。所有版本的质量都是一样的。唯一的区别是支持的时间长短。LTS 版本可获得为期三年的免费支持和补丁。STS 版本可获得 18 个月的免费支持和修补程序。有关详细信息,请参阅。从上图中我们可以看出,.Net5及以下版本已经不再受到官方支持;而.Net7看起来也是过渡版本,支持时间较短,本文从 .Net Core 3.1 开始介绍支持的系统,可能不是很全面,仅供参考。

【C#】.net core 6.0 依赖注入生命周期

对于.net core而言,依赖注入生命周期有三种瞬态(Transient)、作用域(Scoped)和单例(Singleton),无论使用哪种生命周期,都需要确保对象的线程安全性,并正确地处理依赖关系。

C#动态生成带参数的小程序二维码

在微信小程序管理后台,我们可以生成下载标准的小程序二维码,提供主程序入口功能。在实际应用开发中,小程序二维码是可以携带参数的,可以动态进行生成
返回
顶部