Concurrent programming using join calculus

Join calculus provides a declarative way of expressing asynchronous synchronization patterns. It has been used as a basis for programming languages (JoCaml and COmega) and for libraries (embedded in C# and Scala). Using joinads, it is possible to embed join calculus in F# with a nice syntax based on the match! construct. Formally, join calculus does not form a monad, but it can be viewed as a version of a joinad, as described in the first paper on joinads.

The programming model is based on channels and join patterns. A channel can be viewed as a thread-safe mailbox into which we can put values without blocking the caller. In some sense, this is quite similar to F# agents. A join pattern is then a rule saying that a certain combination of values in channels should trigger a specific reaction (and remove values from the channels). The ability to match on multiple channels distinguishes join calculus from F# agents.
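For comparison, here is a minimal sketch of how a buffer might look when written as a standard F# agent. It uses only MailboxProcessor from the F# core library; the names BufferMessage and bufferAgent are illustrative and not part of any library. Because an agent has a single mailbox, both kinds of requests go through one message type and the agent pairs them up by hand, which is the bookkeeping that a join pattern expresses declaratively.

```fsharp
// Messages for a hypothetical agent-based buffer (names are illustrative)
type BufferMessage =
  | Put of string
  | Get of AsyncReplyChannel<string>

let bufferAgent =
  MailboxProcessor<BufferMessage>.Start(fun inbox ->
    // 'values' and 'readers' hold requests that have not been paired yet
    let rec loop (values: string list) (readers: AsyncReplyChannel<string> list) =
      async {
        match values, readers with
        | v :: vs, r :: rs ->
            // A value and a reader are both waiting: pair them up
            r.Reply(v)
            return! loop vs rs
        | _ ->
            // Otherwise wait for the next message of either kind
            let! msg = inbox.Receive()
            match msg with
            | Put v -> return! loop (values @ [v]) readers
            | Get r -> return! loop values (readers @ [r]) }
    loop [] [])

// Store a value, then read it back
bufferAgent.Post(Put "Hello world.")
let reply = bufferAgent.PostAndReply(fun ch -> Get ch)
```

Here the pairing logic lives in the first match clause of the loop. With join calculus, each kind of request gets its own channel and the pairing is stated as a pattern.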

Thread-safe buffer

As a first example, we implement a thread-safe (unbounded) blocking buffer. The buffer uses two channels: the put channel is used for adding values to the buffer and has the type Channel<string>; the get channel is used for retrieving values from the buffer. When the buffer is empty, a call to get cannot complete immediately, so the operation is exposed as an asynchronous workflow. Channels that are used to obtain a result from a join pattern are represented using the ReplyChannel<'T> type, where 'T is the type of the returned value.

After constructing the channels, we construct a join pattern using the join { ... } computation builder by pattern matching on channels using match!:

open FSharp.Extensions.Joinads

// Construct channels implementing the buffer
let put = Channel<string>()
let get = ReplyChannel<string>()

// Encode a join pattern over the channels
join {
  match! get, put with
  | repl, v -> repl.Reply("Echo: " + v) }

The pattern matching consists of a single clause that matches on both channels. When values are available in both of them, the join pattern is triggered. In response, it calls the Reply method of the value received from the get channel, which sends a string back to the caller of get. Unlike, for example, async { ... }, the join { ... } computation does not return any value; it constructs a join pattern that operates over the existing channels. We look at a more complex sample soon, but let's first demonstrate how the channels are used:

// Store some values in 'put' channel
put.Call("Hello world.")
put.Call("Hi there!")

// Obtain value via the 'get' channel
get.AsyncCall() |> Async.RunSynchronously

The above example shows how channels behave: values stored in channels are cached, and when there is a matching pair (after calling both put and get), the join pattern fires. Note that running AsyncCall with RunSynchronously blocks F# Interactive if there is no matching value in the put channel.
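One way to avoid blocking F# Interactive indefinitely is to pass a timeout to Async.RunSynchronously, which raises a TimeoutException when the workflow does not finish in time. The sketch below is self-contained, so it does not use the Joinads library: pendingGet is a stand-in for get.AsyncCall() on an empty buffer, and tryGet is a hypothetical helper name.

```fsharp
open System

// Stand-in for 'get.AsyncCall()' when nothing has been put: it never completes
let pendingGet : Async<string> =
  Async.FromContinuations(fun (_, _, _) -> ())

// Hypothetical helper: wait at most 'ms' milliseconds for a reply
let tryGet (ms: int) (call: Async<string>) : string option =
  try Some (Async.RunSynchronously(call, timeout = ms))
  with :? TimeoutException -> None

// No reply arrives, so the result is None after roughly 200 ms
let result = tryGet 200 pendingGet
```

With the buffer from the listing above, the same helper could presumably be applied as tryGet 200 (get.AsyncCall()). Whether a timed-out request remains queued in the get channel depends on the library and is not covered by this sketch.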

Buffer with two input channels

To make the example slightly more interesting, we now change it to use two input channels. In the extended version, we can store two types of values in the buffer. The putString channel makes it possible to store strings and the putInt channel can be used to store integers. Values can be retrieved only as strings (numbers get converted to strings) using a single getString channel.

This makes the join pattern more interesting, because it must now handle two cases. One clause handles the case when there is a string in putString and a request to read a value in getString, and another clause handles the case when there are values in putInt and getString:

let putString = Channel<string>()
let putInt = Channel<int>()
let getString = ReplyChannel<string>()

join {
  match! getString, putString, putInt with
  | repl, v, ? -> repl.Reply("Echo " + v)
  | repl, ?, v -> repl.Reply("Echo " + (string v)) }

The first clause of the match! construct handles the case when there is a value in getString and putString. The third channel is matched against the ? pattern, which means that a value from that channel is not required. When the pattern matches, the values from getString and putString channels are removed and they are processed (by sending a string back to the caller of getString channel). The second pattern is similar, but it matches on getString and putInt.
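To make the matching rule concrete, the following sketch models the pending contents of the three channels as plain lists and fires at most one clause per step. It is only an illustrative model in ordinary F#, not how the library is implemented: the Pending type and the step function are hypothetical names, the wildcard _ stands in for the ? pattern, and this model always prefers the first clause when both could fire, which the library need not do.

```fsharp
// Hypothetical model of the pending messages in the three channels
type Pending =
  { Readers : (string -> unit) list   // waiting callers of getString
    Strings : string list             // values stored in putString
    Ints : int list }                 // values stored in putInt

// Try to fire one clause; '_' plays the role of the '?' pattern
let step (p: Pending) : Pending option =
  match p.Readers, p.Strings, p.Ints with
  | reply :: rs, v :: vs, _ ->
      // First clause: consumes a reader and a string, leaves integers alone
      reply ("Echo " + v)
      Some { p with Readers = rs; Strings = vs }
  | reply :: rs, _, v :: vs ->
      // Second clause: consumes a reader and an integer, leaves strings alone
      reply ("Echo " + string v)
      Some { p with Readers = rs; Ints = vs }
  | _ -> None

// One reader, one string and one integer are pending
let replies = ResizeArray<string>()
let before =
  { Readers = [ (fun s -> replies.Add s) ]; Strings = [ "hi" ]; Ints = [ 42 ] }
let after = step before
```

After the step, the reader and the string have been consumed, while the integer 42 is still pending because the first clause did not require a value from that channel.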

The following example shows how to call the buffer implemented above:

// Put 5 values to 'putString' and 5 values to 'putInt'
async {
  for i in 1 .. 5 do 
    putString.Call("Hello!")   // any string will do here
    putInt.Call(i)
    do! Async.Sleep(100) } |> Async.Start

// Call 'getString' asynchronously to read replies
async { 
  while true do
    let! repl = getString.AsyncCall()
    printfn "got: %s" repl } |> Async.Start

The code creates and starts two asynchronous workflows: one that puts 5 values to each of the put channels and another that keeps reading values from the getString channel. When you run the code, you should see that the two join patterns are triggered in an interleaved order as the values are added to the put channels.


In this article, we looked at two basic examples that used join calculus, embedded in F# using the match! construct. The second example showed a sample consisting of multiple join patterns. This is when join calculus becomes quite powerful, because we can declaratively specify what combination of values should trigger a certain reaction.

Join calculus provides a powerful abstraction for implementing concurrency primitives: it can be used to write various buffers, reader-writer locks, count-down events and so on. Beyond that, the join calculus was designed as an abstraction for distributed programming, so it may find use in other areas as well.
