Spring Boot集成Akka Cluster快速入门Demo

server/2024/9/24 8:13:27/

1.什么是Akka Cluster?

Akka Cluster将多个JVM连接整合在一起,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序,部署到很多JVM上去实现程序的分布式并行运算(单机也可以起很多节点构成集群)。更重要的是, Akka Cluster集群构建与Actor编程没有直接的联系,集群构建是在ActorSystem层面上,实现了Actor消息地址的透明化,无需考虑目标运行环节是否分布式,可以按照正常的Actor编程模式进行开发。 我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。Akka Cluster中将节点的生命周期划分为:

member-states

 

  • joining - 当尝试加入集群时的初始状态
  • up - 加入集群后的正常状态
  • leaving / exiting - 节点退出集群时的中间状态
  • down - 集群无法感知某节点后,将其标记为down
  • removed - 从集群中被删除,以后也无法再加入集群

其实当参数akka.cluster.allow-weakly-up-members启用时(默认是启用的),还有个weakly up,它是用于集群出现分裂时,集群无法收敛,则leader无法将状态置为up的临时状态。这个后面再解释。 图中还有两个特殊的名词:

  • fd* - 这个表示akka的错误检测机制Faiulre Detector被触发后,将节点标记为unreachable
  • unreachable* - unreachable不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。

市面上大多数产品的分布式管理一般用的是注册中心机制,例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里,而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题:

  1. master节点一般是单一的,一旦挂了影响就比较大(所以很多master都采用了HA机制),也就是所谓的系统单点故障;
  2. 通常节点的地址发现是要走master去获取的,当系统并发大时,master节点就可能成为性能瓶颈,即单点性能瓶颈。

Akka可能就是考虑这两点,采用了P2P的模式,这样任何一个节点都可以作为”master”,任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢?答案是Gossip协议和CRDT。这里不做过多解释,感兴趣的话可以自己去翻阅相关介绍

2.代码工程

实验目的

搭建一个简单akka custer集群

pom.xml

<!-- Akka Cluster dependency -->
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-typed_2.13</artifactId><version>2.6.0</version>
</dependency>

cluster

node1.conf

akka {actor {provider = "cluster"  }remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2551 }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

node2.conf

akka {actor {provider = "cluster"}remote {artery {canonical.hostname = "127.0.0.1"canonical.port = 2552  }}cluster {seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]}
}

集群监听器

package com.et.akka.cluster;import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.cluster.ClusterEvent;public class ClusterListener extends AbstractBehavior<ClusterEvent.ClusterDomainEvent> {public ClusterListener(ActorContext<ClusterEvent.ClusterDomainEvent> context) {super(context);Cluster cluster = Cluster.get(context.getSystem());cluster.subscriptions().tell(Subscribe.create(getContext().getSelf(), ClusterEvent.ClusterDomainEvent.class));}@Overridepublic Receive<ClusterEvent.ClusterDomainEvent> createReceive() {return newReceiveBuilder().onMessage(ClusterEvent.MemberUp.class, this::onMemberUp).onMessage(ClusterEvent.MemberRemoved.class, this::onMemberRemoved).onAnyMessage(event -> {System.out.println("Received cluster event: " + event);return this;}).build();}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberUp(ClusterEvent.MemberUp memberUp) {System.out.println("Member is Up: " + memberUp.member());return this;}private Behavior<ClusterEvent.ClusterDomainEvent> onMemberRemoved(ClusterEvent.MemberRemoved memberRemoved) {System.out.println("Member is Removed: " + memberRemoved.member());return this;}public static Behavior<ClusterEvent.ClusterDomainEvent> create() {return Behaviors.setup(ClusterListener::new);}
}

启动集群

package com.et.akka.cluster;import akka.actor.typed.ActorSystem;
import akka.cluster.ClusterEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;import java.io.File;public class ClusterApp {public static void main(String[] args) {Config configNode1 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node1.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode1 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode1);System.out.println("Node 1 started with config from node1.conf");Config configNode2 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node2.conf")).withFallback(ConfigFactory.load());ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode2 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode2);System.out.println("Node 2 started with config from node2.conf");}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

启动集群(执行ClusterApp里面的main方法),查看日志可以看到2个节点都起来了

23:00:19.201 [ClusterSystem-akka.actor.default-dispatcher-6] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Welcome from [akka://ClusterSystem@127.0.0.1:2551]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2551, status = Up)
Received cluster event: MemberJoined(Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Joining))
Received cluster event: LeaderChanged(Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: RoleLeaderChanged(dc-default,Some(akka://ClusterSystem@127.0.0.1:2551))
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()
23:00:19.645 [ClusterSystem-akka.actor.default-dispatcher-5] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2552] to [Up]
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: SeenChanged(false,Set(akka://ClusterSystem@127.0.0.1:2551))
Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up)
Received cluster event: ReachabilityChanged()
Received cluster event: SeenChanged(true,Set(akka://ClusterSystem@127.0.0.1:2551, akka://ClusterSystem@127.0.0.1:2552))
Received cluster event: ReachabilityChanged()

4.引用

  • Cluster Specification • Akka Documentation

http://www.ppmy.cn/server/117576.html

相关文章

find 命令:搜索文件

一、命令简介 ​find​命令的作用是搜索文件和目录。 相关命令&#xff1a;locate、whereis ‍ 二、命令参数 find [起始目录] [匹配模式] [条件]选项和参数 ​-name pattern​: 根据文件名模式查找文件​-type type​: 根据文件类型查找文件&#xff08;如f​表示普通文…

Android 进程间通信

在 Android 中&#xff0c;进程间通信 (IPC, Inter-Process Communication) 是指在不同进程之间进行数据交换的机制。Android 提供了几种主要的 IPC 方法&#xff0c;每种方法适用于不同的场景。 1. Binder 机制 Binder 是 Android 核心的 IPC 机制&#xff0c;底层是通过操作…

HTTPS和HTTP区别是什么?

HTTP和HTTPS是两种协议&#xff0c;分别是HyperText Transfer Protocol和HyperText Transfer Protocol Secure。 HTTPS还经常被称为HTTP over SSL或者HTTP over TSL&#xff0c;HTTPS经由HTTP进行通信&#xff0c;但利用SSL/TLS来加密数据包。 他们的区别主要由以下几个方面&…

ARM驱动学习之5 LEDS驱动

ARM驱动学习之5 LEDS驱动 知识点&#xff1a; • linuxGPIO申请函数和赋值函数 – gpio_request – gpio_set_value • 三星平台配置GPIO函数 – s3c_gpio_cfgpin • GPIO配置输出模式的宏变量 – S3C_GPIO_OUTPUT注意点&#xff1a; DRIVER_NAME 和 DEVICE_NAME 匹配。实现步…

禁用win10的自动更新功能

禁用win10自动更新的方法&#xff1a; 方法1&#xff1a;系统设置 开始->设置&#xff08;win i&#xff09;->更新和安全->高级选项->暂停更新&#xff08;只能暂停35天&#xff09; 方法2&#xff1a;服务 开始->运行&#xff08;win r&#xff09;->s…

麒麟操作系统搭建Nacos集群

Nacos 集群搭建 2.4.2 环境介绍 操作系统Kylin Linux Advanced Server V10 (Lance)Kylin Linux Advanced Server V10 (Lance)Kylin Linux Advanced Server V10 (Lance)内核版本Linux 4.19.90-52.22.v2207.ky10.aarch64Linux 4.19.90-52.22.v2207.ky10.aarch64Linux 4.19.90-52…

SEGGERS实时系统embOS推出Linux端模拟器

SEGGER 发布了两个新的 embOS 仿真模拟器&#xff1a;embOS Sim Linux 和 embOS-MPU Sim Linux。 通过模拟 Linux 主机系统上的硬件&#xff0c;取代物理硬件&#xff0c;为开发人员提供了一种无缝的方式来构建原型和测试应用程序。 embOS Sim Linux 端口支持 32 位和 64 位系…

c4d的重命名工具(支持模型和材质) 及 python窗口定义

不是我牛逼&#xff0c;是豆包牛逼&#xff01; 一个简化版的窗口 import c4d from c4d import guiclass MyDialog(gui.GeDialog):def __init__(self):super().__init__()self.SetTitle("My Dialog")def CreateLayout(self):# 设置对话框布局return Truemy_dialog …