Writing agents with joinads

Another area where the match! syntax can be used is programming with F# agents, which are implemented by the MailboxProcessor type. Formally, agents do not form a monad structure in any useful way - when programming with agents, we do not compose new agents, but instead write code that (imperatively) receives messages from the agent's mailbox and handles them.

This article demonstrates an agent { ... } computation builder that can be used for implementing the body of an agent. Normally, the body of an agent is an asynchronous workflow. The code in the body uses let! to perform asynchronous operations, most importantly to call inbox.Receive to get the next message from the inbox. When the agent intends to handle only certain kinds of messages, it can use inbox.Scan. When using the agent builder, pattern matching on messages can be written using match!, and code can skip certain kinds of messages (leaving them in the inbox) simply by writing an incomplete pattern match.

Implementing a pausable agent

As an example, let's implement a simple agent that handles Print messages by printing a received string to the console. In addition, the agent also supports Pause and Resume messages - when it receives Pause, it stops accepting Print messages until it receives Resume and so all Print messages will be queued for later processing.

First of all, we need to reference the namespace with the agent builder and declare the type of messages, which is a simple discriminated union:

open FSharp.Extensions.Joinads

type Message =
  | Print of string
  | Pause 
  | Resume

The body of the agent consists of two functions. The working function represents the agent in a state where it can handle all messages, and the paused function represents the state in which the agent waits for Resume (and also dismisses Pause messages, because it is already paused). Using match! this can be implemented as follows:

let printer = MailboxProcessor.Start(fun inbox -> 

  /// Agent can receive and handle any message; after processing, 
  /// it continues in 'working' state unless it receives 'Pause'
  let rec working() = agent {
    match! inbox with
    | Print msg -> 
        printfn "%s" msg
        return! working()
    | Pause -> return! paused()
    | Resume -> return! working() }

  /// Agent is paused and waits for 'Resume' (and then continues in
  /// 'working' state). If it receives 'Pause', it remains paused 
  and paused() = agent {
    match! inbox with
    | Pause -> return! paused()
    | Resume -> 
        printfn "Resuming..."
        return! working() } 

  // Start the agent in the working state
  working() )

The snippet creates an agent named printer and starts it. Both of the functions that implement the body of the agent use match! to receive the next message from the inbox. Inside the agent { ... } computation, the match! keyword can be used to match on messages in the agent's inbox (of type MailboxProcessor<'T>). Otherwise, the computation builder behaves just like async { ... }, so the body can call asynchronous operations using let! as usual (in theory, it would be cleaner to return async { ... } blocks from the body of match!, but the agent builder uses overloading to make the syntax nicer).

As already discussed, the working state accepts all messages - when implementing F# agents in the usual way, this would be done by calling inbox.Receive() asynchronously and then pattern matching on the received message. With the match! construct, we can do both operations at the same time.
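For reference, the usual two-step form mentioned above can be sketched in plain F#. This is a minimal sketch rather than the article's code: it uses only the standard async builder, it repeats the Message type so that it stands alone, the name plainPrinter is hypothetical, and it does not model pausing at all.

```fsharp
// Same message type as in the article, repeated so the sketch stands alone
type Message =
  | Print of string
  | Pause
  | Resume

// Hypothetical name; a plain agent with a single 'working' state
let plainPrinter = MailboxProcessor<Message>.Start(fun inbox ->
  let rec working () = async {
    // Step 1: asynchronously wait for the next message
    let! msg = inbox.Receive()
    // Step 2: pattern match on it; here the match has to be complete
    match msg with
    | Print text ->
        printfn "%s" text
        return! working ()
    | Pause | Resume ->
        // Pausing is not modelled in this sketch
        return! working () }
  working ())

plainPrinter.Post(Print "hello world")
```

Because Receive removes the message from the inbox before the match runs, every case has to be handled here; there is no way to leave a message queued for later.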

However, the main benefit of using match! is apparent when implementing states that only accept certain messages. In the paused state, the pattern matching handles only Pause and Resume. In response to Resume, the agent also prints "Resuming..." to make the code slightly more interesting. The match! operation implemented by the agent workflow automatically leaves all unhandled messages in the inbox, so the pattern matching does not have to be complete. In a normal implementation (shown below), this would have to be done using inbox.Scan.

To test the agent, you can try running the following commands. If you execute them one by one, you can see that the second Print message gets handled only after the Resume message is received:

printer.Post(Print "hello world")
printer.Post(Pause)
printer.Post(Print "hello again!")
printer.Post(Resume)

The agent discussed in this section is fairly simple, but it shows that match! makes message handling considerably simpler. The next section compares the previous code with a version written using inbox.Scan.

Comparison with standard style

When implementing agents without using match!, states that do not handle all possible messages need to be implemented using Scan. The operation takes a function that returns option<Async<unit>>. The result is None for messages that should be left in the queue and Some(async { ... }) when a message can be handled.

In the above example, this only applies to the paused state:

let printer = MailboxProcessor.Start(fun inbox -> 

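  // Working state: receive any message, then pattern match on it
  let rec working() = agent {
    let! msg = inbox.Receive()
    match msg with
    | Print msg ->
        printfn "%s" msg
        return! working()
    | Pause -> return! paused()
    | Resume -> return! working() }

  // Use Scan to handle only Pause or Resume messages
  and paused() = inbox.Scan(function
    | Pause -> Some(paused())
    | Resume -> Some(async { 
        printfn "Resuming..."
        return! working() })
    | _ -> None) 

  // Start the agent in the working state
  working() )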

The syntax is definitely less obvious, especially in the Resume case, where we need to perform some operation before returning. This requires writing an asynchronous workflow that is wrapped in Some (which is a value returned by a pattern matching inside a function).

Implementing a blocking queue

To give a more complex example of an agent using match!, we can re-implement the BlockingQueueAgent from the MSDN tutorial. The agent implements an asynchronous blocking queue similar to BlockingCollection from .NET. It supports two messages - one for adding values to the queue and another for removing values from the queue. Both of the messages should be sent asynchronously, because their processing can be blocked. The Add message cannot be processed when the queue is full and the Get message cannot be processed when the queue is empty.

We first reference a namespace that we'll need later and define the message type:

open System.Collections.Generic

type BlockingAgentMessage<'T> = 
  | Add of 'T * AsyncReplyChannel<unit> 
  | Get of AsyncReplyChannel<'T>

Both messages also carry an AsyncReplyChannel, because the agent needs to reply to both of them. The reply to Get carries the obtained value, while the reply to Add is just a notification that the value was added to the queue.
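The caller's side of this protocol can be sketched as follows. This is a minimal sketch under stated assumptions: the store agent is a hypothetical stand-in written with plain async (it is unbounded and never blocks), and it assumes callers never send Get while it is empty. Only the message type is taken from the article.

```fsharp
open System.Collections.Generic

// Same message type as in the article, repeated so the sketch stands alone
type BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

// Hypothetical stand-in agent (unbounded, no blocking); it assumes that
// callers never send Get while the store is empty
let store = MailboxProcessor<BlockingAgentMessage<int>>.Start(fun inbox ->
  let items = Queue<int>()
  let rec loop () = async {
    let! msg = inbox.Receive()
    match msg with
    | Add(value, reply) ->
        items.Enqueue(value)
        reply.Reply()                  // unit reply: "the value was added"
    | Get reply ->
        reply.Reply(items.Dequeue())   // reply carries the obtained value
    return! loop () }
  loop ())

// PostAndReply creates the reply channel, passes it to the function that
// builds the message, and waits for the agent to call Reply
store.PostAndReply(fun channel -> Add(42, channel))
let value = store.PostAndReply(fun channel -> Get channel)
printfn "%d" value   // prints 42
```

PostAndReply blocks the calling thread, which keeps the sketch short. Inside an asynchronous workflow, the same messages can be sent with PostAndAsyncReply and let!, which matches the article's point that both messages should be sent asynchronously.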

The implementation of the queue uses a mutable Queue<'T> to keep the items. This is perfectly fine when writing an F# agent, because its body is not executed concurrently and the blocking queue agent does not need to expose the entire queue of messages.

The body of the agent consists of three functions that implement three states. The agent can be empty (waiting for the Add message), full (waiting for the Get message), or able to handle both messages:

let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
  // Private queue that stores the values
  let items = new Queue<_>()
  // Continue in one of the states, depending on the queue
  let rec chooseState() = 
    if items.Count = 0 then emptyQueue()
    elif items.Count < maxLength then runningQueue()
    else fullQueue()

  // When the agent is empty, it can only handle 'Add'
  and emptyQueue() = agent {
    match! inbox with
    | Add(value, reply) -> 
        items.Enqueue(value)
        reply.Reply()
        return! chooseState() }
  // When the agent is full, it can only handle 'Get'
  and fullQueue() = agent {
    match! inbox with
    | Get(reply) -> 
        reply.Reply(items.Dequeue())
        return! chooseState() }
  // A state in which the agent can handle both messages
  and runningQueue() = agent {
    match! inbox with
    | Add(value, reply) -> 
        items.Enqueue(value)
        reply.Reply()
        return! chooseState()
    | Get(reply) -> 
        reply.Reply(items.Dequeue())
        return! chooseState() }

  // Start with an empty queue
  emptyQueue() )

Compared with the original version of the agent in the MSDN tutorial, the code is more consistent. We don't need to use a different programming style for functions that can handle all messages (runningQueue) and for functions that only handle certain messages (emptyQueue and fullQueue).
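To make the contrast concrete, here is a sketch of the same three states written without match!. It is not the code from the MSDN tutorial, only an approximation of that style using the standard Scan and Receive operations; the name createPlainQueue is hypothetical and the message type is repeated so that the sketch stands alone.

```fsharp
open System.Collections.Generic

// Same message type as in the article, repeated so the sketch stands alone
type BlockingAgentMessage<'T> =
  | Add of 'T * AsyncReplyChannel<unit>
  | Get of AsyncReplyChannel<'T>

// Hypothetical name; a blocking queue agent in plain async style
let createPlainQueue<'T> (maxLength: int) =
  MailboxProcessor<BlockingAgentMessage<'T>>.Start(fun inbox ->
    let items = Queue<'T>()
    // Continue in one of the states, depending on the queue
    let rec chooseState () =
      if items.Count = 0 then emptyQueue ()
      elif items.Count < maxLength then runningQueue ()
      else fullQueue ()
    // Restricted states use Scan and return None for messages
    // that should stay in the inbox
    and emptyQueue () =
      inbox.Scan(function
        | Add(value, reply) ->
            Some(async {
              items.Enqueue(value)
              reply.Reply()
              return! chooseState () })
        | _ -> None)
    and fullQueue () =
      inbox.Scan(function
        | Get reply ->
            Some(async {
              reply.Reply(items.Dequeue())
              return! chooseState () })
        | _ -> None)
    // The unrestricted state uses Receive followed by an ordinary match
    and runningQueue () = async {
      let! msg = inbox.Receive()
      match msg with
      | Add(value, reply) ->
          items.Enqueue(value)
          reply.Reply()
          return! chooseState ()
      | Get reply ->
          reply.Reply(items.Dequeue())
          return! chooseState () }
    // Start with an empty queue
    emptyQueue ())

let queue = createPlainQueue<int> 2
queue.PostAndReply(fun channel -> Add(1, channel))
queue.PostAndReply(fun channel -> Add(2, channel))
printfn "%d" (queue.PostAndReply(fun channel -> Get channel))   // prints 1
```

The two restricted states wrap their handlers in Some(async { ... }) inside a scanner function, while the unrestricted state is an ordinary workflow. That difference in style is exactly what the match! version removes.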

Summary

This article demonstrated how to use the match! construct when implementing agents using the MailboxProcessor type in F#. This use of match! does not follow the usual formal notion of a joinad as explained in the recent publications, because the agent { ... } computation does not implement parallel composition (it is not possible to match on multiple mailboxes) and the monadic bind works over two different types (normal asynchronous workflows and the agent's inbox). However, from a practical point of view, this use definitely simplifies programming with agents.
