Spring Boot集成Akka Cluster实现在分布式节点中执行任务

devtools/2024/9/20 9:19:01/ 标签: maven, java, spring boot, akka

1.写在前面

前面已经写过akka的很多文章了,具体如下:

  • Spring Boot集成akka actor快速入门Demo
  • Spring Boot集成Akka Stream快速入门Demo
  • Spring Boot集成Akka remoting快速入门Demo
  • Spring Boot集成Akka Cluster快速入门Demo

今天主要讲一下如何在一个akka集群环境中提交任务并在集群中执行

2.代码工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>akka</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Streams --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Actor dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor-typed_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Remote dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Cluster dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-typed_2.13</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

MasterActor

 

  • 作用MasterActor 是系统中的主控制器或协调者。它负责管理和分配任务,监控工作进度,以及处理系统的全局状态。
  • 功能
    • 任务分配:接收任务请求并将这些任务分发给工作节点(WorkActor)。
    • 协调工作:协调多个 WorkActor 的工作,确保任务按预期执行。
    • 结果汇总:汇总来自 WorkActor 的结果,可能还会对结果进行处理或存储。
    • 监控与容错:监控 WorkActor 的状态,处理异常情况,进行故障恢复等。

 

package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;import java.util.HashMap;
import java.util.Map;public class MasterActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);private final ActorRef workerRouter;// Constructor to initialize the worker routerpublic MasterActor(ActorRef workerRouter) {this.workerRouter = workerRouter;}public static Props props(ActorRef workerRouter) {return Props.create(MasterActor.class, workerRouter);}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> {log.info("Received task message: {}", msg.task);workerRouter.tell(msg, getSelf()); // Forward task to worker router}).build();}
}

WorkerRouterActor

 

  • 作用WorkerRouterActor 是一个路由器,负责将任务分发给多个 WorkActor 实例。它通常用于负载均衡和高效地管理工作负载。
  • 功能
    • 任务路由:根据路由策略将任务分发给一个或多个 WorkActor 实例。常见的路由策略包括轮询、随机、最少工作量等。
    • 负载均衡:确保工作负载在所有 WorkActor 实例中均匀分布。
    • 动态调整:可以根据系统负载动态调整 WorkActor 的数量,进行扩展或收缩。

 

package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.routing.RoundRobinPool;
import akka.routing.Router;public class WorkerRouterActor extends AbstractActor {private final ActorRef router;public WorkerRouterActor(int numberOfWorkers) {this.router = getContext().actorOf(new RoundRobinPool(numberOfWorkers).props(WorkerActor.props()), "workerRouter");}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> router.tell(msg, getSelf())).match(Terminated.class, t -> getContext().stop(getSelf())).build();}public static Props props(int numberOfWorkers) {return Props.create(WorkerRouterActor.class, numberOfWorkers);}
}

WorkActor

 

  • 作用WorkActor 是实际执行任务的工作单元。它负责处理具体的工作负载,并返回结果给 MasterActor 或通过 WorkerRouterActor 进行汇总。
  • 功能
    • 执行任务:接收任务并执行相关操作,如计算、数据处理等。
    • 报告结果:完成任务后,将结果发送回 MasterActor 或其他负责汇总结果的组件。
    • 错误处理:处理任务执行过程中可能出现的错误或异常情况。

 

package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;public class WorkerActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);public static Props props() {return Props.create(WorkerActor.class);}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> {log.info("Processing task: {}", msg.task);// Simulate task processingThread.sleep(1000);log.info("Task completed: {}", msg.task);}).build();}
}

ClusterApp2

package com.et.akka.cluster;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;public class ClusterApp2 {public static void main(String[] args) {// Load configurationConfig config = ConfigFactory.load();ActorSystem system = ActorSystem.create("ClusterSystem", config);// Create WorkerRouterActor with 5 workersActorRef workerRouter = system.actorOf(WorkerRouterActor.props(5), "workerRouter");// Create MasterActorActorRef masterActor = system.actorOf(MasterActor.props(workerRouter), "masterActor");// Log cluster membershipCluster cluster = Cluster.get(system);System.out.println("Cluster initialized with self member: " + cluster.selfAddress());// Submit tasksmasterActor.tell(new TaskMessage("Task 1"), ActorRef.noSender());masterActor.tell(new TaskMessage("Task 2"), ActorRef.noSender());masterActor.tell(new TaskMessage("Task 3"), ActorRef.noSender());// Keep system alive for demonstration purposestry {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}system.terminate();}
}

只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

启动ClusterApp2类之中的main方法,查看日志,发现任务执行成功

Cluster initialized with self member: akka://ClusterSystem@127.0.0.1:2551
22:03:04.948 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.et.akka.cluster.MasterActor - Received task message: Task 1
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 1
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 2
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 3
22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 2
22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 3
22:03:05.951 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 3
22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 2
22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 1

4.引用

  • https://doc.akka.io/docs/akka/current/common/cluster.html#gossip

http://www.ppmy.cn/devtools/113183.html

相关文章

Linux: network: IPv6: ESP: UDP checksum error 一例

文章目录 问题分析解决方法问题 最近遇到一个问题,操作系统的内核版本是:3.10.0-693.21.1.el7.x86_64 #1 SMP Wed Mar 7 19:03:37 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux 使用的是virtio-net虚拟网卡,设置网卡的MTU-1500,配置IP6的地址,用ESP加密UDP包,如果UDP内部数…

Vue与React的Diff算法

虚拟DOM 定义 虚拟DOM是一种用于在前端开发中模拟真实DOM的技术。它是一种抽象的数据结构&#xff08;简单来说就是一个Javascript对象&#xff09;&#xff0c;用于描述HTML或XML文档的结构和内容。通过将页面的状态和结构保存在内存中&#xff0c;而不是直接操作真实的DOM&am…

OCR2.0--General OCR Theory

引领光学字符识别&#xff08;OCR&#xff09;的新篇章 引言&#xff1a;OCR技术进化的必要性 光学字符识别&#xff08;OCR&#xff09;是一项广泛应用的技术&#xff0c;它能够从图像中提取字符并将其转换为可编辑格式。虽然OCR-1.0在过去取得了广泛应用&#xff0c;但传统…

iOS的传递链与响应链机制

iOS的框架分为&#xff1a;应用层、触摸层、媒体层、核心服务层、核心系统操作层以及内核和驱动层。 详见https://blog.csdn.net/ScheenDuan/article/details/134274203?spm1001.2014.3001.5501 其中触摸事件涉及到触摸层中的UIKit中的UIResponse&#xff0c;只有继承了UIRe…

在Ubuntu 16.04上安装R的方法

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 简介 R 是一种流行的开源编程语言&#xff0c;专门用于统计计算和图形处理。它被广泛用于开发统计软件和进行数据分析。R 很容易扩展&a…

前端发布 CDN缓存

公司给服务器加了CDN&#xff0c;导致有时前端代码上传打包后&#xff0c;正式环境页面效果却不更新。每次都需要去找运维刷CDN…让我彻底记住了CDN缓存 CDN&#xff08;Content Delivery Network&#xff0c;内容分发网络&#xff09;是一种广泛使用的互联网技术&#xff0c;…

详解 Pandas 的 rename 函数

Pandas 的 rename 函数主要是用于对 DataFrame 的行名和列名进行重命名&#xff0c;其基本语法如下&#xff1a; 一、修改行名 1. 数据准备 import pandas as pddf pd.DataFrame({"Jan" : [1, 2, 3],"Feb": [4, 5, 6],"Mar": [7, 8, 9] })pr…

动态ip切换频率是快点好还是慢点好

在网络爬虫、数据采集、网络营销等活动中&#xff0c;动态IP切换成为了一种常见的策略&#xff0c;用以规避访问限制、提高访问效率或隐藏真实身份。然而&#xff0c;关于动态IP的切换频率&#xff0c;一直存在着一个争议&#xff1a;是切换得快点好&#xff0c;还是慢点好&…

C++ ——string的模拟实现

目录 前言 浅记 1. reserve&#xff08;扩容&#xff09; 2. push_back&#xff08;尾插&#xff09; 3. iterator&#xff08;迭代器&#xff09; 4. append&#xff08;尾插一个字符串&#xff09; 5. insert 5.1 按pos位插入一个字符 5.2 按pos位插入一个字符串 …

C++学习笔记----7、使用类与对象获得高性能(一)---- 书写类(3)

2.4、this指针 每个正常的成员函数调用都会隐含地传递一个指针给到对象&#xff0c;它就是被可能我的天this的隐藏参数。使用该指针访问数据成员或者调用成员函数&#xff0c;也可以将其传递给其他的成员函数或者函数。有时候它对消除有歧义的名字很有用。例如&#xff0c;可以…

Navicat使用 笔记04

Navicat调用数据库 1.创建一个自己的链接&#xff08;文件-->新建连接-->MySQL&#xff09; 进入到这个界面中&#xff1a; 【注意&#xff1a;密码是下载登录软件时设定过的】 创建一个连接完成&#xff08;通过双击激活&#xff09;。 2.在创建好的连接中创建数据库…

从零到一,数字文创IP是如何在基地中孵化成长的?

在数字时代的浪潮下&#xff0c;数字文创IP孵化基地正成为培育创新的肥沃土壤&#xff0c;见证着一个个数字文创 IP 从无到有、茁壮成长。 数字文创IP孵化基地首先为创意的萌发提供了空间。这里汇聚了各路富有创造力的人才&#xff0c;他们的思想在这里碰撞&#xff0c;灵感的火…

strncpy函数的使用和模拟实现

目录 1.头文件 2.strncpy函数功能 2.1情况二&#xff1a; 3.strncpy函数&#xff08;模拟实现&#xff09; 方源一把抓住VS2022&#xff0c;催动春秋产的气息&#xff0c;顷刻炼化&#xff01; 1.头文件 strncpy函数的使用需要包括头文件<string.h> #include<string…

Simulink常用英文单词缩写及基本操作

Simulink常用英文单词缩写及基本操作 常用快捷键 快捷键说明 Shift 鼠标左键加选选择多个元件拖动 Ctrl 鼠标左键复制选中的元件并跟随鼠标拖动复制操作 Del 删除选中的元件 Ctrl R 旋转元件 Ctrl I 水平翻转Flip Block Ctrl 另一个模块快速连接信号线自动连线 C…

94. 二叉树的中序遍历

思路 中序遍历的遍历顺序&#xff1a;左子树、根、右子树 # Definition for a binary tree node. # class TreeNode(object): # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right class S…

顺序表(c语言实现)

顺序表是一种数据结构&#xff0c;它在计算机内存中以连续的存储位置来存储数据元素。 一、特点 1. 随机访问&#xff1a;可以在常数时间内访问特定位置的元素&#xff0c;例如&#xff0c;通过下标可以快速找到对应元素。 2. 存储密度高&#xff1a;不需要额外的指针来链接…

车载软件架构 --- SOA设计与应用(下)

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己,无利益不试图说服别人,是精神上的节…

RabbitMQ高级篇,进阶内容

强烈建议在看本篇博客之前快速浏览文章&#xff1a;RabbitMQ基础有这一篇就够了 RabbitMQ高级篇 0. 前言1. 发送者的可靠性1.1 生产者重试机制1.2 生产者确认机制1.3 实现生产者确认 2. MQ的可靠性2.1 MQ持久化2.2 LazyQueue 3. 消费者的可靠性3.1 消费者确认机制3.2 失败重试策…

openssh升级到openssh9.8p1版本

需要准备的RPM文件 openssh-9.8p1-1.el7.x86_64.rpm openssh-clients-9.8p1-1.el7.x86_64.rpm openssh-server-9.8p1-1.el7.x86_64.rpm如果需要编译文件,请参考文章:OpenSSH9.8p1编译rpm包 开始升级openssh 查询原openssh信息 查询原openssh是否有安装openssh-askpass,若…

FreeRTOS学习——链表list

FreeRTOS学习——链表&#xff08;列表&#xff09;list&#xff0c;仅用于记录自己阅读与学习源码 FreeRTOS Kernel V10.5.1 参考大佬的好文章&#xff1a; freertos内核原理 Day1(链表) FreeRTOS-链表的源码解析 *list_t只能存储指向list_item_t的指针。每个list_item_t都…