【开源项目】ShenYu网关中Disruptor的使用

news/2024/11/23 0:54:15/

模块封装

shenyu-disruptor定义了DisruptorProviderDisruptorProviderManageDataEventQueueConsumerFactoryDisrutporThreadFactory等一系列通用接口
该模块的搭建了一个disruptor的初始化框架,
DisruptorProviderManage提供Disruptor的初始化,可以在初始化是自定义参数,而初始化参数中,包含消费者工厂,初始化会将消费者工厂放置到QueueConsumer的成员变量当中,有QueueConsumer进行消息的侦听,一旦有消息,则由消费者工厂QueueConsumerFactory创建QueueConsumerExecutor进行消息的处理,QueueConsumerExecutor可以拿到消息,是具体的操作。而在DisruptorProviderManage对象中,成员变量provide是此次初始化的disruptor的生产者,由此provider进行消息的发布。
所以,这个模块是对disruptor的通用封装,可以使用任何类型的数据,外界使用该模块需要进行的操作是,继承QueueConsumerExecutor其executor方法用来写具体的逻辑操作,实现QueueConsumerFactory接口,用来创建自己的实现的QueueConsumerExecutor,将工厂类用做DisruptorProviderManage的构造参数,获得对象,之后调用DisruptorProviderManage对象的start方法进行disruptor的初始化,disruptor便启动了,启动之后,就可以正常使用disruptor了,之后发布消息,则使用DisruptorProviderManage对象获取provider,进行消息的发布和disruptor的关闭。

项目启动

RegisterClientServerDisruptorPublisher#start,启动DisruptorProviderManage

    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));providerManage = new DisruptorProviderManage<>(factory);providerManage.startup();}

DisruptorProviderManage#startup(boolean),初始化Disruptor配置。

    public void startup(final boolean isOrderly) {OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize = this.consumerSize;EventFactory<DataEvent<T>> eventFactory;if (isOrderly) {newConsumerSize = 1;eventFactory = new OrderlyDisruptorEventFactory<>();} else {eventFactory = new DisruptorEventFactory<>();}Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,size,DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());@SuppressWarnings("all")QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];for (int i = 0; i < newConsumerSize; i++) {consumers[i] = new QueueConsumer<>(executor, consumerFactory);}disruptor.handleEventsWithWorkerPool(consumers);disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());disruptor.start();RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);}

发布事件

ShenyuClientRegisterEventPublisher#publishEvent,发布事件

    public void publishEvent(final DataTypeParent data) {DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();provider.onData(data);}

DisruptorProvider#onData,调用ringBuffer处理数据

    public void onData(final T data) {if (isOrderly) {throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");}try {ringBuffer.publishEvent(translatorOneArg, data);} catch (Exception ex) {logger.error("ex", ex);}}

QueueConsumer#onEvent,处理数据

    @Overridepublic void onEvent(final DataEvent<T> t) {if (t != null) {ThreadPoolExecutor executor = orderly(t);QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();queueConsumerExecutor.setData(t.getData());// help gct.setData(null);executor.execute(queueConsumerExecutor);}}

创建QueueConsumerExecutor,获取所有的getSubscribers,进行分组。

        @Overridepublic QueueConsumerExecutor<Collection<DataTypeParent>> create() {Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers().stream().map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e).collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));return new RegisterServerConsumerExecutor(maps);}

处理事件

RegisterServerConsumerExecutor#run,线程执行,获取对应的ExecutorSubscriber,调用executor

    @Overridepublic void run() {Collection<DataTypeParent> results = getData().stream().filter(this::isValidData).collect(Collectors.toList());if (CollectionUtils.isEmpty(results)) {return;}selectExecutor(results).executor(results);}private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {final Optional<DataTypeParent> first = list.stream().findFirst();return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());}

相关博客

  • 【开源项目】Disruptor框架介绍及快速入门

  • 【源码解析】Disruptor框架的源码解析

在这里插入图片描述


http://www.ppmy.cn/news/82971.html

相关文章

【科普】电压和接地真的存在吗?如何测试?

经常在实验室干活的&#xff0c;难免不被电击过&#xff0c;尤其是在干燥的北方&#xff0c;“被电”是常有的事情&#xff0c;我记得有一次拿着射频线往仪表上拧的时候&#xff0c;遇到过一次严重的电火花&#xff0c;瞬间将仪表的射频口边缘烧出了一个疤&#xff0c;实验室遭…

学习如何在VS Code中创建一个Golang/Go项目,并运行一个简单的Golang程序

学习目标: 学习如何在VS Code中创建一个Golang项目,并运行一个简单的Golang程序。 学习内容: 在VS Code 手动输入命令创建一个Golang项目在VS Code 不输入命令创建一个Golang项目1. 在VS Code 手动输入命令创建一个Golang项目 步骤1:在VS Code中创建一个新文件夹,用于存放…

总结858

一周小结&#xff1a; 英语主要背了国王的演讲&#xff0c;林肯的演讲&#xff0c;读一篇六级文章&#xff0c;这句子不长也不短&#xff0c;适合练语感&#xff0c;不要求完全背诵 高等数学&#xff1a;结束多元微分专题&#xff0c;然后是二重积分&#xff0c;明天能结束该…

星戈瑞Sulfo-CY7 NHS ester标记带氨基的荧光Cyanine7-NHS

Sulfo-CY7 NHS ester是一种常用的蛋白质标记试剂&#xff0c;是一种高度稳定的荧光染料&#xff0c;能够被用于各种细胞成像实验中。利用这种染料&#xff0c;科学家们可以标记细胞膜、细胞核、细胞器等&#xff0c;从而观察细胞的各种结构和功能。 产品名称&#xff1a;水溶性…

【ArcGIS Pro二次开发】(32):选择集(SelectionSet)

在ArcGIS Pro SDK中&#xff0c;SelectionSet类用于处理地图或图层中的【选定要素集合】。 一般情况下&#xff0c;当你使用【选择】工具进行要素的选择时&#xff0c;就会得到【Selection】的内容。 SelectionSet类提供了许多成员、属性和方法&#xff0c;用于管理和操作选择…

Ibatis与Hibernate的区别

Ibatis与Hibernate的区别 一、 hibernate与ibatis之间的比较&#xff1a; hibernate 是当前最流行的o/r mapping框架&#xff0c;它出身于sf.NET&#xff0c;现在已经成为jboss的一部分了。 ibatis 是另外一种优秀的o/r mapping框架&#xff0c;目前属于apache的一个子项…

网络编程--IOCP完成端口

写在前面 基于重叠IO模型实现回声服务器一文中基于重叠IO模型实现了一个简单的回声服务器&#xff0c;这里再回看&#xff0c;可以发送有一些明显的缺点&#xff1a;在循环中重复调用非阻塞模式的accept和进入alertable wait&#xff08;可警告等待状态&#xff09;的SleepEx函…

介绍几种常见的运维发布策略

随着Devops的发展&#xff0c;为了提高运维发布的成功率&#xff0c;探索出了多种发布策略。本文简单介绍几种常见发布策略, 以及它们适用的场景和优缺点。 第一种&#xff0c;停机发布 这是最早的一种发布策略&#xff0c;停机发布会在发布以前关闭服务&#xff0c;停止用户…