架构解密从分布式到微服务:聊聊分布式计算,初识Akka
从上图中还可以看到,Akka中的每个Actor都有一个Path (路径),对于一个Actor子系统ActorSystem (类似于μJavaActors中的- -个ActorManager,维护-一个Actor 命名空间)来说,顶级根路径是“/”,下面有两个子路径, 分别是user (用户空间)路径与system (系统空间)路径,在前者的路径分支上挂接了我们自己开发的Actor,后者则是Akka
初识 Akka
虽然Akka基于Scala而非Java语言编写而成,但由于Scala最终还是被编译为Java字节码并运行在JVM之上,所以我们可以认为Akka属于Java领域。
Akka官方对Akka的介绍如下。
●是对并发、并行程序的简单的高级别的抽象。
●是异步、 非阻塞、高性能的事件驱动编程模型。
●是非常 轻量的事件驱动处理机制(1GB内存可容纳约270万个Actor)。
通过7.2节对μJavaActors的讲解,我们知道,一 一个实际的Actor 系统是由许多个Actor实例组成的-一个复杂的树状结构,父Actor负责子Actor的生命周期并对它们实施必要的监管与控制,Akka项目则更加清晰地描述和定义了与之相关的编程模型。如下所示给出了Akka中的Actor层级与监管的示意图。
从上图中还可以看到,Akka中的每个Actor都有一个Path (路径),对于一个Actor子系统ActorSystem (类似于μJavaActors中的- -个ActorManager,维护-一个Actor 命名空间)来说,顶级根路径是“/”,下面有两个子路径, 分别是user (用户空间)路径与system (系统空间)路径,在前者的路径分支上挂接了我们自己开发的Actor,后者则是Akka本身的系统级的Actor所在的路径。
如下所示是一个典型的Akka Actor 的Path层级的结构示意图,我们看到每个下一层级的Actor的Path全路径的名称都是从根节点出发的完整路径,类似于文件目录结构的设计思路。
而将user空间与system空间分离的做法,又借鉴了Linux内核的思想,将系统级的Actor识别出来,从而针对性地实现了精细化调度及增强系统内核的稳定性与容器能力。
在Akka的Actor Path设计里还融入了URL的思想,使得Akka天然具备了分布式计算的能力。下面是Actor Path的完整定义方式:
akka://<actorsystemname>@ <hostname>: <port>/<actor path>
如果我们要访问位于一个远程机器10.0.0.1. 上的某个Actor,则可以这样引用:
ActorRef actor = context.actorFor ("akka:/ /app@10.0.0.1:2552/user/actorxxxx") ;
在分布式系统中,在通常情况下我们会部署多个Actor实例来响应某个业务请求,Akka 为此提供了基于Router组件派发请求的解决思路。如下所示是一个使用Router的原理图。
首先,我们定义一个Router (其实也是-一个Actor),然后为这个Router设置后端的转发路
由(routee), 客户端在发送消息时,要先获取Router的Path,并将消息发到Router上,最后由
Router将消息转发到后端的某个具体Actor.上。如下所示是来自Akka官网的一段Router配置
代码:
akka. actor . deployment {
/parent/ remoteGroup {
router = round- robin-group
routees .paths = [
"akka. tcp:/ /app@10.0.0.1:2552/user/workers/w1",
"akka. tcp:/ /app@10.0.0.2:2552/user/workers/w1",
"akka. tcp:/ /app@10.0.0.3:2552/user/workers/w1"]
}
}
从上面的介绍中我们看到了Akka的强大能力,那么它是如何运转的呢?下图给出了生动、直观的解释。
首先,用户应用(StudentSimulatorApp) 要通过Actor System这个Akka里最重要的组件来实现具体Actor的创建、引用(通过ActorRef)及消息发送等逻辑。消息投递过程中最重要的组件则是Message Dispatcher,它首先把收到的消息放入队列中,然后驱动派发线程去执行每个Actor的收件动作,收件动作就是把每个Actor自己的消息从队列中转移到自己的MailBox中,最后回调Actor的消息处理接口以完成消息的处理逻辑。如下所示为更详细的Akka运作原理图。
我们知道Message Dispatcher用到了线程池,那么具体用的是什么线程池呢?答案就在下面这段Akka的Message Dispatcher的配置信息里:
my-di spatcher
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
牛Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
牛Parallelism (threads) . . . ceil (available processors ★factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
#Setto1foras
fair as possible.
throughput = 100
}
没错,Akka 用的就是JDK 7提供的新的并发框架一Fork/Join, 如下图所示。
下面这张图给出了Akka测试
java.util.concurrent.ThreadPoolExecutor与ForkJoinPool时的性能对比结果。
ThreadPoolExecutor的性能之所以低于ForkJoinPool很多,主要的-一个原因是:在高并发情况下多线程锁竞争(来自共享的LinkedBlockingQueue)及由此引发的大量上下文切换,会导致系统并发上不去,而ForkJoinPool 的每个工作线程都有自己的任务队列,所以在这种高度竞争的情况下表现非常突出。再看看下面这张ForkJoinPool 的原理图,你可能会理解更深刻。
最后以一个编程案例来说明如何使用Akka库(Java)进行编程,该案例对应的Actor的拓扑图如下图所示。
这个例子为模拟标准的请求应答调用这种常见场景。首先,我们创建两个Actor,分别是SenderActor和ReceiverActor, SenderActor在启动后发送一个Hello消息给ReceiverActor,后者在收到消息后再发送相应的Hello Ack消息给SenderActor , SenderActor在收到应答后关闭自己,同时结束程序。
下面展示完整的Actor代码。
首先是SenderActor的代码:
import akka. actor . UntypedActor;
public class SenderActor extends UntypedActor {
@Override
public void onReceive (Object msg) {
System. out.println("received done ");
if (msg == ReceiverActor . Msg. DONE) {
getContext () .stop(getSelf()) ;
getContext() . system() .shutdown() ;
} else {
unhandled (msg) ;
}
}
}
接着是ReceiverActor的代码:
import akka. actor . UntypedActor;
public class ReceiverActor extends UntypedActor {
public static enum Msg {
GREET, DONE
}
@Override
public void onReceive (Object msg) {
if (msg == Msg . GREET) {
System. out.println("Hello World!");
getSender () . tell (Msg.DONE, getSelf());
} else{
unhandled (msg) ;
}
}
}
import akka. actor . ActorRef;
import akka. actor . ActorSystem;
import akka.actor. Props;
最后是主程序的代码:
public class Starter
public static void main(String[] args) {
// Create an Akka system
ActorSystem system
ActorSystem. create ("HellowSystem") ;
/ / create the sender actor
final ActorRef sender = system. actorOf (new Props (SenderActor.class) ,
"sender") ;
/ / create the receiver actor
final ActorRef greeter = system. actorOf (new Props (ReceiverActor.class) ,
"greeter") ;
greeter. tell (ReceiverActor .Msg. GREET, sender) ;
}
}
更多推荐
所有评论(0)