1.写在前面
前面已经写过akka的很多文章了,具体如下:
- Spring Boot集成akka actor快速入门Demo
- Spring Boot集成Akka Stream快速入门Demo
- Spring Boot集成Akka remoting快速入门Demo
- Spring Boot集成Akka Cluster快速入门Demo
今天主要讲一下如何在一个akka集群环境中提交任务并在集群中执行
2.代码工程
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>akka</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Streams --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Actor dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor-typed_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Remote dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.13</artifactId><version>2.6.0</version></dependency><!-- Akka Cluster dependency --><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-typed_2.13</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>
MasterActor
- 作用:
MasterActor
是系统中的主控制器或协调者。它负责管理和分配任务,监控工作进度,以及处理系统的全局状态。 - 功能:
- 任务分配:接收任务请求并将这些任务分发给工作节点(
WorkActor
)。 - 协调工作:协调多个
WorkActor
的工作,确保任务按预期执行。 - 结果汇总:汇总来自
WorkActor
的结果,可能还会对结果进行处理或存储。 - 监控与容错:监控
WorkActor
的状态,处理异常情况,进行故障恢复等。
- 任务分配:接收任务请求并将这些任务分发给工作节点(
package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;import java.util.HashMap;
import java.util.Map;public class MasterActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);private final ActorRef workerRouter;// Constructor to initialize the worker routerpublic MasterActor(ActorRef workerRouter) {this.workerRouter = workerRouter;}public static Props props(ActorRef workerRouter) {return Props.create(MasterActor.class, workerRouter);}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> {log.info("Received task message: {}", msg.task);workerRouter.tell(msg, getSelf()); // Forward task to worker router}).build();}
}
WorkerRouterActor
- 作用:
WorkerRouterActor
是一个路由器,负责将任务分发给多个WorkActor
实例。它通常用于负载均衡和高效地管理工作负载。 - 功能:
- 任务路由:根据路由策略将任务分发给一个或多个
WorkActor
实例。常见的路由策略包括轮询、随机、最少工作量等。 - 负载均衡:确保工作负载在所有
WorkActor
实例中均匀分布。 - 动态调整:可以根据系统负载动态调整
WorkActor
的数量,进行扩展或收缩。
- 任务路由:根据路由策略将任务分发给一个或多个
package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.routing.RoundRobinPool;
import akka.routing.Router;public class WorkerRouterActor extends AbstractActor {private final ActorRef router;public WorkerRouterActor(int numberOfWorkers) {this.router = getContext().actorOf(new RoundRobinPool(numberOfWorkers).props(WorkerActor.props()), "workerRouter");}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> router.tell(msg, getSelf())).match(Terminated.class, t -> getContext().stop(getSelf())).build();}public static Props props(int numberOfWorkers) {return Props.create(WorkerRouterActor.class, numberOfWorkers);}
}
WorkActor
- 作用:
WorkActor
是实际执行任务的工作单元。它负责处理具体的工作负载,并返回结果给MasterActor
或通过WorkerRouterActor
进行汇总。 - 功能:
- 执行任务:接收任务并执行相关操作,如计算、数据处理等。
- 报告结果:完成任务后,将结果发送回
MasterActor
或其他负责汇总结果的组件。 - 错误处理:处理任务执行过程中可能出现的错误或异常情况。
package com.et.akka.cluster;import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;public class WorkerActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);public static Props props() {return Props.create(WorkerActor.class);}@Overridepublic Receive createReceive() {return receiveBuilder().match(TaskMessage.class, msg -> {log.info("Processing task: {}", msg.task);// Simulate task processingThread.sleep(1000);log.info("Task completed: {}", msg.task);}).build();}
}
ClusterApp2
package com.et.akka.cluster;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;public class ClusterApp2 {public static void main(String[] args) {// Load configurationConfig config = ConfigFactory.load();ActorSystem system = ActorSystem.create("ClusterSystem", config);// Create WorkerRouterActor with 5 workersActorRef workerRouter = system.actorOf(WorkerRouterActor.props(5), "workerRouter");// Create MasterActorActorRef masterActor = system.actorOf(MasterActor.props(workerRouter), "masterActor");// Log cluster membershipCluster cluster = Cluster.get(system);System.out.println("Cluster initialized with self member: " + cluster.selfAddress());// Submit tasksmasterActor.tell(new TaskMessage("Task 1"), ActorRef.noSender());masterActor.tell(new TaskMessage("Task 2"), ActorRef.noSender());masterActor.tell(new TaskMessage("Task 3"), ActorRef.noSender());// Keep system alive for demonstration purposestry {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}system.terminate();}
}
只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
- GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)
3.测试
启动ClusterApp2类之中的main方法,查看日志,发现任务执行成功
Cluster initialized with self member: akka://ClusterSystem@127.0.0.1:2551 22:03:04.948 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.et.akka.cluster.MasterActor - Received task message: Task 1 22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 1 22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 2 22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 3 22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 2 22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 3 22:03:05.951 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 3 22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 2 22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 1