def actorOf(props: Props, name: String): ActorRef = if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false) elsethrownewUnsupportedOperationException( s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
/** * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. * * INTERNAL API */ private[akka] classLocalActorRefprivate[akka] ( _system: ActorSystemImpl, _props: Props, _dispatcher: MessageDispatcher, _mailboxType: MailboxType, _supervisor: InternalActorRef, override val path: ActorPath) extendsActorRefWithCell with LocalRef
/** * The one and only default dispatcher. */ def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
1 2 3 4 5
/** * The id of the default dispatcher, also the full key of the * configuration of the default dispatcher. */ finalvalDefaultDispatcherId="akka.actor.default-dispatcher"
default-dispatcher { # Must be one of the following # Dispatcher, PinnedDispatcher, or a FQCN to a classinheriting # MessageDispatcherConfigurator with a public constructor with # both com.typesafe.config.Config parameter and # akka.dispatch.DispatcherPrerequisites parameters. # PinnedDispatcher must be used together with executor=thread-pool-executor. type = "Dispatcher" # Which kind of ExecutorService to use forthis dispatcher # Valid options: # - "default-executor"requires a "default-executor" section # - "fork-join-executor"requires a "fork-join-executor" section # - "thread-pool-executor"requires a "thread-pool-executor" section # - "affinity-pool-executor"requires an "affinity-pool-executor" section # - A FQCN of a classextendingExecutorServiceConfigurator executor="default-executor" # This will be used if you have set "executor = "default-executor"". # If an ActorSystem is created with a given ExecutionContext, this # ExecutionContext will be used as the default executor for all # dispatchers in the ActorSystem configured with # executor = "default-executor". Note that "default-executor" # is the default value for executor, and therefore used if not # specified otherwise. If no ExecutionContext is given, # the executor configured in "fallback" will be used. default-executor { fallback = "fork-join-executor" } }
abstractclassMessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) { val config: Config = newCachingConfig(_config) /** * Returns an instance of MessageDispatcher given the configuration. * Depending on the needs the implementation may return a new instance for * each invocation or return the same instance every time. */ def dispatcher(): MessageDispatcher def configureExecutor(): ExecutorServiceConfigurator = { def configurator(executor: String): ExecutorServiceConfigurator = executor match { casenull | "" | "fork-join-executor" ⇒ newForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case"thread-pool-executor" ⇒ newThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case"affinity-pool-executor" ⇒ newAffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites) case fqcn ⇒ valargs= List( classOf[Config] → config, classOf[DispatcherPrerequisites] → prerequisites) prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({ case exception ⇒ thrownewIllegalArgumentException( ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], make sure it has an accessible constructor with a [%s,%s] signature""") .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) }).get } config.getString("executor") match { case"default-executor" ⇒ newDefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback"))) case other ⇒ configurator(other) } } }
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { dispatcherConfigurators.get(id) match { casenull ⇒ // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. valnewConfigurator= if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) elsethrownewConfigurationException(s"Dispatcher [$id] not configured") dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { casenull ⇒ newConfigurator case existing ⇒ existing } case existing ⇒ existing } }
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { if (!cfg.hasPath("id")) thrownewConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) cfg.getString("type") match { case"Dispatcher" ⇒ newDispatcherConfigurator(cfg, prerequisites) case"BalancingDispatcher" ⇒ // FIXME remove this case in 2.4 thrownewIllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " + "During a migration period you can still use BalancingDispatcher by specifying the full class name: " + classOf[BalancingDispatcherConfigurator].getName) case"PinnedDispatcher" ⇒ newPinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ valargs= List(classOf[Config] → cfg, classOf[DispatcherPrerequisites] → prerequisites) prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ case exception ⇒ thrownewConfigurationException( ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + "make sure it has constructor with [com.typesafe.config.Config] and " + "[akka.dispatch.DispatcherPrerequisites] parameters") .format(fqn, cfg.getString("id")), exception) }).get } }
/** * Configurator for creating [[akka.dispatch.Dispatcher]]. * Returns the same dispatcher instance for each invocation * of the `dispatcher()` method. */ classDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extendsMessageDispatcherConfigurator(config, prerequisites) { privatevalinstance=newDispatcher( this, config.getString("id"), config.getInt("throughput"), config.getNanosDuration("throughput-deadline-time"), configureExecutor(), config.getMillisDuration("shutdown-timeout")) /** * Returns the same dispatcher instance for each invocation */ override def dispatcher(): MessageDispatcher = instance }
private def lookupConfigurator(id: String): MailboxType = { mailboxTypeConfigurators.get(id) match { casenull ⇒ // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. valnewConfigurator= id match { // TODO RK remove these two for Akka 2.3 case"unbounded" ⇒ UnboundedMailbox() case"bounded" ⇒ newBoundedMailbox(settings, config(id)) case _ ⇒ if (!settings.config.hasPath(id)) thrownewConfigurationException(s"Mailbox Type [${id}] not configured") valconf= config(id) valmailboxType= conf.getString("mailbox-type") match { case"" ⇒ thrownewConfigurationException(s"The setting mailbox-type, defined in [$id] is empty") case fqcn ⇒ valargs= List(classOf[ActorSystem.Settings] → settings, classOf[Config] → conf) dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ case exception ⇒ thrownewIllegalArgumentException( s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" + " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters", exception) }).get } if (!mailboxNonZeroPushTimeoutWarningIssued) { mailboxType match { case m: ProducesPushTimeoutSemanticsMailbox if m.pushTimeOut.toNanos > 0L ⇒ warn(s"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), " + s"which can lead to blocking behavior when sending messages to this mailbox. " + s"Avoid this by setting `$id.mailbox-push-timeout-time` to `0`.") mailboxNonZeroPushTimeoutWarningIssued = true case _ ⇒ // good; nothing to see here, move along, sir. } } mailboxType } mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match { casenull ⇒ newConfigurator case existing ⇒ existing } case existing ⇒ existing } }
default-mailbox { # FQCN of the MailboxType. The Class of the FQCN must have a public # constructor with # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "akka.dispatch.UnboundedMailbox" # If the mailbox is bounded then it uses this setting to determine its # capacity. The provided value must be positive. # NOTICE: # Up to version 2.1 the mailbox type was determined based on this setting; # this is no longer the case, the type must explicitly be a bounded mailbox. mailbox-capacity = 1000 # If the mailbox is bounded then this is the timeout for enqueueing # in case the mailbox is full. Negative values signify infinite # timeout, which should be avoided as it bears the risk of dead-lock. mailbox-push-timeout-time = 10s # For Actor with Stash: The default capacity of the stash. # If negative(or zero) then an unbounded stash is used(default) # If positive then a bounded stash is used and the capacity is set using # the property stash-capacity = -1 }
/** * MailboxType is a factory to create MessageQueues for an optionally * provided ActorContext. * * <b>Possibly Important Notice</b> * * When implementing a custom mailbox type, be aware that there is special * semantics attached to `system.actorOf()` in that sending to the returned * ActorRef may—for a short period of time—enqueue the messages first in a * dummy queue. Top-level actors are created in two steps, and only after the * guardian actor has performed that second step will all previously sent * messages be transferred from the dummy queue into the real mailbox. */ trait MailboxType { def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue } trait ProducesMessageQueue[T <: MessageQueue] /** * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors. */ finalcaseclassUnboundedMailbox() extendsMailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = newUnboundedMailbox.MessageQueue }
/** * Initialize: make a dummy cell which holds just a mailbox, then tell our * supervisor that we exist so that he can create the real Cell in * handleSupervise(). * * Call twice on your own peril! * * This is protected so that others can have different initialization. */ def initialize(async: Boolean): this.type = underlying match { casenull ⇒ swapCell(newUnstartedCell(system, this, props, supervisor)) swapLookup(underlying) supervisor.sendSystemMessage(Supervise(this, async)) if (!async) point(false) this case other ⇒ thrownewIllegalStateException("initialize called more than once!") } /** * This method is supposed to be called by the supervisor in handleSupervise() * to replace the UnstartedCell with the real one. It assumes no concurrent * modification of the `underlying` field, though it is safe to send messages * at any time. */ def point(catchFailures: Boolean): this.type