/** * Construct an [[akka.actor.ActorSelection]] from the given path, which is * parsed for wildcards (these are replaced by regular expressions * internally). No attempt is made to verify the existence of any part of * the supplied path, it is recommended to send a message and gather the * replies in order to resolve the matching set of actors. */ def actorSelection(path: String): ActorSelection = path match { caseRelativeActorPath(elems) ⇒ if (elems.isEmpty) ActorSelection(provider.deadLetters, "") elseif (elems.head.isEmpty) ActorSelection(provider.rootGuardian, elems.tail) else ActorSelection(lookupRoot, elems) caseActorPathExtractor(address, elems) ⇒ ActorSelection(provider.rootGuardianAt(address), elems) case _ ⇒ ActorSelection(provider.deadLetters, "") }
/** * Given an ActorPath it returns the Address and the path elements if the path is well-formed */ object ActorPathExtractor extendsPathUtils { def unapply(addr: String): Option[(Address, immutable.Iterable[String])] = try { valuri=newURI(addr) uri.getRawPath match { casenull ⇒ None case path ⇒ AddressFromURIString.unapply(uri).map((_, split(path, uri.getRawFragment).drop(1))) } } catch { case _: URISyntaxException ⇒ None } }
/** * Construct an ActorSelection from the given string representing a path * relative to the given target. This operation has to create all the * matching magic, so it is preferable to cache its result if the * intention is to send messages frequently. */ def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = { val compiled: immutable.IndexedSeq[SelectionPathElement] = elements.collect({ case x if !x.isEmpty ⇒ if ((x.indexOf('?') != -1) || (x.indexOf('*') != -1)) SelectChildPattern(x) elseif (x == "..") SelectParent else SelectChildName(x) })(scala.collection.breakOut) newActorSelection with ScalaActorSelection { override valanchor= anchorRef override valpath= compiled } }
case s @ Send(message, senderOption, recipientRef, _) ⇒ valrecipientAddress= recipientRef.path.address def createAndRegisterWritingEndpoint(): ActorRef = { endpoints.registerWritableEndpoint( recipientAddress, uid = None, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, handleOption = None, writing = true)) } endpoints.writableEndpointWithPolicyFor(recipientAddress) match { caseSome(Pass(endpoint, _)) ⇒ endpoint ! s caseSome(Gated(timeOfRelease)) ⇒ if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s else extendedSystem.deadLetters ! s caseSome(Quarantined(uid, _)) ⇒ // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don't want to accept, so use it. createAndRegisterWritingEndpoint() ! s case None ⇒ createAndRegisterWritingEndpoint() ! s }
def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { caseSome(Pass(e, _)) ⇒ thrownewIllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ⇒ // note that this overwrites Quarantine marker, // but that is ok since we keep the quarantined uid in addressToRefuseUid addressToWritable += address → Pass(endpoint, uid) writableToAddress += endpoint → address endpoint }
private def handleSend(send: Send): Unit = if (send.message.isInstanceOf[SystemMessage]) { valsequencedSend= send.copy(seqOpt = Some(nextSeq())) tryBuffer(sequencedSend) // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. // GotUid will kick resendAll() causing the messages to be properly written. // Flow control by not sending more when we already have many outstanding. if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit) writer ! sequencedSend } else writer ! send
val writing: Receive = { case s: Send ⇒ if (!writeSend(s)) { enqueueInBuffer(s) scheduleBackoffTimer() context.become(buffering) } // We are in Writing state, so buffer is empty, safe to stop here case FlushAndStop ⇒ flushAndStop() case AckIdleCheckTimer if ackDeadline.isOverdue() ⇒ trySendPureAck() }
/** * A message all Actors will understand, that when processed will reply with * [[akka.actor.ActorIdentity]] containing the `ActorRef`. The `messageId` * is returned in the `ActorIdentity` message as `correlationId`. */ @SerialVersionUID(1L) finalcaseclassIdentify(messageId: Any) extendsAutoReceivedMessage with NotInfluenceReceiveTimeout