微服务设计模式 - 事件溯源模式(Event Sourcing Pattern)

news/2024/11/14 2:39:55/

微服务设计模式 - 事件溯源模式(Event Sourcing Pattern)

event-sourcing-pattern

定义

事件溯源(Event Sourcing)是一种将所有状态更改保存为一系列事件的设计模式。每次系统状态发生变化时,都会生成一个事件,这些事件在事件存储库(Event Store)中按照时间顺序保存。通过重放这些事件,可以重建对象的当前状态。

结构

事件溯源模式的关键组件包括:

  • 命令(Command): 修改系统状态的操作。
  • 事件(Event): 描述系统状态变化的记录。
  • 事件存储库(Event Store): 存储所有事件的数据库。
  • 查询(Query): 获取系统或对象当前状态的操作,通过重放事件来构建当前状态。
命令 (Command)|v
事件 (Event)|v
事件存储库 (Event Store)|v
重放事件 (Replaying Events)|v
当前状态 (Current State)

工作方式

  1. 接收命令: 当系统接收一个命令时,比如"创建订单"。

  2. 生成事件: 根据命令生成一个事件,如"订单已创建或者订单已取消"。

  3. 存储事件: 将生成的事件存储在事件存储库中。

  4. 重放事件: 当需要查询当前状态时,通过重放所有相关事件来构建对象的当前状态。

event-sourcing

好处

  1. 可追溯性: 每个状态变化都可以通过事件追踪和重现,这对于调试、审计和恢复非常有用。
  2. 可靠性: 事件作为持久记录,可以重放以恢复任何时间点的系统状态。
  3. 解耦: 命令和查询的分离使得系统能针对不同需求进行优化(这个通常与CQRS模式结合)。

应用场景

  • 订单管理系统: 在电子商务系统中,订单的生命周期从创建到取消都可以通过事件进行跟踪。
  • 金融交易系统: 金融交易的每个步骤都可以完整记录,确保数据的准确性和可追溯性。

示例代码片段

假设我们有一个订单服务,通过事件溯源记录和重建订单状态。

// 事件类
public class OrderPlacedEvent {private String orderId;private String product;private int quantity;// getters and setters
}// 事件存储库接口
public interface EventStore {void saveEvent(Event event);List<Event> getEvents(String aggregateId);
}// 内存事件存储库的简单实现
public class InMemoryEventStore implements EventStore {private Map<String, List<Event>> store = new HashMap<>();@Overridepublic void saveEvent(Event event) {store.computeIfAbsent(event.getAggregateId(), k -> new ArrayList<>()).add(event);}@Overridepublic List<Event> getEvents(String aggregateId) {return store.getOrDefault(aggregateId, Collections.emptyList());}
}// 订单聚合根
public class Order {private String orderId;private List<Event> changes = new ArrayList<>();public void placeOrder(String productId, int quantity) {applyChange(new OrderPlacedEvent(orderId, productId, quantity));}private void applyChange(Event event) {// apply event to current statechanges.add(event);}public List<Event> getUncommittedChanges() {return changes;}
}// 订单服务
public class OrderService {private EventStore eventStore;public OrderService(EventStore eventStore) {this.eventStore = eventStore;}public void placeOrder(String orderId, String productId, int quantity) {Order order = new Order(orderId);order.placeOrder(productId, quantity);eventStore.saveEvent(new OrderPlacedEvent(orderId, productId, quantity));}public Order getOrder(String orderId) {List<Event> events = eventStore.getEvents(orderId);Order order = new Order(orderId);events.forEach(event -> order.applyChange(event));return order;}
}

Axon框架

介绍

Axon框架是一个专注于实现CQRS(Command Query Responsibility Segregation,命令查询责任分离)和事件溯源(Event Sourcing)模式的Java框架。它简化了复杂分布式系统的实现,通过结构化的方式处理事件和命令,使系统更易于扩展、维护和调试。该框架高度模块化,可以与Spring Boot无缝集成,也可以使用其他依赖注入框架或独立使用。

概念

1. 聚合(Aggregate)

在Axon中,聚合是应用程序逻辑的核心。它是处理命令和定义领域逻辑的实体或对象集合。聚合确保所有的修改操作以一致的方式应用。

2. 命令(Command)

命令是对聚合执行操作的请求。命令是指令性的,它表示希望系统发生某些变化。命令是由用户或系统其他部分生成,并由聚合处理。

3. 事件(Event)

事件是系统中已经发生的事实。它们是对命令执行结果的具体描述。事件由事件溯源仓储(Event Sourcing Repository)存储,可以用于重建聚合的状态。

4. 命令处理器(Command Handler)

命令处理器是处理命令并触发事件的组件。在Axon中,命令处理器通常是定义在聚合中的。

5. 事件处理器(Event Handler)

事件处理器用于处理产生的事件,并更新读模型或执行其他逻辑。

6. 命令网关(Command Gateway)

命令网关提供了一个简化发送命令的API,处理命令的路由、序列化和分发。

7. 事件存储(Event Store)

在事件驱动架构中,事件存储是一个核心组件,它记录所有由系统生成的事件,而不是仅仅存储应用程序的当前状态。事件存储的主要作用包括:

  1. 记录所有事件: 每次系统状态发生变化时,都会生成一个事件,这些事件被按时间顺序存储下来。事件存储使得我们能够追溯和重放这些事件,从而重建任何时间点的系统状态。
  2. 事件回放: 可以重放所有事件来重建聚合的当前状态,这在需要恢复或重建数据时非常有用。
  3. 审计和调试: 提供了系统行为的完整日志,可以用于审计和调试。

在Axon框架中,事件存储可以配置使用各种存储引擎,如关系型数据库(通过JDBC)、MongoDB、Axon Server等。Axon框架提供了灵活的方式来配置和使用事件存储。接下来,本来将在Spring Boot应用程序中配置Axon框架,并使用H2嵌入式数据库作为事件存储。

注解Annotation

以下是Axon框架中一些常用的重要注解及其用途:

1. @Aggregate

用于标记一个聚合根。这告诉Axon这是一个聚合,可以处理命令并生成事件。

@Aggregate
public class OrderAggregate {// Fields, Command Handlers, Event Sourcing Handlers
}
2. @CommandHandler

用于标记处理命令的方法或构造函数。该注解可以应用于聚合内部的方法或构造函数。

@CommandHandler
public void handle(CreateOrderCommand command) {// Command handling logic
}
3. @EventSourcingHandler

用于标记处理事件且根据事件更新聚合状态的方法。事件溯源处理器确保每个事件都能正确应用到聚合状态。

@EventSourcingHandler
public void on(OrderCreatedEvent event) {// Update aggregate state based on event
}
4. @QueryHandler

用于标记处理查询的方法。在CQRS模式下,查询处理器负责处理查询请求,并从读模型或事件溯源重建模型中获取数据。

@QueryHandler
public Order handle(OrderQuery query) {// Query handling logic to retrieve data
}
5. @Saga

Saga用于管理长时间运行的业务流程和跨多个聚合的事务。Saga负责协调和维护多个步骤之间的状态。

@Saga
public class OrderManagementSaga {// Fields, Event Handlers, Logic for managing saga state
}

在Spring Boot中的应用

Spring Boot可以通过事件溯源模式增强微服务架构的可扩展性和可靠性。我们可以使用Axon Framework,一个专门用于CQRS和事件溯源的框架。

项目结构

src
|-- main|-- java|-- com.example.eventdriven|-- aggregate|-- OrderAggregate.java|-- command|-- CreateOrderCommand.java|-- event|-- OrderCreatedEvent.java|-- OrderCancelledEvent.java|-- query|-- OrderQueryHandler.java|-- Order.java|-- OrderQuery.java|-- controller|-- OrderController.java|-- EventDrivenApplication.java|-- resources|-- application.yml

项目依赖

pom.xml

<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Axon Framework dependencies --><dependency><groupId>org.axonframework</groupId><artifactId>axon-spring-boot-starter</artifactId><version>4.5.5</version></dependency><!-- Spring Data JPA for persistence --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><!-- H2 Database for in-memory storage --><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope></dependency>
</dependencies>

项目配置

application.yml,配置Axon框架和H2数据库。

# This configuration tells Axon to set up an event store with snapshot thresholds and default event handling processors. The storage engine (H2 database, in this case) is configured through Spring Data JPA settingsserver:port: 8080spring:datasource:url: jdbc:h2:mem:testdbdriver-class-name: org.h2.Driverusername: sapassword: passwordh2:console:enabled: truejpa:hibernate:ddl-auto: updateshow-sql: trueaxon:eventhandling:processors:default:mode: subscribingeventstore:snapshot-threshold: 100

主要源代码

1. Order Aggregate
// Define the Aggregate that will handle commands and produce events.
// File: src/main/java/com/example/eventdriven/aggregate/OrderAggregate.java
package com.example.eventdriven.aggregate;import com.example.eventdriven.command.CreateOrderCommand;
import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import static org.axonframework.modelling.command.AggregateLifecycle.apply;@Aggregate
public class OrderAggregate {private static final Logger logger = LoggerFactory.getLogger(OrderAggregate.class);@AggregateIdentifierprivate String orderId;private String product;private int quantity;private boolean active;public OrderAggregate() {// Default constructor needed by Axon}@CommandHandlerpublic OrderAggregate(CreateOrderCommand command) {// Validate command logic here, e.g., checking for null values or constraintsapply(new OrderCreatedEvent(command.getOrderId(), command.getProduct(), command.getQuantity()));}// Event Sourcing Handler for OrderCreatedEvent@EventSourcingHandlerpublic void on(OrderCreatedEvent event) {this.orderId = event.getOrderId();this.product = event.getProduct();this.quantity = event.getQuantity();this.active = true;logger.info("Order created: {}", event);}// Event Sourcing Handler for OrderCancelledEvent@EventSourcingHandlerpublic void on(OrderCancelledEvent event) {this.active = false;logger.info("Order cancelled: {}", event);}
}

在Axon框架中,apply函数是一个核心函数,用于在聚合内部记录和发布事件。通过调用apply方法,可以将一个事件记录下来,并确保相应的事件处理逻辑被触发。apply函数的主要功能包括:

  1. 触发事件处理器(Event Sourcing Handler):
    • apply方法触发标注了@EventSourcingHandler注解的方法,从而更新聚合的内部状态。
    • 这确保每个事件相关的状态变更都被正确应用到聚合中。
  2. 发布事件(Publish Event):
    • 事件会被发布到事件总线(Event Bus),从而通知所有对该事件感兴趣的组件。
    • 其他聚合、查询模型或外部系统可以订阅并响应这些事件。
2. Order Command
// Define a command class that carries the data necessary to create a new order.
// File: src/main/java/com/example/eventdriven/command/CreateOrderCommand.java
package com.example.eventdriven.command;import org.axonframework.modelling.command.TargetAggregateIdentifier;
public class CreateOrderCommand {@TargetAggregateIdentifierprivate String orderId;private String product;private int quantity;public CreateOrderCommand(String orderId, String product, int quantity) {this.orderId = orderId;this.product = product;this.quantity = quantity;}// Getters and setterspublic String getOrderId() {return orderId;}public String getProduct() {return product;}public int getQuantity() {return quantity;}
}
3. Order Events
// Define an event class that represents the creation of an order.
// File: src/main/java/com/example/eventdriven/event/OrderCreatedEvent.java
package com.example.eventdriven.event;public class OrderCreatedEvent {private String orderId;private String product;private int quantity;public OrderCreatedEvent(String orderId, String product, int quantity) {this.orderId = orderId;this.product = product;this.quantity = quantity;}// Getters and setterspublic String getOrderId() {return orderId;}public String getProduct() {return product;}public int getQuantity() {return quantity;}@Overridepublic String toString() {return "OrderCreatedEvent{" +"orderId='" + orderId + '\'' +", product='" + product + '\'' +", quantity=" + quantity +'}';}
}
// Define an event class that represents the Cancellation of an order.
// File: src/main/java/com/example/eventdriven/event/OrderCancelledEvent.java
package com.example.eventdriven.event;public class OrderCancelledEvent {private String orderId;public OrderCancelledEvent(String orderId) {this.orderId = orderId;}// Getters and setterspublic String getOrderId() {return orderId;}@Overridepublic String toString() {return "OrderCancelledEvent{" +"orderId='" + orderId + '\'' +'}';}
}
4. Order Query Handling
// Define a query handler to handle queries related to orders.
// File: src/main/java/com/example/eventdriven/query/OrderQueryHandler.java
package com.example.eventdriven.query;import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class OrderQueryHandler {@Autowiredprivate EventStore eventStore;@QueryHandlerpublic Order handle(OrderQuery query) {List<?> events = eventStore.readEvents(query.getOrderId()).asStream().toList();Order order = new Order();events.forEach(event -> {if (event instanceof OrderCreatedEvent) {order.apply((OrderCreatedEvent) event);} else if (event instanceof OrderCancelledEvent) {order.apply((OrderCancelledEvent) event);}// Handle other event types similarly});return order;}
}
// File: src/main/java/com/example/eventdriven/query/Order.java
package com.example.eventdriven.query;import com.example.eventdriven.event.OrderCreatedEvent;
import com.example.eventdriven.event.OrderCancelledEvent;public class Order {private String orderId;private String product;private int quantity;private boolean active;public void apply(OrderCreatedEvent event) {this.orderId = event.getOrderId();this.product = event.getProduct();this.quantity = event.getQuantity();this.active = true;}public void apply(OrderCancelledEvent event) {this.active = false;}// Getters and other methodspublic String getOrderId() {return orderId;}public String getProduct() {return product;}public int getQuantity() {return quantity;}public boolean isActive() {return active;}@Overridepublic String toString() {return "Order{" +"orderId='" + orderId + '\'' +", product='" + product + '\'' +", quantity=" + quantity +", active=" + active +'}';}
}
// File: src/main/java/com/example/eventdriven/query/OrderQuery.java
package com.example.eventdriven.query;
public class OrderQuery {private String orderId;public OrderQuery(String orderId) {this.orderId = orderId;}public String getOrderId() {return orderId;}
}
5. Order Controller
// Create a REST controller to handle HTTP requests.
// File: src/main/java/com/example/eventdriven/controller/OrderController.java
package com.example.eventdriven.controller;import com.example.eventdriven.command.CreateOrderCommand;
import com.example.eventdriven.query.OrderQuery;
import com.example.eventdriven.query.Order;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.UUID;
import java.util.concurrent.CompletableFuture;@RestController
@RequestMapping("/orders")
public class OrderController {@Autowiredprivate CommandGateway commandGateway;@Autowiredprivate QueryGateway queryGateway;@PostMappingpublic String createOrder(@RequestParam String product, @RequestParam int quantity) {String orderId = UUID.randomUUID().toString();CreateOrderCommand command = new CreateOrderCommand(orderId, product, quantity);commandGateway.sendAndWait(command);return "Order created with ID: " + orderId;}@GetMapping("/{orderId}")public CompletableFuture<Order> getOrder(@PathVariable String orderId) {return queryGateway.query(new OrderQuery(orderId), Order.class);}
}
6. Application Entry Point
// Define the main application class.
// File: src/main/java/com/example/eventdriven/EventDrivenApplication.java
package com.example.eventdriven;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class EventDrivenApplication {public static void main(String[] args) {SpringApplication.run(EventDrivenApplication.class, args);}
}

总结

cloud-native-definition-1

事件溯源模式通过将所有状态变化都记录为事件,实现了系统状态的可追溯性和可恢复性。这种模式在处理复杂状态变化和高可用性需求的系统(例如订单管理系统、金融交易系统)特别有用。在Spring Boot中,我们可以借助Axon Framework来实现事件溯源模式,从而提升微服务架构的扩展性和可靠性。


http://www.ppmy.cn/news/1546044.html

相关文章

【Rust设计模式之Fold模式】

Rust设计模式之Fold Fold &#xff08;折叠&#xff09; 如Rust Collection中的fold方法&#xff0c;是消耗迭代器适配器&#xff0c;将闭包应用于每一个元素&#xff0c;并将结果返回一样。Fold模式的中心思想也是如此&#xff0c;将元素折叠处理&#xff0c;最终计算出新的元…

关于QUERY_ALL_PACKAGES权限导致Google下架apk

谷歌商店被下架,原因是第三方使用了 QUERY_ALL_PACKAGES 权限&#xff1b; Google在高版本上限制了此权限的使用。当然&#xff0c;并不是 QUERY_ALL_PACKAGES 这个权限没有了&#xff0c;而是被列为敏感权限&#xff0c;必须有充分的理由说明&#xff0c;才允许上架 GP&#…

Bert快速入门

Python 语言 BERT 入门&#xff1a;让我们一起“吃透”BERT 1. 什么是 BERT&#xff1f; BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是 Google 提出的预训练语言模型&#xff0c;它通过双向编码器理解文本中的上下文信息&#xf…

Hive-testbench套件使用文档

Hive-testbench套件使用文档 hive-testbench 是hortonworks的一个开源项目,用于测试和基准测试 Apache Hive 的工具集。它提供了一系列的测试数据集和查询样例,用于评估和比较 Hive 在不同配置和环境下的性能。hive-testbench 的主要目标是模拟真实的大规模数据集和复杂查询…

SpringBoot技术下的共享汽车运营平台

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理共享汽车管理系统的相关信息成为必然。开发…

定义全局键盘监听事件,el-dialog中删除不可用

场景&#xff1a;全局的div增加了鼠标监听事件&#xff0c;而且window中添加了键盘监听事件。 window.addEventListener(keydown, this.handleKeydown) window.addEventListener(keyup, this.handleKeyup) 事件冒泡&#xff0c;导致阻止无效。 1、在 el-dialog 上同时阻止默…

ONLYOFFICE 8.2深度测评:集成PDF编辑、数据可视化与AI功能的强大办公套件

本文 一、文档编辑与PDF支持主要功能概述 二、数据可视化和增强的表格工具数据可视化功能亮点 三、AI驱动的摘要功能AI摘要功能优势 四、演示文稿的增强功能主要更新 五、协同办公能力的提升协同功能更新 六、跨平台兼容与开放文档格式跨平台与兼容性 七、安全性与隐私保护安全…

Flink转换算子

Apache Flink 是一个用于处理无界和有界数据的开源流处理框架。在 Flink 中&#xff0c;转换&#xff08;Transformation&#xff09;是数据流处理的核心组件之一&#xff0c;它们定义了如何从输入数据集生成输出数据集。以下是 Flink 中一些常见的转换算子&#xff1a; Map: 将…