C# .NET 中的反应式系统

ops/2024/11/13 9:09:51/

概述:反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)了解反应式系统反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:响应式:系统及时响应。弹性:系统在面对故障时保持响应。弹性:系统在不同的工作负载下保持响应。消息驱动:系反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。

这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。

在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)

了解反应式系统

反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:

  • 响应式:系统及时响应。

  • 弹性:系统在面对故障时保持响应。

  • 弹性:系统在不同的工作负载下保持响应。

  • 消息驱动:系统依靠异步消息传递来确保松耦合、隔离和位置透明。

实时股市仪表板

想象一下这样的场景:金融机构希望为其用户提供一个实时仪表板,该仪表板显示股票市场趋势、重大股票变动警报并提供实时分析。

此应用程序需要处理大量数据,有效地处理数据,并立即更新用户界面。它是反应式系统的完美候选者。

工具:Rx.NET 和 Akka.NET

为了应对这一挑战,我们将使用两个强大的库:

  • **Rx.NET (Reactive Extensions for .NET):**一个库,用于使用可观察序列和 LINQ 样式的查询运算符编写异步和基于事件的程序。

  • Akka.NET:一个开源工具包和运行时,用于在 .NET 上构建高度并发、分布式和容错的事件驱动应用程序。

在 .NET 中构建反应式系统 — 图片来源:由 Author 创建

构建我们的应用程序

我们的应用程序由几个组件组成:

  1. 数据摄取服务:连接到模拟股票市场数据流。

  2. 处理引擎:分析重大事件(例如,价格急剧上涨)的数据。

  3. 仪表盘服务:实时更新实时仪表盘。

先决条件

  • 安装 Rx.NET 软件包 (System.Reactive)

  • 安装 Akka.NET 软件包 (Akka)

项目结构

此示例的结构如下,全部位于单个控制台应用程序项目中:

  • StockTick类来表示股票数据。

  • StockMarketSimulator类来模拟股票数据流。

  • Akka.NET actors:用于更新仪表板和处理重大动作。DashboardActorSignificantMovementActor

  • 将所有内容连接在一起的主类。Program

步骤 1:设置数据引入服务

我们需要模拟股票市场价格的实时数据馈送。为简单起见,假设我们有一个返回流的函数,其中是一个表示股票符号、价格和时间戳的类。GetStockStream()IObservable<StockTick>StockTick

我们将使用 Rx.NET 定期生成对象流。 方法每秒(或您喜欢的任何其他合理间隔)创建一个价格变动,然后将这些变动映射到具有随机生成的价格和交易品种的对象。StockTickObservable.IntervalStockTick

using System;  
using System.Reactive.Linq;  
using System.Reactive.Threading.Tasks;  
using System.Threading.Tasks;  public class StockTick  
{  public string Symbol { get; set; }  public double Price { get; set; }  public DateTime Timestamp { get; set; }  
}  public static class StockMarketSimulator  
{  private static readonly Random rand = new Random();  private static readonly string[] symbols = new[] {   "AAPL",   "MSFT",   "GOOGL",   "AMZN",   "FB" };  public static IObservable<StockTick> GetStockStream()  {  return Observable.Interval(TimeSpan.FromSeconds(1))  .Select(_ => new StockTick  {  Symbol = symbols[rand.Next(symbols.Length)],  Price = Math.Round(100 + (rand.NextDouble() * 1000), 2),  Timestamp = DateTime.Now  });  }  
}

第 2 步:使用 Rx.NET 分析数据

Rx.NET 在处理数据流方面大放异彩。我们可以订阅并使用 LINQ 查询来实时处理和分析股票即时报价。IObservable<StockTick>

让我们假设,一个重大的变动是在 5 秒缓冲窗口内价格变化超过 30%。

using System;  
using System.Linq;  
using System.Reactive.Linq;  public class StockAnalysis  
{  public static void AnalyzeStockTicks(IActorRef significantMovementActor)  {  IObservable<StockTick> stockStream = StockMarketSimulator.GetStockStream();  return stockStream  .GroupBy(tick => tick.Symbol)  .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(30)))  .Select(buffer =>  {  var firstTick = buffer.First();  var lastTick = buffer.Last();  // Avoid potential issues if the buffer is empty  if (firstTick == null || lastTick == null) return null;  var priceChange = Math.Abs((lastTick.Price - firstTick.Price) / firstTick.Price);  return new  {  Symbol = firstTick.Symbol,  PriceChange = priceChange,  StartPrice = firstTick.Price,  EndPrice = lastTick.Price,  Timestamp = DateTime.Now  };  })  .Where(x => x != null && x.PriceChange > 0.05); // Filter for more than 5% price change   }  
}

对每个股票品种的报价进行分组,缓冲 30 秒,然后进行分析以发现重大的价格变化。如果价格变化超过 5%,则认为价格变动显著,并且有关变动的信息将打印到控制台。

调用是 Rx.NET 模式的重要组成部分,其中订阅了处理的结果(重大变动)。该方法是触发可观察序列的执行,并定义如何处理每个发出的项目(重大的股价变动)。.Subscribe(movement => { ... })Subscribe

步骤 3:集成仪表板更新的 Akka.NET

我们将创建一个 Akka.NET 参与者系统,以处理我们的 Rx.NET 分析检测到的重大股票变动。此执行组件系统将由两个主要执行组件组成:

  • DashboardActor:负责接收重要的移动消息并更新仪表板。

  • SignificantMovementActor:订阅可观察对象,并在检测到重大移动时向 发送消息。significantMovementsDashboardActor

public class SignificantMovement  
{  public string Symbol { get; set; }  public double PriceChange { get; set; }  public double StartPrice { get; set; }  public double EndPrice { get; set; }  public DateTime Timestamp { get; set; }  
}using Akka.Actor;  // Akka.NET Actor for dashboard updates  
public class DashboardActor : ReceiveActor  
{  public DashboardActor()  {  Receive\<SignificantMovement>(movement =>  {  // Logic to update the dashboard with the significant movement  Console.WriteLine($"Dashboard updated for {movement.Symbol}: {movement.PriceChange \* 100:F2}% change, from {movement.StartPrice} to {movement.EndPrice}");  });  }  
}using Akka.Actor;  
using System;  
using System.Reactive.Linq;  // Akka.NET Actor to handle significant movements  
public class SignificantMovementActor : ReceiveActor  
{  private readonly IActorRef _dashboardActor;  public SignificantMovementActor(IActorRef dashboardActor, IObservable<dynamic> significantMovements)  {  this._dashboardActor = dashboardActor;  significantMovements.Subscribe(movement =>  {  var significantMovement = new SignificantMovement  {  Symbol = movement.Symbol,  PriceChange = movement.PriceChange,  StartPrice = movement.StartPrice,  EndPrice = movement.EndPrice,  Timestamp = movement.Timestamp  };  _dashboardActor.Tell(significantMovement);  });  }  
}

创建执行组件系统,并在主应用程序逻辑中将所有内容绑定在一起。

using Akka.Actor;  
using System;  class Program  
{  static void Main(string[] args)  {  var system = ActorSystem.Create("StockMonitorSystem");  var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");  var significantMovements = StockAnalysis.AnalyzeStockTicks();  var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));  system.ActorOf(props, "significantMovementActor");  Console.WriteLine("System is running. Press any key to exit...");  Console.ReadLine();  system.Terminate().Wait();  }  
}

该方法设置并运行一个 Akka.NET actor 系统,该系统与 Rx.NET 集成,以处理和显示显着的库存变动。Main

让我们分解该方法的每个部分,以了解它是如何工作的,以及它如何实现对股票市场数据模拟和处理的持续监听。Main

var system = ActorSystem.Create("StockMonitorSystem");

此行初始化名为 的 Akka.NET 执行组件系统的新实例。执行组件系统是一个分层的执行组件组,它为创建执行组件、调度消息和管理执行组件生命周期提供基础结构。这是使用 Akka.NET 的入口点。StockMonitorSystem

var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");

这将在执行组件系统中创建类型的执行组件。 是用于实例化 actor 的方法,返回对新创建的 actor 的引用。 是为此执行组件实例提供的名称,可用于在系统中查找它。DashboardActorActorOfdashboardActor

var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));

Props 是一个配置类,用于描述如何创建 actor 的实例。通过使用 ,您可以指定应如何构造 ,包括其依赖项。在这里,它采用一个 lambda 表达式,该表达式构造一个新的 ,传入引用和可观察序列。Props.CreateSignificantMovementActorSignificantMovementActordashboardActorsignificantMovements

system.ActorOf(props, "significantMovementActor");

此行创建使用前面定义的 props 的实例。它类似于 的创建方式,但使用 用于更复杂的初始化。SignificantMovementActordashboardActorprops

现在,从 Rx.NET 创建的参与者和可观察序列被设置为连续处理和显示重要的库存变动。侦听可观察对象并将其收到的任何消息转发到 .反过来,将处理这些消息(在本例中,将信息打印到控制台)。SignificantMovementActorsignificantMovementsdashboardActordashboardActor

Console.ReadLine();

此行至关重要,因为它可以防止应用程序立即退出。它等待用户按下一个键,然后再继续,有效地保持应用程序(以及参与者系统)运行并能够处理传入的模拟股票数据。

system.Terminate().Wait();

system.Terminate()启动执行组件系统的关闭,停止所有执行组件并释放资源。这是一个异步操作,返回 .Task

.Wait()在终止任务上,确保应用程序在执行组件系统完全关闭之前不会退出。这对于干净和优雅的退出非常重要,确保完成所有正在进行的处理并正确释放资源。

因此,该方法建立了一个连续的、反应性的处理管道,使用 Akka.NET 参与者来处理和显示重大的股票市场走势。Main

问:我能否通过发布到队列并从队列中侦听而不是使用 Rx.NET 和 Akka.NET 来实现相同的功能?

是的,队列可用于类似的数据流和处理,但 Rx.NET 提供了更具表现力和简洁的流处理能力,而 Akka.NET 提供了一个强大的框架,用于使用基于参与者的模型构建并发和分布式系统,从而增强容错能力和系统响应能力。


http://www.ppmy.cn/ops/5235.html

相关文章

ElasticSearch可视化工具:kibana + elasticsearch-head

kibana 下载 地址&#xff1a;https://www.elastic.co/cn/downloads/kibana 下载别的版本&#xff1a;https://www.elastic.co/cn/downloads/past-releases#kibana 将Kibana安装包解压缩 进入config目录&#xff0c;在kibana.yml中添加es服务器地址。&#xff08;如果之前没…

AppBuilder升级!工作流编排正式上线!AssistantsAPI开放邀测!

>>【v0.5.3版本】 上线时间&#xff1a;2024/4/14 关键发版信息&#xff1a; 低代码态&#xff1a;新增工作流&#xff0c;低代码制作组件 自定义组件&#xff1a;支持用户自定义创建组件&#xff0c;并被Agent自动编排调用
 工作流框架&#xff1a;组件支持流式编排…

【HCIP学习】OSPF协议基础

一、OSPF基础 1、技术背景&#xff08;RIP中存在的问题&#xff09; RIP中存在最大跳数为15的限制&#xff0c;不能适应大规模组网 周期性发送全部路由信息&#xff0c;占用大量的带宽资源 路由收敛速度慢 以跳数作为度量值 存在路由环路可能性 每隔30秒更新 2、OSPF协议…

OWASP发布10大开源软件风险清单

3月20日&#xff0c;xz-utils 项目被爆植入后门震惊了整个开源社区&#xff0c;2021 年 Apache Log4j 漏洞事件依旧历历在目。倘若该后门未被及时发现&#xff0c;那么将很有可能成为影响最大的软件供应链漏洞之一。近几年爆发的一系列供应链漏洞和风险&#xff0c;使得“加强开…

简介:Asp.Net Core进阶高级编程教程

课程简介目录 &#x1f680;前言一、课程背景二、课程目的三、课程特点四、课程适合人员六、最后 &#x1f680;前言 本文是《.Net Core进阶编程课程》教程专栏的导航站&#xff08;点击链接&#xff0c;跳转到专栏主页&#xff0c;欢迎订阅&#xff0c;持续更新…&#xff09…

【笔记】Android 网络漫游更新网络状态、运营商名称等信息日志分析

业务知识 漫游有国内和国际漫游之分,Android代码定义如下: //frameworks/base/telephony/java/android/telephony/ServiceState.java/*** registered in a domestic roaming network* @hide*/@SystemApipublic static final int ROAMING_TYPE_DOMESTIC = 2;/*** registered…

ssm 体检预约管理系统开发mysql数据库web结构java编程计算机网页源码eclipse项目

一、源码特点 ssm 体检预约管理系统是一套完善的信息系统&#xff0c;结合springMVC框架完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库&#xff0c; 系统主要采用B/S…

手撸词法分析器(C/C++)

手撸词法分析器&#xff08;C/C&#xff09; 一.背景二.什么是词法分析器&#xff1f;三.代码四.思考 一.背景 这学期开设了编译原理&#xff0c;要求写个基本的词法分析器。所以博主就自己写了一份代码&#xff0c;也比较简单基础。 二.什么是词法分析器&#xff1f; 简单来…