文章目录
- spring发布订阅示例
- 同步核心源码分析
- 如何配置异步
- 事务问题
观察者模式又称为发布订阅模式,定义为:对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都得到通知并被自动更新。
如下图所示,一个场景中,主要流程是用户下单,并且返回操作,但是下单之后,要做很多与主干业务无关的流程,如发送短信、发送邮件、积分增加等等。
这种模式想必大家多多少少都了解过,但是大家讨论最多的就是通过使用mq中间件实现发布订阅模式,但是如果没有中间件呢?本文重点阐述,在没有中间件的情况下,如何做到使用观察者模式解耦。
spring发布订阅示例
spring和guava都实现了比较优雅的发布订阅的框架,但是默认开发都会在spring的框架下开发。因此通过spring来实现发布订阅会更加方便一点。
主要业务流程代码,用户完成下单并且返回,并且发布订单下单事件。
@Autowiredprivate ApplicationContext applicationContext;public String saveOrder(Order order){this.orderMapper.save(order);//创建下单事件this.applicationContext.publishEvent(new SaveOrderEvent(order));return "下单成功";}
创建下单事件类,描述该事件信息和当事件发生时可以添加一些通用代码。
// 下单事件
public class SaveOrderEvent extends ApplicationEvent {public SaveOrderEvent(Object source) {//编写事件后的代码super(source);}
}
事件监听,这里的写法示例两种(我也没研究过其他的写法)。一种是作用在类上,一个类一个监听事件,另一种可以总用在方法上,通过@EventListener
来实现监听。
@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {@Overridepublic void onApplicationEvent(SaveOrderEvent event) {try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);}
}
@Service
public class OrderService{@EventListener(SaveOrderEvent.class)public void methodTest(SaveOrderEvent event){System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);}
}
执行代码后,打印日志如下:
线程name:http-nio-6902-exec-3方法中打印com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]
线程name:http-nio-6902-exec-3类打印信息com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]
同步核心源码分析
我原本以为spring的发布订阅模式是异步的,这可能是我用mq中间件用的太多了缘故吧。实际上,默认情况下spring的发布订阅是同步的(可以通过配置实现异步)。为了更进一步测试他是异步还是同步, 我在一个订阅事件中设置了休眠,然后就因为这个休眠,影响了主流程的响应时间。为此我还去研究了它的源码。最终找到核心源码为下org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent
:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {//如果有配置执行器,则为异步,如果没有配置,则为同步。Executor executor = getTaskExecutor();if (executor != null) {executor.execute(() -> invokeListener(listener, event));}else {invokeListener(listener, event);}}}
如何配置异步
我在寻找了很多资料,大部分博客是使用@Async注解实现,其实仔细观察其源码可发现,官方这种写法,肯定是可以通过配置实现异步。因此,我在此示例两种写法。
方法一:
代码示例如下,自定义一个线程池,并且把线程池设置上即可。个人认为是相对符合官方的配置思路。
@Bean("taskExecutor")public Executor getExecutor() {ThreadPoolExecutor executor = new ThreadPoolExecutor(10,20,60,TimeUnit.SECONDS,new ArrayBlockingQueue(10000));return executor;}@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)public ApplicationEventMulticaster initEventMulticaster(@Qualifier("taskExecutor") Executor taskExecutor) {SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor);return simpleApplicationEventMulticaster;}
方法2:
通过@Async
注解实现,在使用@Async
的时候一般都会自定义线程池,因为@Async
的默认线程池为SimpleAsyncTaskExecutor
,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
在Application
上添加@EnableAsync
注解。然后在方法上添加注解即可@Async
即可
@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {@Override@Async("taskExecutor")public void onApplicationEvent(SaveOrderEvent event) {try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);}
}
事务问题
异步、解耦和事务在一起本身是一个伪命题,至少我并为发现一套完美的异步事务解决方案。但是,为了追求数据一致性,开发者也是在一直努力着。如果要是想使用spring的异步发布订阅的时候实现数据一致性。也许可以尝试下@TransactionalEventListener
。从命名上来看,即可得出,他是一个事件监听加上了事务的扩展。只不过加入了回调
的方式来解决,这样就能够在事务进行Commited
,Rollback
…等的时候才会去进行
Event的处理,达到事务同步的目的。
//配置监听,事务commit之后执行。@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void methodTest(SaveOrderEvent event){System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);}
如上,配置了事务执行之后执行。并且它还有很多其他的监听方式,如:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION。这种的相对于@EventListener
功能更加多了一些。毕竟在真实的场景中,经常是有事务存在的,并且为了减小事务的执行时间,要求第三方的接口调用不在事务中执行。