Actors API
For an overview of the pulsar Actor, check out the design documentation.
High Level Functions
spawn
pulsar.async.actor.spawn(**kwargs)
Spawn a new Actor and return a Future.
Parameter kwargs
These optional parameters are:
- aid – the actor id
- name – the actor name
- actor hooks such as start, stopping and periodic_task
Returns: a Future.
A typical usage:
    >>> def do_something(actor): ...
    >>> a = spawn(start=do_something, ...)
    >>> a.aid
    'ba42b02b'
    >>> a.called
    True
    >>> p = a.result()
    >>> p.address
    ('127.0.0.1', 46691)
send
pulsar.async.actor.send(target, action, *args, **params)
Send a message to target. The message is to perform a given action. The actor sending the message is obtained via the get_actor() function.
Parameters:
- target – the Actor id, an ActorProxy, or the name of the target actor which will receive the message.
- action – the remote command to perform in the target Actor.
- args – positional arguments to pass to the remote command action.
- params – dictionary of parameters to pass to the remote command action.
Returns: a Future if the action acknowledges the caller, or None.
Typical example:
    >>> r = send(p, 'ping')
    >>> r.result()
    'pong'
get_actor
Actor
At the core of the library is the Actor class, which defines the primitive of the pulsar concurrent framework. Actor instances communicate with each other via messages in a share-nothing architecture.
class pulsar.async.actor.Actor(impl)
The base class for parallel execution in pulsar.
In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
The current implementation allows actors to perform specific tasks such as listening to a socket, acting as an HTTP server, consuming a task queue, and so forth.
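The message-driven behaviour described above can be illustrated with a small standard-library sketch. It does not use pulsar: `ToyActor`, `toy_send` and `demo` are hypothetical names, and an `asyncio.Queue` stands in for the mailbox. The only point is that the actor's state is touched solely by the actor itself, in response to one message at a time.

```python
import asyncio


class ToyActor:
    """Hypothetical stand-in for an actor: private state plus a mailbox."""

    def __init__(self, name):
        self.name = name
        self.mailbox = asyncio.Queue()   # messages are the only way in
        self.handled = 0                 # local state, never shared directly

    async def run(self):
        # Process one message at a time until asked to stop.
        while True:
            action, args, reply = await self.mailbox.get()
            if action == 'stop':
                reply.set_result('stopped')
                return
            self.handled += 1
            if action == 'ping':
                reply.set_result('pong')
            elif action == 'echo':
                reply.set_result(args[0])
            else:
                reply.set_exception(ValueError('unknown action: %s' % action))


def toy_send(target, action, *args):
    """Queue a message for ``target`` and return a future for the reply."""
    reply = asyncio.get_running_loop().create_future()
    target.mailbox.put_nowait((action, args, reply))
    return reply


async def demo():
    actor = ToyActor('worker')
    task = asyncio.ensure_future(actor.run())
    pong = await toy_send(actor, 'ping')
    echoed = await toy_send(actor, 'echo', 'hello there!')
    await toy_send(actor, 'stop')
    await task
    return pong, echoed, actor.handled
```

Running `asyncio.run(demo())` drives the toy actor through a ping, an echo and a stop message. A real pulsar actor runs in its own thread or process and exchanges messages over a socket, which this single-loop sketch does not attempt to model.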
To spawn a new actor:
    >>> from pulsar import spawn
    >>> a = spawn()
    >>> a.is_alive()
    True
Here a is actually a reference to the remote actor; it is an ActorProxy.
ATTRIBUTES
- impl – The Concurrency implementation for this Actor.
- _loop – An event loop which listens for input/output events on sockets or socket-like objects. It is the driver of the Actor. If the _loop stops, the Actor stops running and goes out of scope.
- mailbox – Used to send and receive actor messages.
- address – The socket address for this Actor.mailbox.
- proxy – Instance of an ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).
- state – The actor numeric state.
- extra – A dictionary which can be populated with extra parameters useful for other actors. This dictionary is included in the dictionary returned by the info() method. Check the info command for how to obtain information about an actor.
- info_state – Current state description string. One of initial, running, stopping, closed and terminated.
- next_periodic_task – The asyncio.Handle for the next actor periodic task.
- monitors – Dictionary of monitors or None.
- managed_actors – Dictionary of managed actors or None.
- registered – Dictionary of registered actors or None.
start(exit=True)
Called after forking to start the actor's life.
This is where logging is configured, the mailbox is registered and the _loop is initialised and started. Calling this method more than once does nothing.

send(target, action, *args, **kwargs)
Send a message to target to perform action with the given positional args and key-valued kwargs. Returns a coroutine or a Future.

stop(exc=None, exit_code=None)
Gracefully stop the Actor.
Implemented by the Concurrency.stop() method of the impl attribute.

actorparams()
Returns a dictionary of parameters for spawning actors.
The dictionary is passed to the spawn method when creating new actors. Fires the on_params actor hook.

is_running()
True if the actor is running, that is when the state is equal to ACTOR_STATES.RUN and the loop is running.

started()
True if the actor has started.
It does not necessarily mean it is running. Its state is greater than or equal to ACTOR_STATES.RUN.

closed()
True if the actor has exited in a clean fashion.
Its state is ACTOR_STATES.CLOSE.

stopped()
True if the actor has exited.
Its state is greater than or equal to ACTOR_STATES.CLOSE.
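The four state predicates above differ only in how they compare the numeric state. The following sketch restates them side by side; it is not pulsar code. `StateView` and `summary` are hypothetical names, and the numeric values are copied from the ACTOR_STATES constant documented in the Constants section.

```python
# Numeric values copied from the documented ACTOR_STATES constant.
INITIAL, INACTIVE, STARTING, RUN, STOPPING, CLOSE, TERMINATE = range(7)


class StateView:
    """Hypothetical helper mirroring the four documented predicates."""

    def __init__(self, state, loop_running=False):
        self.state = state
        self.loop_running = loop_running

    def is_running(self):
        # Requires both the RUN state and a running event loop.
        return self.state == RUN and self.loop_running

    def started(self):
        return self.state >= RUN

    def closed(self):
        # Clean exit only.
        return self.state == CLOSE

    def stopped(self):
        # Any exit, clean or not.
        return self.state >= CLOSE


def summary(view):
    """Return (is_running, started, closed, stopped) for ``view``."""
    return (view.is_running(), view.started(), view.closed(), view.stopped())
```

Under these definitions a terminated actor counts as started and stopped but not closed, while a stopping actor counts as started but neither running nor stopped.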
info()
Return a nested dictionary of information related to the actor status and performance. The dictionary contains the following entries:
- actor – a dictionary containing information regarding the type of actor and its status.
- events – a dictionary of information about the event loop running the actor.
- extra – the extra attribute (you can use it to add stuff).
- system – system info.
This method is invoked when you run the info command from another actor.
Actor Internals
ActorProxy
class pulsar.async.proxy.ActorProxy(impl)
A proxy for a remote Actor.
This is a lightweight class which delegates function calls to the underlying remote object.
It is picklable and can therefore be sent from actor to actor using actor message passing.
For example, let's say we have a proxy a. To send a message to it:
    from pulsar import send
    send(a, 'echo', 'hello there!')
This will send the command echo to actor a with parameter "hello there!".
- address – the socket address of the underlying Actor.mailbox.
ActorProxyMonitor
class pulsar.async.proxy.ActorProxyMonitor(impl)
A specialised ActorProxy class.
It contains additional information about the remote underlying Actor. Instances of this class serialise into ActorProxy.
The ActorProxyMonitor is special since it lives in the Arbiter domain and it is used by the Arbiter (or a Monitor) to monitor the state of the spawned actor.
- impl – The Concurrency instance for the remote Actor. This dictionary is constantly updated by the remote actor by sending the info message.
- mailbox – This is the connection with the remote actor. It is available once the actor handshake between the actor and the monitor has completed. The mailbox is a server-side MailboxProtocol instance and it is used by the send() function to send messages to the remote actor.
- notified – Last time this ActorProxyMonitor was notified by the remote actor.
- proxy – The ActorProxy for this monitor.
Messages
Actors communicate with each other via the send() function, which uses the Actor.mailbox attribute of the actor in the current context. When an actor communicates with another remote actor, it does so by sending an action to it with positional and/or key-valued arguments. For example:
    send(target, 'ping')
will send the ping action to target from the actor in the current context of execution. The above is equivalent to:
    get_actor().send(target, 'ping')
Each action is implemented via the command() decorator, defined in the pulsar.async.commands module.
A list of standard commands is available in the design documentation.
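To make the idea of named actions more concrete, here is a minimal registry sketch in plain Python. It is an assumption-laden illustration, not pulsar's command() decorator: the `COMMANDS` table, the `ack` flag, the `request` argument and the `dispatch` helper are all hypothetical, and the real decorator's signature may differ.

```python
# Hypothetical registry; pulsar's real command() decorator may differ.
COMMANDS = {}


def command(ack=True):
    """Register a function as a named action that a message can trigger."""
    def decorator(func):
        COMMANDS[func.__name__] = (func, ack)
        return func
    return decorator


@command()
def ping(request):
    return 'pong'


@command()
def echo(request, message):
    return message


@command(ack=False)
def notify(request, info):
    # Fire-and-forget action: records the info, sends nothing back.
    request['notified'] = info


def dispatch(request, action, *args, **kwargs):
    """Look up ``action`` and run it; return None when no reply is expected."""
    func, ack = COMMANDS[action]
    result = func(request, *args, **kwargs)
    return result if ack else None
```

In this sketch `dispatch({}, 'ping')` returns `'pong'`, while an action registered with `ack=False` always yields `None`, loosely mirroring the documented behaviour of send() returning None when the action does not acknowledge the caller.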
Concurrency
The Concurrency class implements the behaviour of an Actor and therefore decouples the Actor abstraction from its implementation (bridge pattern).
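The bridge pattern mentioned here can be sketched without pulsar. In the following illustration every class name is hypothetical; the only details borrowed from this page are that the actor holds its implementation in an impl attribute, that stop is delegated to it, and that derived implementations must provide start.

```python
class ToyConcurrency:
    """Hypothetical implementation side of the bridge."""
    kind = 'abstract'

    def start(self, actor):
        # Derived classes must implement this.
        raise NotImplementedError

    def stop(self, actor):
        actor.log.append('%s stop' % self.kind)


class ThreadLike(ToyConcurrency):
    kind = 'thread'

    def start(self, actor):
        actor.log.append('thread start')


class ProcessLike(ToyConcurrency):
    kind = 'process'

    def start(self, actor):
        actor.log.append('process start')


class ToyActor:
    """Abstraction side: the same class works with any implementation."""

    def __init__(self, impl):
        self.impl = impl
        self.log = []

    def start(self):
        self.impl.start(self)

    def stop(self):
        self.impl.stop(self)


def lifecycle(impl):
    """Start and stop a toy actor, returning what the implementation did."""
    actor = ToyActor(impl)
    actor.start()
    actor.stop()
    return actor.log
```

The actor class never branches on the kind of concurrency; swapping `ThreadLike()` for `ProcessLike()` changes behaviour without touching `ToyActor`.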
Base Concurrency
class pulsar.async.concurrency.Concurrency
Actor Concurrency.
Responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor. This is an abstract class; derived classes must implement the start method.
Parameters:
- concurrency – string indicating the concurrency implementation. Valid choices are monitor, process, thread.
- actor_class – Actor or one of its subclasses.
- timeout – timeout in seconds for the actor.
- kwargs – additional key-valued arguments to be passed to the actor constructor.

actor_class
alias of Actor

hand_shake(actor)
Perform the hand shake for actor.
The hand shake occurs when the actor is in starting state. It performs the following actions:
- set the actor as the actor of the current thread
- bind two additional callbacks to the start event
- fire the start event
If the hand shake is successful, the actor will eventually reach the running state.
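The three hand shake steps can be pictured with the following standard-library sketch. It is only an illustration under assumptions: `OneTimeEvent`, `ToyActor`, `set_current_actor` and `current_actor` are hypothetical names, and since this page does not say what the two extra callbacks do, the two used here (recording a trace entry and switching the state) are placeholders.

```python
import threading

_local = threading.local()


def set_current_actor(actor):
    """Remember ``actor`` as the actor of the calling thread."""
    _local.actor = actor


def current_actor():
    return getattr(_local, 'actor', None)


class OneTimeEvent:
    """Hypothetical event: callbacks run once, in binding order, when fired."""

    def __init__(self):
        self._callbacks = []
        self.fired = False

    def bind(self, callback):
        self._callbacks.append(callback)

    def fire(self, arg):
        if self.fired:
            return
        self.fired = True
        for callback in self._callbacks:
            callback(arg)


class ToyActor:
    def __init__(self):
        self.state = 'starting'
        self.start_event = OneTimeEvent()
        self.trace = []


def hand_shake(actor):
    # Step 1: make the actor the current thread's actor.
    set_current_actor(actor)
    # Step 2: bind two callbacks to the start event (placeholder behaviour).
    actor.start_event.bind(lambda a: a.trace.append('registered'))
    actor.start_event.bind(lambda a: setattr(a, 'state', 'running'))
    # Step 3: fire the start event, which runs the callbacks in order.
    actor.start_event.fire(actor)
```

After `hand_shake(actor)` the toy actor is reachable through `current_actor()` from the same thread and its state has moved from starting to running.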
periodic_task(actor, **kw)
Implement the actor periodic task.
This is an internal method called periodically by the Actor._loop to ping the actor monitor. If successful, return a Future called back with the acknowledgement from the monitor.
Monitor Concurrency
class pulsar.async.concurrency.MonitorConcurrency
Concurrency class for a Monitor.
Monitors live in the main thread of the master process and therefore do not need to be spawned.

periodic_task(monitor, **kw)
Override the Concurrency.periodic_task() to implement the Monitor periodic task.
Arbiter Concurrency
class pulsar.async.concurrency.ArbiterConcurrency
Concurrency implementation for the arbiter.

add_monitor(actor, monitor_name, **params)
Add a new monitor.
Parameters:
- monitor_class – a Monitor class.
- monitor_name – a unique name for the monitor.
- kwargs – dictionary of key-valued parameters for the monitor.
Returns: the Monitor added.

create_mailbox(actor, loop)
Override Concurrency.create_mailbox() to create the mailbox server.

periodic_task(actor, **kw)
Override the Concurrency.periodic_task() to implement the Arbiter periodic task.
Constants
Constants used throughout pulsar.

pulsar.async.consts.ACTOR_STATES = {'RUN': 3, 'TERMINATE': 6, 'DESCRIPTION': {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running', 4: 'stopping', 5: 'closed', 6: 'terminated'}, 'INITIAL': 0, 'CLOSE': 5, 'STOPPING': 4, 'INACTIVE': 1, 'STARTING': 2}
Actor state constants are accessed via:
    from pulsar import ACTOR_STATES
They are:
- ACTOR_STATES.INITIAL = 0 – when an actor is just created, before the pulsar.Actor.start method is called.
- ACTOR_STATES.STARTING = 2 – when the pulsar.Actor.start method is called.
- ACTOR_STATES.RUN = 3 – when pulsar.Actor._loop is up and running.
- ACTOR_STATES.STOPPING = 4 – when pulsar.Actor.stop has been called for the first time and the actor is running.
pulsar.async.consts.ACTOR_ACTION_TIMEOUT = 5
Important constant used by pulsar.Monitor to kill actors which don't respond to the stop command.
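The stop-then-kill policy this constant supports might look roughly like the sketch below. It is a generic asyncio illustration, not the way pulsar.Monitor is implemented: `stop_or_kill`, `demo` and the `stop`/`kill` callables are hypothetical, and treating the constant as a number of seconds is an assumption.

```python
import asyncio

ACTOR_ACTION_TIMEOUT = 5   # value from the documented constant; unit assumed to be seconds


async def stop_or_kill(stop, kill, timeout=ACTOR_ACTION_TIMEOUT):
    """Await ``stop()``; if it does not finish within ``timeout``, call ``kill()``."""
    try:
        await asyncio.wait_for(stop(), timeout)
        return 'stopped'
    except asyncio.TimeoutError:
        kill()
        return 'killed'


async def demo():
    killed = []

    async def quick_stop():
        # Responds to the stop request immediately.
        await asyncio.sleep(0)

    async def stuck_stop():
        # Simulates an actor that never answers in time.
        await asyncio.sleep(10)

    first = await stop_or_kill(quick_stop, lambda: killed.append('a'), timeout=0.05)
    second = await stop_or_kill(stuck_stop, lambda: killed.append('b'), timeout=0.05)
    return first, second, killed
```

The demo uses a much shorter timeout than the default so that it finishes quickly; only the unresponsive actor ends up being killed.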
pulsar.async.consts.MONITOR_TASK_PERIOD = 1
Interval for pulsar.Monitor and pulsar.Arbiter periodic task.
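A periodic task on an event loop is commonly built by having each run schedule the next one. The sketch below shows that pattern with `loop.call_later`, whose return value is an `asyncio.Handle` subclass, matching the documented type of next_periodic_task. `Ticker` and `demo` are hypothetical names, treating the period as seconds is an assumption, and pulsar's own scheduling may work differently.

```python
import asyncio

MONITOR_TASK_PERIOD = 1   # value from the documented constant; unit assumed to be seconds


class Ticker:
    """Hypothetical periodic task that re-schedules itself on the event loop."""

    def __init__(self, loop, period=MONITOR_TASK_PERIOD):
        self.loop = loop
        self.period = period
        self.ticks = 0
        self.next_periodic_task = None   # an asyncio.Handle once scheduled

    def start(self):
        self.next_periodic_task = self.loop.call_later(self.period, self._tick)

    def _tick(self):
        self.ticks += 1
        self.start()                     # schedule the next run

    def stop(self):
        if self.next_periodic_task is not None:
            self.next_periodic_task.cancel()
            self.next_periodic_task = None


async def demo(period=0.01, wanted=3):
    ticker = Ticker(asyncio.get_running_loop(), period)
    ticker.start()
    while ticker.ticks < wanted:
        await asyncio.sleep(period)
    ticker.stop()
    seen = ticker.ticks
    await asyncio.sleep(period * 3)      # no further ticks after stop()
    return seen, ticker.ticks
```

Cancelling the stored handle is enough to end the cycle, because the next run is only ever scheduled from inside the current one.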