// Start an actor system called "firstActorSystem", configured with whatever
// ConfigFactory.load() returns.
val system = ActorSystem("firstActorSystem", ConfigFactory.load())

// Ask the system to create a HelloActor registered under the name "HelloActor".
val helloActor = system.actorOf(props = Props(new HelloActor), name = "HelloActor")

// Send the actor the string message "Hello" using the `!` operator.
helloActor ! "Hello"
/*
 * H E R E   B E   D R A G O N S !
 *
 * A Cell has two main jobs: queueing messages and looking up children.
 * While the UnstartedCell is being exchanged for its real replacement, those
 * two jobs must change hands at different moments:
 *
 *  - message queueing may only be switched after every message has been
 *    drained from the temporary queue into the real mailbox;
 *  - child lookup must be switched before the very first message is
 *    processed, i.e. before Cell.start().
 *
 * Hence two separate references are kept, one per job, each swapped at its
 * own point in time.
 *
 * NOTE(review): cellOffset / lookupOffset are defined outside this snippet;
 * presumably they are the Unsafe field offsets of the two vars below, which
 * is why the vars are never read or written by name here — confirm.
 */
@volatile private var _cellDoNotCallMeDirectly: Cell = _
@volatile private var _lookupDoNotCallMeDirectly: Cell = _

/** Volatile read (through Unsafe) of the reference stored at `cellOffset`. */
def underlying: Cell =
  Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell]

/** Volatile read (through Unsafe) of the reference stored at `lookupOffset`. */
def lookup: Cell =
  Unsafe.instance.getObjectVolatile(this, lookupOffset).asInstanceOf[Cell]

/**
 * Installs `next` at `cellOffset` with a compare-and-swap, retrying until the
 * swap succeeds.
 *
 * @param next the cell to install
 * @return the cell that was read immediately before the successful swap
 */
@tailrec final def swapCell(next: Cell): Cell = {
  val previous = underlying
  val swapped = Unsafe.instance.compareAndSwapObject(this, cellOffset, previous, next)
  if (swapped) previous else swapCell(next)
}

/**
 * Installs `next` at `lookupOffset` with a compare-and-swap, retrying until
 * the swap succeeds.
 *
 * @param next the cell to install
 * @return the cell that was read immediately before the successful swap
 */
@tailrec final def swapLookup(next: Cell): Cell = {
  val previous = lookup
  val swapped = Unsafe.instance.compareAndSwapObject(this, lookupOffset, previous, next)
  if (swapped) previous else swapLookup(next)
}
/**
 * If the underlying cell is still an UnstartedCell, builds the real cell and
 * switches this ref over to it; otherwise leaves the ref untouched.
 *
 * @param catchFailures when true, a non-fatal exception thrown by `newCell` is
 *                      not propagated; instead an ActorCell running on the
 *                      default global dispatcher is created and initialised
 *                      via `initWithFailure` with that exception
 * @return this ref
 * @throws IllegalStateException if the underlying cell is null
 */
def point(catchFailures: Boolean): this.type = {
  // Builds the replacement cell, optionally turning a construction failure
  // into a cell that was initialised with that failure.
  def createCell(unstarted: UnstartedCell): Cell =
    try newCell(unstarted)
    catch {
      case NonFatal(ex) if catchFailures ⇒
        val safeDispatcher = system.dispatchers.defaultGlobalDispatcher
        new ActorCell(system, this, props, safeDispatcher, supervisor).initWithFailure(ex)
    }

  underlying match {
    case null ⇒
      throw new IllegalStateException("underlying cell is null")

    case unstarted: UnstartedCell ⇒
      val cell = createCell(unstarted)
      /*
       * The order of the next three calls matters. The real actor starts
       * running at cell.start(); if it creates children in its constructor,
       * that may happen before the swapCell performed inside replaceWith.
       * Without switching the lookup reference first, those children could
       * not be looked up immediately, e.g. if they shall become routees.
       */
      swapLookup(cell)
      cell.start()
      unstarted.replaceWith(cell)
      this

    case _ ⇒
      // this happens routinely for things which were created async=false
      this
  }
}
/**
 * Hands everything buffered in this cell over to `cell` and then makes `cell`
 * the underlying cell of `self`.
 *
 * Buffered system messages are forwarded first, then the buffered messages
 * one at a time, re-flushing system messages after each. The final
 * `self.swapCell(cell)` sits in a `finally`, so it runs even when forwarding
 * throws. The whole body is passed to `locked` (defined outside this snippet;
 * presumably it runs the body while holding this cell's lock — confirm).
 *
 * @param cell the replacement cell that receives the buffered messages
 */
def replaceWith(cell: Cell): Unit = locked {
  // Forwards all buffered system messages to `cell`. The outer loop repeats
  // in case a sys msg enqueues another sys msg while we are forwarding.
  def flushSystemMessages(): Unit =
    while (sysmsgQueue.nonEmpty) {
      var pending = sysmsgQueue.reverse
      sysmsgQueue = SystemMessageList.LNil
      while (pending.nonEmpty) {
        val current = pending.head
        // The tail is taken before unlink(), exactly as in the original
        // order (presumably unlink() detaches the message from the list).
        pending = pending.tail
        current.unlink()
        cell.sendSystemMessage(current)
      }
    }

  try {
    flushSystemMessages()
    while (!queue.isEmpty) {
      cell.sendMessage(queue.poll())
      // flush again in case a msg enqueues a sys msg
      flushSystemMessages()
    }
  } finally {
    self.swapCell(cell)
  }
}
/**
 * Creates the cell which is to replace the unstarted cell; `point` calls this
 * while switching the ref over. The cell must be fully functional.
 *
 * @param old the unstarted cell being replaced (not consulted by this implementation)
 * @return a new ActorCell initialised with `sendSupervise = false` and this ref's `mailboxType`
 */
def newCell(old: UnstartedCell): Cell = {
  val replacement = new ActorCell(system, this, props, dispatcher, supervisor)
  replacement.init(sendSupervise = false, mailboxType = mailboxType)
}
/**
 * Initialize this cell, i.e. set up its mailbox and supervision: create the
 * mailbox, install it, enqueue the Create system message and, if requested,
 * send Supervise to the parent.
 *
 * NOTE(review): the earlier doc also required that the UID be reasonably
 * different from the previous UID of a possible actor with the same path
 * (e.g. via ThreadLocalRandom.current.nextInt()). This signature takes no
 * UID, so that requirement presumably applies wherever the UID is chosen —
 * confirm.
 *
 * @param sendSupervise whether a Supervise system message is sent to the parent
 * @param mailboxType   the mailbox type passed to the dispatcher when creating the mailbox
 * @return this cell
 */
final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
  /*
   * Create the mailbox and enqueue the Create() message to ensure that
   * this is processed before anything else.
   */
  val freshMailbox = dispatcher.createMailbox(this, mailboxType)

  /*
   * The mailboxType was calculated taking into account what the MailboxType
   * has promised to produce. If that was more than the default, then we need
   * to reverify here because the dispatcher may well have gotten it wrong.
   *
   * A mismatch is not thrown here: the failure is carried inside the Create
   * message so that it surfaces at actor creation and is handled in the
   * normal way.
   */
  val actorClass = props.actorClass
  val createMessage = mailboxType match {
    case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
      val req = system.mailboxes.getRequiredType(actorClass)
      val producedQueue = freshMailbox.messageQueue
      if (req.isInstance(producedQueue)) Create(None)
      else {
        val gotType = Option(producedQueue).fold("null")(_.getClass.getName)
        val failure = ActorInitializationException(
          self,
          s"Actor [$self] requires mailbox type [$req] got [$gotType]")
        Create(Some(failure))
      }
    case _ ⇒
      Create(None)
  }

  swapMailbox(freshMailbox)
  mailbox.setActor(this)

  // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
  mailbox.systemEnqueue(self, createMessage)

  if (sendSupervise) {
    // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
    parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
  }

  this
}
/**
 * Mailbox and InternalMailbox are separated into two classes because ActorCell is needed for the
 * implementation, but can't be exposed to user-defined mailbox subclasses.
 *
 * A Mailbox wraps the given `messageQueue` and is at the same time a ForkJoinTask[Unit], a
 * SystemMessageQueue and a Runnable.
 *
 * INTERNAL API
 */
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable
/**
 * Runs one processing pass over this mailbox: unless the mailbox is closed,
 * all system messages are handled first and then the regular messages.
 *
 * Whatever happens in the pass, the mailbox is afterwards marked idle and
 * handed back to the dispatcher via `registerForExecution(this, false, false)`.
 */
override final def run(): Unit =
  try {
    val stillOpen = !isClosed // volatile read, needed here
    if (stillOpen) {
      processAllSystemMessages() // system messages go first
      processMailbox() // then the regular messages
    }
  } finally {
    setAsIdle() // volatile write, needed here
    dispatcher.registerForExecution(this, false, false)
  }
/**
 * Process the messages in the mailbox, one per recursion step.
 *
 * A step only happens while `shouldProcessMessage` holds and `dequeue()`
 * yields a non-null message. After invoking the actor with the message, all
 * system messages are processed, and the method recurses only if the budget
 * and the deadline (when one is defined) still allow it.
 *
 * @param left       how many messages may still be processed in this pass;
 *                   defaults to `dispatcher.throughput`, but at least 1
 * @param deadlineNs `System.nanoTime` value at which processing stops when the
 *                   dispatcher defines a throughput deadline; 0 otherwise
 * @throws InterruptedException if the thread's interrupt flag is set after a message was invoked
 */
@tailrec private final def processMailbox(
  left: Int = java.lang.Math.max(dispatcher.throughput, 1),
  deadlineNs: Long =
    if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
    else 0L): Unit =
  if (shouldProcessMessage) {
    dequeue() match {
      case null ⇒
        () // nothing was dequeued, so this pass ends here
      case next ⇒
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor invoke next
        // Thread.interrupted() also clears the flag; the interruption is rethrown.
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        // Continue only with budget left and, if a deadline is defined, before it.
        val mayContinue =
          left > 1 && (!dispatcher.isThroughputDeadlineTimeDefined || System.nanoTime - deadlineNs < 0)
        if (mayContinue) processMailbox(left - 1, deadlineNs)
    }
  }