Actors API¶
For an overview of the pulsar Actor, check out the design documentation.
High Level Functions¶
spawn¶
-
pulsar.async.actor.spawn(**kwargs)¶ Spawn a new Actor and return a Future.
Parameter kwargs: these optional parameters are:
- aid – the actor id
- name – the actor name
- actor hooks such as start, stopping and periodic_task
Returns: a Future.
A typical usage:
>>> def do_something(actor):
...     ...
>>> a = spawn(start=do_something, ...)
>>> a.aid
'ba42b02b'
>>> a.called
True
>>> p = a.result()
>>> p.address
('127.0.0.1', 46691)
send¶
-
pulsar.async.actor.send(target, action, *args, **params)¶ Send a message to target.
The message is to perform a given action. The actor sending the message is obtained via the get_actor() function.
Parameters:
- target – the Actor id, an ActorProxy or the name of the target actor which will receive the message.
- action – the remote command to perform in the target Actor.
- args – positional arguments to pass to the remote command action.
- params – dictionary of parameters to pass to the remote command action.
Returns: a Future if the action acknowledges the caller, otherwise None.
Typical example:
>>> r = send(p, 'ping')
>>> r.result()
'pong'
get_actor¶
Actor¶
At the core of the library we have the Actor class, which defines
the primitive of the pulsar concurrent framework. Actor instances communicate
with each other via messages in a share-nothing architecture.
-
class pulsar.async.actor.Actor(impl)¶ The base class for parallel execution in pulsar.
In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
The current implementation allows actors to perform specific tasks such as listening to a socket, acting as an HTTP server, consuming a task queue and so forth.
To spawn a new actor:
>>> from pulsar import spawn
>>> a = spawn()
>>> a.is_alive()
True
Here a is actually a reference to the remote actor; it is an ActorProxy.
ATTRIBUTES
-
impl¶ The Concurrency implementation for this Actor.
-
_loop¶ An event loop which listens for input/output events on sockets or socket-like objects. It is the driver of the Actor. If the _loop stops, the Actor stops running and goes out of scope.
-
mailbox¶ Used to send and receive actor messages.
-
address¶ The socket address for this Actor.mailbox.
-
proxy¶ Instance of an ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).
-
state¶ The actor numeric state.
-
extra¶ A dictionary which can be populated with extra parameters useful for other actors. This dictionary is included in the dictionary returned by the info() method. Check the info command for how to obtain information about an actor.
-
info_state¶ Current state description string. One of initial, running, stopping, closed and terminated.
-
next_periodic_task¶ The asyncio.Handle for the next actor periodic task.
-
monitors¶ Dictionary of monitors or None
-
managed_actors¶ Dictionary of managed actors or None
-
registered¶ Dictionary of registered actors or None
-
start(exit=True)¶ Called after forking to start the actor’s life.
This is where logging is configured, the mailbox is registered and the _loop is initialised and started. Calling this method more than once does nothing.
-
send(target, action, *args, **kwargs)¶ Send a message to target to perform action with the given positional args and key-valued kwargs. Returns a coroutine or a Future.
-
stop(exc=None, exit_code=None)¶ Gracefully stop the Actor.
Implemented by the Concurrency.stop() method of the impl attribute.
-
actorparams()¶ Returns a dictionary of parameters for spawning actors.
The dictionary is passed to the spawn method when creating new actors. Fires the on_params actor hook.
-
is_running()¶ True if actor is running, that is when the state is equal to ACTOR_STATES.RUN and the loop is running.
-
started()¶ True if actor has started.
It does not necessarily mean it is running. Its state is greater than or equal to ACTOR_STATES.RUN.
-
closed()¶ True if actor has exited in a clean fashion.
Its state is ACTOR_STATES.CLOSE.
-
stopped()¶ True if actor has exited.
Its state is greater than or equal to ACTOR_STATES.CLOSE.
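The four state checks above reduce to integer comparisons. The following is a minimal sketch, not pulsar's code: the numeric values are copied from the Constants section of this page, while StateActor and its loop_running flag are hypothetical stand-ins for a real actor and its event loop.

```python
# State values as listed in the Constants section of this page.
ACTOR_STATES = {'INITIAL': 0, 'INACTIVE': 1, 'STARTING': 2, 'RUN': 3,
                'STOPPING': 4, 'CLOSE': 5, 'TERMINATE': 6}
DESCRIPTION = {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running',
               4: 'stopping', 5: 'closed', 6: 'terminated'}


class StateActor:
    """Hypothetical stand-in for an actor; only the state checks are modelled."""

    def __init__(self, state=ACTOR_STATES['INITIAL'], loop_running=False):
        self.state = state
        # Assumption: a boolean stands in for "the event loop is running".
        self.loop_running = loop_running

    def is_running(self):
        return self.state == ACTOR_STATES['RUN'] and self.loop_running

    def started(self):
        return self.state >= ACTOR_STATES['RUN']

    def closed(self):
        return self.state == ACTOR_STATES['CLOSE']

    def stopped(self):
        return self.state >= ACTOR_STATES['CLOSE']

    @property
    def info_state(self):
        # Human-readable description of the numeric state.
        return DESCRIPTION[self.state]
```

Under these definitions an actor in the STOPPING state reports started() as true while is_running() and stopped() are both false, and a terminated actor is stopped() but not closed().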
-
info()¶ Return a nested dictionary of information related to the actor status and performance. The dictionary contains the following entries:
- actor – a dictionary containing information regarding the type of actor and its status.
- events – a dictionary of information about the event loop running the actor.
- extra – the extra attribute (you can use it to add stuff).
- system – system info.
This method is invoked when you run the info command from another actor.
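The shape of the dictionary described for info() can be illustrated as follows. This is only a sketch: the four top-level keys come from the description above, whereas the function toy_info and every nested key are assumptions made for the example.

```python
import os
import platform


def toy_info(name, state, extra):
    """Build a dictionary with the four top-level entries listed above.

    Only the top-level keys come from the documentation; the nested keys
    are illustrative assumptions.
    """
    return {
        'actor': {'name': name, 'state': state},
        'events': {'loop_running': state == 'running'},
        # The same dict object is reused, so later additions show up.
        'extra': extra,
        'system': {'pid': os.getpid(), 'platform': platform.system()},
    }


extra = {}
info = toy_info('worker', 'running', extra)
# "Add stuff" through the extra dictionary after the fact.
extra['requests_served'] = 10
```

In this sketch the value stored in extra is visible through info['extra'] because both names refer to the same dictionary.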
Actor Internals¶
ActorProxy¶
-
class pulsar.async.proxy.ActorProxy(impl)¶ A proxy for a remote Actor.
This is a lightweight class which delegates function calls to the underlying remote object.
It is picklable and therefore can be sent from actor to actor using actor message passing.
For example, let's say we have a proxy a; to send a message to it:
from pulsar import send
send(a, 'echo', 'hello there!')
will send the command echo to actor a with parameter "hello there!".
-
address¶ the socket address of the underlying Actor.mailbox.
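One way to see why a lightweight proxy can travel between processes is that it carries only plain data (an id, a name and a socket address). The sketch below is an assumption-laden illustration, not the ActorProxy implementation: ToyProxy, to_message and from_message are hypothetical names, and the proxy is serialised explicitly as a tuple with the standard pickle module.

```python
import pickle


class ToyProxy:
    """Hypothetical lightweight proxy holding only plain, picklable data."""

    def __init__(self, aid, name, address):
        self.aid = aid
        self.name = name
        self.address = address  # (host, port) of the remote mailbox

    def to_message(self):
        """Serialise the proxy into bytes that can travel inside a message."""
        return pickle.dumps((self.aid, self.name, self.address))

    @classmethod
    def from_message(cls, data):
        """Rebuild an equivalent proxy on the receiving side."""
        return cls(*pickle.loads(data))


proxy = ToyProxy('ba42b02b', 'worker', ('127.0.0.1', 46691))
# Round trip, as would happen when the proxy crosses a process boundary.
clone = ToyProxy.from_message(proxy.to_message())
```

The clone is a distinct object that refers to the same remote actor, which is all a receiver needs in order to address messages to it.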
ActorProxyMonitor¶
-
class pulsar.async.proxy.ActorProxyMonitor(impl)¶ A specialised ActorProxy class.
It contains additional information about the remote underlying Actor. Instances of this class serialise into ActorProxy.
The ActorProxyMonitor is special since it lives in the Arbiter domain and it is used by the Arbiter (or a Monitor) to monitor the state of the spawned actor.
-
impl¶ The Concurrency instance for the remote Actor. This dictionary is constantly updated by the remote actor by sending the info message.
-
mailbox¶ This is the connection with the remote actor. It is available once the actor handshake between the actor and the monitor has completed. The mailbox is a server-side MailboxProtocol instance and it is used by the send() function to send messages to the remote actor.
-
notified¶ Last time this ActorProxyMonitor was notified by the remote actor.
-
proxy¶ The ActorProxy for this monitor.
Messages¶
Actors communicate with each other via the send() function,
which uses the Actor.mailbox attribute of the actor in the
current context. When an actor communicates with
another remote actor, it does so by sending an action to it
with positional and/or key-valued arguments. For example:
send(target, 'ping')
will send the ping action to target from the actor in the current context of execution. The above is equivalent to:
get_actor().send(target, 'ping')
Each action is implemented via the command() decorator defined
in the pulsar.async.commands module.
A list of standard commands
is available in the design documentation.
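The idea of registering actions with a decorator and dispatching them by name can be sketched in a few lines. Everything here is hypothetical and runs in a single process: the COMMANDS registry, the ack parameter, the toy_send function and the use of a plain dictionary as the target are assumptions for illustration, and no mailbox or network is involved.

```python
from concurrent.futures import Future

COMMANDS = {}  # hypothetical registry: action name -> (function, ack flag)


def command(ack=True):
    """Toy decorator registering a function as a named action."""
    def decorator(fn):
        COMMANDS[fn.__name__] = (fn, ack)
        return fn
    return decorator


@command()
def ping(request):
    return 'pong'


@command()
def echo(request, message):
    return message


@command(ack=False)
def notify(request, info):
    # Fire-and-forget action: the caller gets nothing back.
    request['log'].append(info)


def toy_send(target, action, *args, **params):
    """Look up ``action`` and run it against ``target`` (a plain dict here).

    Returns a Future holding the result when the command acknowledges the
    caller, otherwise None.
    """
    fn, ack = COMMANDS[action]
    result = fn(target, *args, **params)
    if not ack:
        return None
    future = Future()
    future.set_result(result)
    return future


target = {'log': []}
```

Calling toy_send(target, 'ping') yields a Future whose result is 'pong', while toy_send(target, 'notify', 'up') returns None and only records the information on the target.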
Concurrency¶
The Concurrency class implements the behaviour of an Actor
and therefore allows for decoupling between the Actor abstraction
and its implementation (bridge pattern).
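The bridge pattern mentioned above can be illustrated with a short sketch. The class names ToyConcurrency, ThreadLike, ProcessLike and BridgeActor are hypothetical, and the string states merely mark which implementation ran; the point is only that the actor delegates start and stop to its impl object.

```python
import abc


class ToyConcurrency(abc.ABC):
    """Implementation side of the bridge (hypothetical)."""

    @abc.abstractmethod
    def start(self, actor):
        """Start ``actor`` according to this implementation."""

    def stop(self, actor):
        actor.state = 'closed'


class ThreadLike(ToyConcurrency):
    def start(self, actor):
        actor.state = 'running in a thread'


class ProcessLike(ToyConcurrency):
    def start(self, actor):
        actor.state = 'running in a process'


class BridgeActor:
    """Abstraction side: it delegates start and stop to ``impl``."""

    def __init__(self, impl):
        self.impl = impl
        self.state = 'initial'

    def start(self):
        self.impl.start(self)

    def stop(self):
        self.impl.stop(self)
```

Swapping ThreadLike for ProcessLike changes how the actor is started without touching BridgeActor, which is the decoupling the bridge pattern is meant to provide.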
Base Concurrency¶
-
class pulsar.async.concurrency.Concurrency¶ Actor Concurrency.
Responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor. This is an abstract class; derived classes must implement the start method.
Parameters:
- concurrency – string indicating the concurrency implementation. Valid choices are monitor, process, thread.
- actor_class – Actor or one of its subclasses.
- timeout – timeout in seconds for the actor.
- kwargs – additional key-valued arguments to be passed to the actor constructor.
-
actor_class¶ alias of Actor
-
hand_shake(actor)¶ Perform the hand shake for actor.
The hand shake occurs when the actor is in starting state. It performs the following actions:
- set the actor as the actor of the current thread
- bind two additional callbacks to the start event
- fire the start event
If the hand shake is successful, the actor will eventually result in a running state.
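The three hand shake steps listed above can be sketched with a thread-local slot and a tiny one-time event. This is a loose illustration under stated assumptions: OneTimeEvent, HandshakeActor and the module-level set_actor and get_actor helpers are hypothetical, and the two callbacks bound here are placeholders, since the description does not say what the real ones do.

```python
import threading

_local = threading.local()  # hypothetical per-thread storage


def set_actor(actor):
    _local.actor = actor


def get_actor():
    return getattr(_local, 'actor', None)


class OneTimeEvent:
    """Tiny event: callbacks run once, in binding order, when fired."""

    def __init__(self):
        self.callbacks = []
        self.fired = False

    def bind(self, callback):
        self.callbacks.append(callback)

    def fire(self, arg):
        if not self.fired:
            self.fired = True
            for callback in self.callbacks:
                callback(arg)


class HandshakeActor:
    def __init__(self):
        self.state = 'starting'
        self.start_event = OneTimeEvent()
        self.trace = []


def hand_shake(actor):
    # Step 1: make the actor the actor of the current thread.
    set_actor(actor)
    # Step 2: bind two additional (placeholder) callbacks to the start event.
    actor.start_event.bind(lambda a: a.trace.append('switch to running'))
    actor.start_event.bind(lambda a: setattr(a, 'state', 'running'))
    # Step 3: fire the start event.
    actor.start_event.fire(actor)
```

Callbacks bound before the hand shake, such as a user-supplied start hook, run first in this sketch because the event calls them in binding order.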
-
periodic_task(actor, **kw)¶ Implement the actor periodic task.
This is an internal method called periodically by the Actor._loop to ping the actor monitor. If successful, return a Future called back with the acknowledgement from the monitor.
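A task that an event loop calls periodically can be sketched with asyncio's call_later, which returns a cancellable handle. The Pinger class, its period argument and the run_for helper are hypothetical; the counter merely stands in for sending a message to the monitor, and the acknowledgement Future described above is left out of the sketch.

```python
import asyncio


class Pinger:
    """Hypothetical actor-like object that pings on a fixed period."""

    def __init__(self, period):
        self.period = period
        self.pings = 0
        self.next_periodic_task = None  # handle of the next scheduled call

    def periodic_task(self, loop):
        self.pings += 1  # stands in for sending a message to the monitor
        # Re-arm the timer; call_later returns a cancellable handle.
        self.next_periodic_task = loop.call_later(
            self.period, self.periodic_task, loop)

    def stop(self):
        if self.next_periodic_task is not None:
            self.next_periodic_task.cancel()
            self.next_periodic_task = None


async def run_for(pinger, seconds):
    """Run the periodic task for roughly ``seconds`` and report the count."""
    loop = asyncio.get_running_loop()
    pinger.periodic_task(loop)
    await asyncio.sleep(seconds)
    pinger.stop()
    return pinger.pings
```

For example, asyncio.run(run_for(Pinger(0.01), 0.1)) runs the task several times and then cancels the pending handle, so nothing remains scheduled once the actor stops.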
Monitor Concurrency¶
-
class pulsar.async.concurrency.MonitorConcurrency¶ Concurrency class for a Monitor.
Monitors live in the main thread of the master process and therefore do not need to be spawned.
-
periodic_task(monitor, **kw)¶ Override the Concurrency.periodic_task() to implement the Monitor periodic task.
Arbiter Concurrency¶
-
class pulsar.async.concurrency.ArbiterConcurrency¶ Concurrency implementation for the arbiter.
-
add_monitor(actor, monitor_name, **params)¶ Add a new monitor.
Parameters:
- monitor_class – a Monitor class.
- monitor_name – a unique name for the monitor.
- kwargs – dictionary of key-valued parameters for the monitor.
Returns: the Monitor added.
-
create_mailbox(actor, loop)¶ Override Concurrency.create_mailbox() to create the mailbox server.
-
periodic_task(actor, **kw)¶ Override the Concurrency.periodic_task() to implement the Arbiter periodic task.
Constants¶
Constants used throughout pulsar.
-
pulsar.async.consts.ACTOR_STATES = {'RUN': 3, 'TERMINATE': 6, 'DESCRIPTION': {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running', 4: 'stopping', 5: 'closed', 6: 'terminated'}, 'INITIAL': 0, 'CLOSE': 5, 'STOPPING': 4, 'INACTIVE': 1, 'STARTING': 2}¶ Actor state constants are accessed via:
from pulsar import ACTOR_STATES
They are:
- ACTOR_STATES.INITIAL = 0 when an actor is just created, before the pulsar.Actor.start method is called.
- ACTOR_STATES.STARTING = 2 when the pulsar.Actor.start method is called.
- ACTOR_STATES.RUN = 3 when pulsar.Actor._loop is up and running.
- ACTOR_STATES.STOPPING = 4 when pulsar.Actor.stop has been called for the first time and the actor is running.
-
pulsar.async.consts.ACTOR_ACTION_TIMEOUT = 5¶ Important constant used by pulsar.Monitor to kill actors which don’t respond to the stop command.
-
pulsar.async.consts.MONITOR_TASK_PERIOD = 1¶ Interval for pulsar.Monitor and pulsar.Arbiter periodic task.