1.概述
 Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用。
1.1 akka特性
- 易于构建并行和分布式应用 (Simple Concurrency & Distribution) - 比较容易构建分布式应用
- 可靠性(Resilient by Design) - 系统具备自愈能力,在本地/远程都有监护。
- 高性能(High Performance) - 在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。
- 弹性,无中心(Elastic — Decentralized) - 自适应的负责均衡,路由,分区,配置
- 可扩展(Extensible) - 可以使用Akka 扩展包进行扩展。- 1.2 主要模块
- akka-actors - akka的核心,一个用于并发和分发的模型
 
- akka-stream - 一种直观而安全的方式来实现异步、非阻塞的回压流处理。
 
- akka-http - 现代的、快速的、异步的、流的HTTP服务器和客户端。
 
- akka-cluster - 通过在多个节点上分布您的系统来获得弹性和弹性。
 
- akka-sharding - 根据用户的身份,在集群中分配您的参与者。
 
- Distributed Data - 最终一致,高度读取和写入可用,低延迟数据
 
- Akka Persistence - 为参与者的事件包允许他们在重新启动后到达相同的状态。(持久化)
 
- Akka Management - 在云系统上运行Akka系统的扩展(k8s,aws,…)
 
- Alpakka 
- 优势 - 事件驱动模型(Event-driven model) - Actor 通过响应消息来执行工作。Actor 之间的通信是异步的,允许 Actor 发送消息并继续自己的工作,而不是阻塞等待响应。 
- 强隔离原则(Strong isolation principles) - 与 Java 中的常规对象不同,Actor 在调用的方法方面,没有一个公共 API。相反,它的公共 API 是通过 Actor 处理的消息来定义的。这可以防止 Actor 之间共享状态;观察另一个 Actor 状态的唯一方法是向其发送请求状态的消息。 
- 位置透明(Location transparency) - 系统通过工厂方法构造 Actor 并返回对实例的引用。因为位置无关紧要,所以 Actor 实例可以启动、停止、移动和重新启动,以向上和向下扩展以及从意外故障中恢复。 
- 轻量级(Lightweight) - 每个实例只消耗几百个字节,这实际上允许数百万并发 Actor 存在于一个应用程序中。
 
- 劣势 
- Akka的关键要素 - FSM: Actor状态维护
- MailBox:消息队列
- 派发器:线程调度
- 序列化:java,pb
- 网络传输:netty
 
- 部分日志 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 access-app
 enequeue send.msg: Thread[default-remote-dispatcher-15,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue518854215 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue518854215 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 enequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1962688179 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1962688179 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 enequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1722048285 InboundPayload(size = 397 bytes)
 dequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1722048285 InboundPayload(size = 397 bytes)
 enequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1539873324 {"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_app_return_to_access","appId":10000,"targetResourceId":"test-target-id"}
 dequeue msg: Thread[default-dispatcher-28,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1539873324 {"method":"simple_app","headers":{},"timeSign":1638372980972,"appMessage":"msg_app_return_to_access","appId":10000,"targetResourceId":"test-target-id"}
 enequeue send.msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue518854215 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 dequeue send.msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue518854215 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 enequeue send.msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1962688179 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 dequeue send.msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1962688179 ActorSelectionMessage({"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_access_to_app","appId":10000,"targetResourceId":"test-target-id"},Vector(user, NodeAvatar),false)
 enequeue msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1722048285 InboundPayload(size = 397 bytes)
 dequeue msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1722048285 InboundPayload(size = 397 bytes)
 enequeue msg: Thread[default-remote-dispatcher-7,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1859244097 {"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_app_return_to_access","appId":10000,"targetResourceId":"test-target-id"}
 dequeue msg: Thread[.default-dispatcher-28,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1859244097 {"method":"simple_app","headers":{},"timeSign":1638372990978,"appMessage":"msg_app_return_to_access","appId":10000,"targetResourceId":"test-target-id"}
 app-access
 enequeue msg: Thread[default-remote-dispatcher-13,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue2012634163 InboundPayload(size = 462 bytes)
 dequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue2012634163 InboundPayload(size = 462 bytes)
 enequeue msg: Thread[SandBox-akka.actor.default-dispatcher-36,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1334187884 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 dequeue msg: Thread[SandBox-akka.actor.default-dispatcher-34,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1334187884 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 enequeue msg: Thread[SandBox-akka.actor.default-dispatcher-34,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue360110022 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 dequeue msg: Thread[SandBox-akka.actor.default-dispatcher-36,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue360110022 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 enequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1624011765 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1624011765 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 enequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue787426676 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue787426676 {"method":"simple_app","headers":{},"timeSign":1638372980972,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 enequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue2012634163 InboundPayload(size = 462 bytes)
 dequeue msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue2012634163 InboundPayload(size = 462 bytes)
 enequeue msg: Thread[SandBox-akka.actor.default-dispatcher-34,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue464599832 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 dequeue msg: Thread[SandBox-akka.actor.default-dispatcher-2,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue464599832 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 enequeue msg: Thread[SandBox-akka.actor.default-dispatcher-2,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1322027443 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 dequeue msg: Thread[SandBox-akka.actor.default-dispatcher-34,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1322027443 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_access_to_app","appId":10000}
 enequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1624011765 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue1624011765 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 enequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue787426676 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}
 dequeue send.msg: Thread[default-remote-dispatcher-5,5,main] que:class akka.dispatch.UnboundedMailbox$MessageQueue787426676 {"method":"simple_app","headers":{},"timeSign":1638372990978,"targetResourceId":"test-target-id","appMessage":"msg_app_return_to_access","appId":10000}- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- //收消息 
 if (msg.message() instanceof IInnerMessage){
 
 }
 //发消息
 if (msg.message() instanceof EndpointManager.Send){
 
 }
 //payload数据
 if (msg.message() instanceof AssociationHandle.InboundPayload){
 }- 2.1 Akka-Actor
- actor类型(path+uid)- (/)根actor
- (/user)用户actor,业务开发使用- system.actorOf(),user根路径
- context.actorOf(),用户actor创建子路径
 
- (system)系统actor
 

- 生命周期- actorOf- 创建actor
 
- preStart- actor对象创建调用,只调用一次
 
- preRestart- 调用postRestart,actor异常触发
 
- stop- 调用postStop,actor异常触发    
 
- 调用postStop,actor异常触发   
 
- actorOf
- 收发消息- receive- 接收消息- 接收队列
- 分发器
 
- 消息应答- 发送方noSender模式- 无法回复,回复会出现死信
 
- 发送方携带Sender模式- getSender()模式:依赖actor上下文,重启积压消息无法送达
- actorSelection(getSender().Path)模式,选择新的uid,重启积压消息可送达
 
 
- 发送方noSender模式
 
- 接收消息
- ask- 处理模型- 异步处理
- 同步应答
 
- 处理流程- CompletableFuture
- AskableActorSelection askAble = new AskableActorSelection(akkaSelection);
- askAble.ask(xxx).onComplete{c.complete(xxx)}
 
 
- 处理模型
- tell- 处理模型- 无业务返回应答,收消息成功Ack
 
- 处理流程- actorSelection(path).tell(xxx)
- getSender().tell(xxx)
 
 
- 处理模型
- forward- 特性- 转发消息
- 携带发送者
 
- 场景- router模式下转发消息
 
 
- 特性
 
- receive
- 调度器(分发器)- Dispatcher:- 一个基于事件的调度程序,它将一组 Actor 绑定到线程池,如果未指定调度器,则使用默认调度器。
- 可共享性:Unlimited
- 邮箱:任意,为每个 Actor 创建一个
- 用例:默认调度器,Bulkheading
- 驱动:java.util.concurrent.ExecutorService。使用fork-join-executor、thread-pool-executor或akka.dispatcher.ExecutorServiceConfigurator的FQCN指定的executor。
 
- PinnedDispatcher:- 这个调度器为每个使用它的 Actor 指定唯一的线程;即每个 Actor 将拥有自己的线程池,池中只有一个线程。
- 可共享性:None
- 邮箱:任意,为每个 Actor 创建一个
- 用例:Bulkheading
- 驱动:任何akka.dispatch.ThreadPoolExecutorConfigurator。默认情况下为thread-pool-executor。
 
- CallingThreadDispatcher:- 此调度器仅在当前调用的线程上运行。这个调度器不创建任何新的线程,但是它可以从不同的线程并发地用于同一个 Actor。有关详细信息和限制,请参阅「CallingThreadDispatcher」。
- 可共享性:Unlimited
- 邮箱:任意,为每个 Actor 创建一个(按需)
- 用例:Testing
- 驱动:调用线程(duh)
 
 
- Dispatcher:
- 容错(错误处理)- 监督策略- OneForOneStrategy- 默认(推荐),父Actor只对出问题的子actor进行处理
 
- AllForOneStrategy- 父Actor对出问题的子actor以及他的所有兄弟节点进行处理
 
- 异常处理- 继续(resume) :Actor 继续处理下一条消息;
- 停止(stop) :停 止 Actor,不再做任何操作;
- 重启(restart) :新建一个 Actor,代替原来的 Actor;
- 向上反映(escalate) :将异常信息传递给下一个监督者。
 
- 默认监督策略- ActorInitializationException将停止失败的子 Actor
- ActorKilledException将停止失败的子 Actor
- DeathPactException将停止失败的子 Actor
- Exception将重新启动失败的子 Actor
- 其他类型的Throwable将向上反映到父级 Actor
- 如果异常一直升级到根守护者,它将以与上面定义的默认策略相同的方式处理它。
 
 
- OneForOneStrategy
 
- 监督策略
- 邮箱- UnboundedMailbox- 默认邮箱
- 底层是一个java.util.concurrent.ConcurrentLinkedQueue
- 阻塞: 否
- 有界: 否
- 配置名称:”unbounded” 或 “akka.dispatch.UnboundedMailbox”
 
- SingleConsumerOnlyUnboundedMailbox- 底层是一个非常高效的多生产者单消费者队列,不能被用于BalancingDispatcher
- 阻塞: 否
- 有界: 否
- 配置名称:”akka.dispatch.SingleConsumerOnlyUnboundedMailbox”
 
- BoundedMailbox- 底层是一个java.util.concurrent.LinkedBlockingQueue
- 阻塞: 是
- 有界: 是
- 配置名称:”bounded” 或 “akka.dispatch.BoundedMailbox”
 
- UnboundedPriorityMailbox- 底层是一个java.util.concurrent.PriorityBlockingQueue
- 阻塞: 是
- 有界: 否
- 配置名称:”akka.dispatch.UnboundedPriorityMailbox”
 
- BoundedPriorityMailbox- 底层是一个 java.util.PriorityBlockingQueue包装为akka.util.BoundedBlockingQueue
- 阻塞: 是
- 有界: 是
- 配置名称:”akka.dispatch.BoundedPriorityMailbox”
 
 
- UnboundedMailbox
- 路由- router也是一种actor 类型- 它路由到来的消息到其他的actors,其他那些actors就叫做routees(被路由对象)
 
- 路由策略- akka.routing.RoundRobinRoutingLogic_ _ 轮询
- **akka.routing._RandomRoutingLogic _**随机
- akka.routing._SmallestMailboxRoutingLogic __ _空闲
- akka.routing.BroadcastRoutingLogic 广播
- akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
- akka.routing._**TailChoppingRoutingLogic ** _尾部断续
- akka.routing.ConsistentHashingRoutingLogic_ _一致性哈希
 
 
- router也是一种actor 类型

- FSM(状态机) - State(S) x Event(E) -> Actions (A), State(S’)
- 如果我们处于状态S,并且事件E发生,那么我们应该执行操作A,并向状态S’过渡。
 
- 持久化(Persistence) 
- 状态 - 空闲(Idle) - 无通信关联
 
- 活跃(Active) - 发送消息或者入站连接成功
 
- 被守护(Gated) - 远程链路通信失败(akka.remote.retry-gate-closed-for 参数控制时间),被守护状态可转换为空闲状态
 
- 被隔离(Quarantined) - 通信失败无法恢复时会转换为(Quarantined)状态  
 
 
- 序列化 - Akka 提供了内置的支持序列化以及的扩展, 你可以使用内置的序列化功能,也可以扩展 - 配置- akka.actor.serializers.java=”akka.serialization.JavaSerializer”
- 
 
 
- 配置
- 内置序列化 - akka.serialization.JavaSerializer
- akka.remote.serialization.ProtobufSerializer
 
- 外部扩展 - 自定义序列化
- io.altoo.akka.serialization.kryo.KryoSerializer
 
 
- 网络 - netty- tcp
- udp
 
 
- netty
3.主要流程
3.1 发送消息
- 向远端发送消息分为两类:getSender().tell; actorSelection.tell();- getSender().tell,复用原有id,重启后原有消息会丢失,接受放会转变成deadletter
- actorSelection.tell(),基于路径和地址选用消息,id为有效id发送成功不会造成消息丢失
 

3.2 接受消息
- 业务触发

- 网络传递

4.注意事项
- 异常处理- 业务侧捕获异常,异常产生可能会造成actor重启或者关闭,期间出现消息丢失
 
- 发送消息- 确保不丢消息的情况下- 同步:可以采用ask同步,加重试
- 异步:可采用ask异步,加重试
 
- 提高吞吐量- 采用tell模式
 
- 携带发送者转发- forward
 
- 消息回复- 建议用actorSelection.tell回复,可以保证重启后消息不丢失。
 
 
- 确保不丢消息的情况下
- 网络- 关闭链接复用,在重启的特定的情况下会存在链接与actor关联失败情况
 
- 死信处理- 监控系统消息发送或者接受失败,可观察现有信息送达状态,降低无效的资源消耗及错误逻辑,发现潜在问题
- 解决系统中非重启出现的deadletter
 
- 线程处理- actor并发处理,减少公共成员变量访问
 
- Actor状态管理
https://zhuanlan.zhihu.com/p/38662453
https://doc.akka.io/docs/akka/current/remoting.html#lifecycle-and-failure-recovery-model
https://www.cnblogs.com/tankaixiong/p/11225259.html
http://doc.yonyoucloud.com/doc/akka-doc-cn/2.3.6/scala/book/chapter1/01_what_is_akka.html
 
        