Module Encapsulation
shenyu-disruptor defines a set of general-purpose abstractions, including DisruptorProvider, DisruptorProviderManage, DataEvent, QueueConsumerFactory, and DisruptorThreadFactory.
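The module sets up an initialization framework for Disruptor. DisruptorProviderManage initializes the Disruptor, and the parameters can be customized at initialization time. Those parameters include a consumer factory, which initialization stores in a member variable of QueueConsumer. QueueConsumer listens for messages; when one arrives, the consumer factory QueueConsumerFactory creates a QueueConsumerExecutor to handle it. QueueConsumerExecutor receives the message and carries out the concrete operation. Inside the DisruptorProviderManage object, the member variable provider is the producer for the Disruptor that was just initialized, and messages are published through this provider.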
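In short, this module is a generic wrapper around Disruptor and works with any data type. To use it from outside: extend QueueConsumerExecutor and put the concrete logic in its run method; implement the QueueConsumerFactory interface so that it creates your own QueueConsumerExecutor; pass the factory to the DisruptorProviderManage constructor; then call startup on the resulting object to initialize and start the Disruptor. Once it is running, obtain the provider from the DisruptorProviderManage object to publish messages and to shut the Disruptor down.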
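The usage steps above can be sketched without the real library. The following is a minimal, self-contained illustration and not ShenYu code: SketchConsumerExecutor, SketchConsumerFactory, and SketchProviderManage are hypothetical stand-ins for QueueConsumerExecutor, QueueConsumerFactory, and DisruptorProviderManage, and a plain thread pool takes the place of the Disruptor ring buffer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for QueueConsumerExecutor: holds one message, run() is the business logic. */
abstract class SketchConsumerExecutor<T> implements Runnable {
    private T data;
    T getData() { return data; }
    void setData(T data) { this.data = data; }
}

/** Hypothetical stand-in for QueueConsumerFactory: creates one executor per message. */
interface SketchConsumerFactory<T> {
    SketchConsumerExecutor<T> create();
}

/** Hypothetical stand-in for the manager and its provider; a thread pool replaces the ring buffer. */
class SketchProviderManage<T> {
    private final SketchConsumerFactory<T> factory;
    private ExecutorService pool;

    SketchProviderManage(SketchConsumerFactory<T> factory) { this.factory = factory; }

    void startup() { pool = Executors.newFixedThreadPool(2); }

    /** Publishing: wrap the message in a fresh executor and hand it to a worker thread. */
    void onData(T data) {
        SketchConsumerExecutor<T> executor = factory.create();
        executor.setData(data);
        pool.execute(executor);
    }

    /** Stop accepting messages and wait for the ones already submitted. */
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class UsageSketch {
    static List<String> runDemo() {
        List<String> handled = new CopyOnWriteArrayList<>();
        // Steps 1 and 2: the executor subclass and the factory that creates it.
        SketchConsumerFactory<String> factory = () -> new SketchConsumerExecutor<String>() {
            @Override
            public void run() { handled.add("handled:" + getData()); }
        };
        // Steps 3 and 4: build the manager from the factory, start it, publish, shut down.
        SketchProviderManage<String> manage = new SketchProviderManage<>(factory);
        manage.startup();
        manage.onData("a");
        manage.onData("b");
        manage.shutdown();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The order in which the two messages are handled is not fixed here, because two worker threads share the load.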
Project Startup
RegisterClientServerDisruptorPublisher#start starts the DisruptorProviderManage.
    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));
        providerManage = new DisruptorProviderManage<>(factory);
        providerManage.startup();
    }
DisruptorProviderManage#startup(boolean) initializes the Disruptor configuration.
    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
Publishing Events
ShenyuClientRegisterEventPublisher#publishEvent publishes the event.
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
DisruptorProvider#onData hands the data to the ringBuffer, which publishes it as an event.
    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");
        }
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
QueueConsumer#onEvent processes the data.
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }
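onEvent picks an executor through orderly(t) before running the task, but the code shown here does not reveal how that choice is made. One common way to keep messages with the same key in order while still using several threads is to route each key to a fixed single-threaded lane. The sketch below illustrates that general idea only; KeyedLaneSketch and its methods are hypothetical and are not ShenYu's OrderlyExecutor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class KeyedLaneSketch {
    // Each lane is a single-threaded executor, so tasks within a lane run in submission order.
    private final ExecutorService[] lanes;

    KeyedLaneSketch(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same key always maps to the same lane; floorMod keeps the index non-negative. */
    ExecutorService select(String key) {
        return lanes[Math.floorMod(key.hashCode(), lanes.length)];
    }

    void submit(String key, Runnable task) {
        select(key).execute(task);
    }

    /** Stop all lanes and wait for the tasks already queued. */
    void shutdown() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits 1..5 under key "a", interleaved with other keys, and returns the order seen for "a". */
    static List<Integer> demo() {
        KeyedLaneSketch sketch = new KeyedLaneSketch(4);
        List<Integer> seenForA = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            sketch.submit("a", () -> seenForA.add(n));
            sketch.submit("b" + n, () -> { });
        }
        sketch.shutdown();
        return seenForA;
    }
}
```

Tasks for different keys may still interleave with each other; only the relative order within one key is preserved.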
Creating the QueueConsumerExecutor: fetch all subscribers via getSubscribers and group them by data type.
    @Override
    public QueueConsumerExecutor<Collection<DataTypeParent>> create() {
        Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers()
                .stream()
                .map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e)
                .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));
        return new RegisterServerConsumerExecutor(maps);
    }
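The grouping step relies on the two-argument Collectors.toMap, which keeps exactly one value per key and throws IllegalStateException when two elements map to the same key. A minimal sketch of that behavior, using a hypothetical Kind enum and TypedSubscriber interface in place of ShenYu's DataType and ExecutorTypeSubscriber:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class SubscriberIndexSketch {
    // Hypothetical data types; the real enum has its own constants.
    enum Kind { URI, META_DATA }

    // Hypothetical subscriber that only reports which type it handles.
    interface TypedSubscriber {
        Kind getType();
    }

    static TypedSubscriber of(Kind kind) {
        return () -> kind;
    }

    /** Index subscribers by type; one subscriber per type is expected. */
    static Map<Kind, TypedSubscriber> index(List<TypedSubscriber> subscribers) {
        return subscribers.stream()
                .collect(Collectors.toMap(TypedSubscriber::getType, Function.identity()));
    }

    /** Two subscribers of the same type make the two-argument toMap throw. */
    static boolean rejectsDuplicates() {
        try {
            index(Arrays.asList(of(Kind.URI), of(Kind.URI)));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In other words, this style of indexing assumes each data type has a single registered subscriber.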
Handling Events
RegisterServerConsumerExecutor#run executes on a worker thread, looks up the matching ExecutorSubscriber, and calls its executor method.
    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        selectExecutor(results).executor(results);
    }

    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }
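Note that selectExecutor looks only at the first element, so the whole batch goes to the subscriber registered for that element's type. The sketch below reproduces this dispatch rule in isolation; Kind, Item, and the handler map are hypothetical stand-ins, not ShenYu types.

```java
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

class FirstTypeDispatchSketch {
    // Hypothetical data types.
    enum Kind { URI, META_DATA }

    // Hypothetical message carrying its own type.
    static final class Item {
        final Kind kind;
        final String payload;

        Item(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    /** One handler per type; each handler receives the whole batch. */
    static Map<Kind, Function<Collection<Item>, String>> handlers() {
        Map<Kind, Function<Collection<Item>, String>> map = new EnumMap<>(Kind.class);
        map.put(Kind.URI, items -> "uri:" + items.size());
        map.put(Kind.META_DATA, items -> "meta:" + items.size());
        return map;
    }

    /** Pick the handler from the first element's type; an empty batch is skipped. */
    static String dispatch(Collection<Item> items) {
        if (items.isEmpty()) {
            return null;
        }
        Kind kind = items.iterator().next().kind;
        return handlers().get(kind).apply(items);
    }
}
```

A batch that mixes types would therefore be handled entirely by the first element's handler, so this rule fits best when each published batch contains a single type.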
Related Blog Posts
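- [Open Source Project] Introduction to the Disruptor Framework and Quick Start
- [Source Code Analysis] Source Code Analysis of the Disruptor Framework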