Producer-Consumer Problem

This classic problem describes producers and consumers sharing a buffer with a given capacity. A producer produces an item at a time, puts it into the buffer and starts again. At the same time consumers are consuming the items (i.e. removing them from the buffer). The problem ist to make sure that a producer won't add items to the buffer if it is full and that a consumer won't try to remove them from an empty buffer.

The solution for a producer is to go to sleep if the buffer is full. The next time, a consumer removes an item from the buffer, the buffer notifies the stalled producer, who then starts to replenish the buffer again. In the same way, the consumer can go to waiting if it finds the buffer empty. The next time a producer delivers an item, the buffer notifies the waiting consumer.

We implement this problem with three kinds of actors for store, producer and consumer.

The store has a fixed capacity, holds items and queues of stalled producers and waiting customers and a counting variable.

# examples/prod_cons.jl

using Actors, Printf
import Actors: spawn

const maxitems = 10

mutable struct Store
    capacity::Int
    items::Array{Any,1}
    prod::Array{Link,1}
    cons::Array{Link,1}
    count::Int
    Store(capacity::Int) = new(capacity, Any[], Link[], Link[], 0)
end

available(s::Store) = 0 < length(s.items) < s.capacity
isfull(s::Store) = length(s.items) ≥ s.capacity
Base.isempty(s::Store) = isempty(s.items)

We implement the store's behavior as a function object receiving two messages Put() and Take().

@msg Put Full Done Ok Take Empty Notify

function (s::Store)(::Put, prod, item)
    if isfull(s)
        send(prod, Full(), item)
        push!(s.prod, prod)
    elseif s.count < maxitems
        push!(s.items, item)
        s.count += 1
        s.count == maxitems ?
            send(prod, Done()) :
            send(prod, Ok(), item)
        !isempty(s.cons) && send(popfirst!(s.cons), Notify())
    else
        send(prod, Done())
    end
end
function (s::Store)(::Take, cons)
    if isempty(s)
        send(cons, Empty())
        push!(s.cons, cons)
    else
        send(cons, popfirst!(s.items))
        !isempty(s.prod) && send(popfirst!(s.prod), Notify())
    end
end

Producers and consumers have a name and a link to the store:

struct Prod
    name::String
    store::Link
end

struct Cons
    name::String
    store::Link
end

Those are acquaintances of their behavior functions. We have also a print server actor prn as a global variable.

function prod_start(p::Prod, start)
    become(producing, p)
    send(self(), start+1)
    send(prn, "producer $(p.name) started")
end
function producing(p::Prod, item)
    sleep(rand())
    send(p.store, Put(), self(), item)
end
function producing(p::Prod, ::Ok, item)
    send(prn, "producer $(p.name) delivered item $item")
    send(self(), item+1)
end
function producing(p::Prod, ::Full, item)
    send(prn, "producer $(p.name) stalled with item $item")
    become(stalled, p, item)
end
function producing(p::Prod, ::Done)
    send(prn, "producer $(p.name) done")
    stop()
end
function stalled(p::Prod, item, ::Notify)
    send(p.store, Put(), self(), item)
    become(producing, p)
end

function cons_start(c::Cons)
    become(buying, c)
    send(c.store, Take(), self())
    send(prn, "consumer $(c.name) started")
end
function buying(c::Cons, item)
    become(consuming)
    send(self(), c)
    send(prn, "consumer $(c.name) got item $item")
end
function buying(c::Cons, ::Empty)
    become(waiting, c)
    send(prn, "consumer $(c.name) found store empty")
end
function consuming(c)
    sleep(rand())
    become(buying, c)
    send(c.store, Take(), self())
end
function waiting(c::Cons, ::Notify)
    become(buying, c)
    send(c.store, Take(), self())
end

Finally we start the simulation with a print server, a store, three producers and two consumers.

prn = spawn(s->print(@sprintf("%s\n", s)))
st = spawn(Store(5))
pr1 = spawn(prod_start, Prod("A", st), 100)
pr2 = spawn(prod_start, Prod("B", st), 200)
pr3 = spawn(prod_start, Prod("C", st), 300)
cs1 = spawn(cons_start, Cons("U", st))
cs2 = spawn(cons_start, Cons("V", st))

foreach(x->send(x), (pr1,pr2,pr3,cs1,cs2))

Let's see, what happens:

julia> include("examples/prod_cons.jl")

julia> producer A started
consumer U started
producer C started
producer B started
consumer V started
consumer V found store empty
consumer U found store empty
producer A delivered item 101
consumer U got item 101
producer B delivered item 201
consumer V got item 201
producer C delivered item 301
consumer V got item 301
consumer U found store empty
producer C delivered item 302
consumer U got item 302
producer B delivered item 202
producer C delivered item 303
producer B delivered item 203
producer A delivered item 102
producer A delivered item 103
consumer U got item 202
consumer V got item 303
producer A done
consumer V got item 203
producer C done
producer B done
consumer V got item 102
consumer U got item 103
consumer V got item 104
consumer U found store empty
consumer V found store empty

We had limited the sold items to 10. This is a queueing process.