Integration
SlowActors took the following steps to integrate with Actors:
Actor Primitives
1. Reexport Actors
using Reexport
@reexport using Actors2. Provide Yourtype as a Link{T} parameter
I the case of SlowActors this is Mailbox:
struct Mailbox
box::Deque
lock::ReentrantLock
A::Union{Nothing,SlowActor}
end3. Provide a concrete Link type
You then can create a new concrete Link{Yourtype} type:
newLink() = Link(Mailbox(Deque{Any}(), ReentrantLock(), nothing),
myid(), :local)Your actor creation primitive, e.g. spawn must return that type too:
function spawn(bhv::Func; sticky=false)
lk = Link(
Mailbox(Deque{Any}(), ReentrantLock(),
SlowActor(nothing, sticky, _ACT())),
myid(), :local
)
lk.chn.A.act.bhv = bhv
lk.chn.A.act.self = lk
end4. Export your primitives for link and actor creation
export newLink, spawn5. Reimplement send! for your link type
Actors.send!(lk::Link{Mailbox}, msg::Msg) = (push!(lk.chn, msg); _start_on_send(lk))
Actors.send!(lk::Link{Mailbox}, msg...) = (push!(lk.chn, msg); _start_on_send(lk))Then you can send messages between actors using the Actors interface. Still some further primitives are missing, e.g. self() and become!/become. You get those and much more if you plug in the messaging protocol of Actors.
Plugin the Actors API
6. Provide your actor with the _ACT variable
The _ACT variable contains status information for each actor such as the behavior function, the actor link, init and exit functions, actor ties ...
Therefore on actor creation you create also _ACT(). As shown above, you must set the fields bhv and self with the behavior and the actor link respectively:
...
lk.chn.A.act.bhv = bhv
lk.chn.A.act.self = lk
endWhen your actor task starts, it puts a reference to its _ACT variable into task_local_storage:
function _act(mbx::Mailbox)
task_local_storage("_ACT", mbx.A.act)
magic = hash(rand(Int))
while true
msg = lock(mbx.lock) do
isempty(mbx.box) ?
magic : popfirst!(mbx.box)
end
msg == magic && break
msg isa Actors.Exit && break
onmessage(mbx.A.act, msg)
end
end7. Call onmessage to process a message
If a message of type Msg arrives, onmessage provides a finite state machine to process it. To call onmessage(A::_ACT, msg) enables the Actors messaging protocol.
You can extend Msg with new message types or create other message types. In that case you might want to extend Actors.onmessage to handle those other types if you don't want the standard behavior to pass it as last argument to the behavior function.
8. Enable receive!
The Actors API works with receive! to enable easy communication with actors. You enable receive! on your concrete link type if you extend three Base functions with your Link{Yourtype} parameter Yourtype, e.g.:
Base.isready(mbx::Mailbox) = !isempty(mbx)
function Base.fetch(mbx::Mailbox) # this waits for a message
timedwait(60, pollint=0.01) do
!isempty(mbx)
end == :ok ?
lock(()->first(mbx.box), mbx.lock) :
:timed_out
end
Base.take!(mbx::Mailbox) = popfirst!(mbx)