Akka-Remote-心跳保活

1.ProtocolStateActor

ProtocolStateActor通过状态机维持akka链路状态,

1.1 状态维持

image.png

image.png

  • systemEnqueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (Mailbox.debug) println(receiver + " having enqueued " + message)
    val currentList = systemQueueGet
    if (currentList.head == NoMessage) {
    if (actor ne null) actor.dispatcher.mailboxes.deadLetterMailbox.systemEnqueue(receiver, message)
    } else {
    if (!systemQueuePut(currentList, message :: currentList)) {
    message.unlink()
    systemEnqueue(receiver, message)
    }
    }
  • processMailbox

1
2
3
4
5
6
7
8
9
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}