Asynchronous workflows and joinads

Asynchronous workflows provide a way of writing code that does not block a thread when waiting for a completion of long-running operation such as web service call, another I/O operation or waiting for the completion of some background operation. In this article, we look at the new expressive power that joinads add to asynchronous workflows written using the async { ... } block in F#.

Downloading web sites asynchronously

Code written using asynchronous workflows is wrapped in the async { ... } block and the resulting computation has a type Async<'T>. The body of a workflow can contain let! to call another asynchronous computation of type Async<'T>, do! to call asynchronous operation returning unit and use! to call an asynchronous operation and ensure that the result (of type IDisposable) will be diposed of when the computation finishes. For example, the following function downloads a specified web site and extracts the value of the <title> element:

open System
open System.Net
open System.Text.RegularExpressions

/// Extracts the content of the <title> element
let extractTitle html = 
  let regTitle = new Regex(@"\<title\>([^\<]+)\</title\>")
  regTitle.Match(html).Groups.[1].Value

/// Asynchronously downloads a page and extracts the title
/// (uses a proxy to enable cross-domain downloads)
let downloadTitle url = async {
  let wc = new WebClient()
  let proxy = "http://tomasp.net/tryjoinads/proxy.aspx?url=" + url
  let! html = wc.AsyncDownloadString(Uri(proxy)) 
  return extractTitle html }

The AsyncDownloadString operation takes a URL and returns an asynchronous workflow of type Async<string>. The let! operation makes it possible to sequentially compose asynchronous operations and store the result as a value of type string in a variable html. Asynchronous workflows do not introduce any implicit parallelism - the work is performed step-by-step, but without blocking threads.

To implement parallelism within asynchronous computations, we can use combinators such as Async.Parallel and Async.StartChild that are provided by the F# library. The first combinator takes a sequence of asynchronous workflow and returns a single workflow, that performs all of them in parallel and returns an array with the results. The second combinator starts a workflow without waiting and returns a handle (also an asynchronous workflow) that can be later used for waiting until the operation finishes.

Parallel composition using joinads

Thanks to joinads, it is possible to use the match! syntax within the async { ... } block to express patterns that would be otherwise expressed using combinators. In this section, we start with parallel composition. For example, let's say that we want to download the title of two different web pages in parallel and then print both of them. Using joinads, this can be written as follows:

open FSharp.Extensions.Joinads

let fsharp = "http://www.fsharp.net"
let csharp = "http://www.csharp.net"

/// Download titles of two pages in parallel
let titles = async {
  match! downloadTitle fsharp, downloadTitle csharp with
  | title1, title2 -> 
      printfn "Downloaded:\n   - %s\n   - %s" title1 title2 }

titles |> Async.Start

The match! construct can be used to pattern match on computations. In case of asynchronous workflows, the type of computations is Async<'T>. Here, we use match! with two arguments - computations that download the title of two web pages. The pattern matching contains just a single clause that will match when both of the computations finish and produce a value. This means, that the body of the computation is executed once both of the workflows complete. When the titles workflow is started on the last line, the match! construct starts both of the downloads and then runs the body of the only clause.

In general, parallel composition is used when a single clause contains multiple patterns that require the computation to finish (we will see other patterns in the next section). The parallel composition is defined by the async.Merge operation that is defined by the computation builder. The type of the operation is:

async.Merge : Async<'T1> * Async<'T2> -> Async<'T1 * 'T2>

We might try to implement the operation using an asynchronous workflow that waits for the two computations in sequence, but that would not give a desired result. The files would be downloaded sequentially, which is not what we wanted. Instead, the operation is implemented using the Async.Parallel combinator.

Non-deterministic choice using joinads

Another operation that can be written using joinads is (non-deterministic) choice. For asynchronous workflows, this corresponds to waiting for the completion of the first out of several operations. For example, a required resource may be available from the primary and from a secondary server. To speed-up the processing, we may want to try downloading the resource from both of the servers and return the result (in this case, the <title>) from the first one that returns. This can be written as follows:

let main = "http://msdn.microsoft.com/en-us/vstudio/hh388569.aspx"
let backup = "http://www.fsharp.net"

/// Start two downloads and return the first available result
let getFirst = async {
  match! downloadTitle main, downloadTitle backup with
  | res, ? -> printfn "Main: %s" res
  | ?, res -> printfn "Backup: %s" res }

getFirst |> Async.Start

Just like in the previous example, the match! construct has two arguments that represent the two asynchronous downloads. However, the body now consists of two clauses. The first clause uses a new syntax for patterns (added by the joinads extension) that is called ignore pattern. When you write ? in the clause, it means that the corresponding computation is not required to produce a value.

In case of getFirst, this means that the first clause can be executed when downloadTitle main finishes (and produces a value that is assigned to res), but the second computation is not required to complete. Similarly, the second clause matches when the downloading of backup completes, regardless of the first download.

In general, (non-deterministic) choice is used when match! consists of multiple clauses. For some types of computations (like Async<'T>), the behaviour is truly non-deterministic, but other computations may define a deterministic choice. The definition is provided by the async.Choose operation defined by the computation builder:

async.Choose : Async<'T> * Async<'T> -> Async<'T>

For F# asynchronous workflows, the operation does not correspond to any built-in operator, so it has to be implemented by spawning the two computations and waiting for the first one that completes, either using F# agent or using mutable state. The implementation of both Choose and Merge can be found in FSharp.Joinads project on GitHub.

Matching against multiple patterns

The previous two examples gave the simplest case for choice and parallel composition. In the first one, only the Merge operation was needed and the second one required only the Choose operation. Notably, the match! construct can be used when only one of the operations is defined, but it will accept only a subset of the syntax:

Let's say that we wanted to download the title of one of two web pages just like in the previous example. This time, the download function returns Async<string option>, meaning that it returns None when the computation fails for some reason. We want to write a workflow that returns Some with the title when either of the computations succeed and None when both fail:

let good = "http://www.fsharp.net"
let bad = "http://www.f#.net"

/// Wraps 'downloadTitle' with an exception handler and returns
/// None if an exception occurs (or Some when download succeeds)
let tryDownloadTitle url = (...)

/// Try to download first available title. If both downloads
/// fail, then the value 'None' is returned.
let tryGetFirst = async {
  match! tryDownloadTitle good, tryDownloadTitle bad with
  | Some res, ? -> return Some ("First: " + res)
  | ?, Some res -> return Some ("Second: " + res)
  | None, None  -> return None }

// Run the download synchronously and wait for the result
let res = tryGetFirst |> Async.RunSynchronously
printfn "Result: %A" res

The sample looks similar to the previous one, but there are a few changes. The patterns in the first two clauses only match when the value is Some and there is a third clause that handles the remaining case - when both of the computations fail. The code is interesting for two reasons:

  1. It requires both choice and parallel composition, because there are multiple clauses and the last clause matches on both of the computations.
  2. Both of the computations are accessed in two patterns - in the first or the second clause and in the last clause.

The first fact simply means that the computation builder needs to provide both Merge and Choose. However, the second property has an interesting implication for asynchronous workflows. When you write code that uses a workflow work multiple times, such as async.Choose(work, work), and run it, the computation represented by the workflow is started twice.

For the above example, this is not desirable. We just want to start the two downloads and then reuse the result (when it becomes available) of the same computation in the two clauses. To achieve this behaviour, the translation of match! uses one more operation that represents aliasing of computations:

async.Alias : Async<'T> -> Async<Async<'T>>

The operation returns an asynchronous workflow that, when executed, starts the workflow specified as the argument. Then it returns a new (inner) workflow that does not do any additional work when called. Instead, it simply waits until the work started earlier completes. In F#, this operation is already available as Async.StartChild.

If you want to play with the example, try modifying the code by changing the good URL to some invalid address. When individual downloads complete, the first two clauses will not match and the computation will have to wait for both downloads and then return None.

Waiting for user-interface events

So far, all of the examples used web page downlaod as an example of asynchronous oparation. However, the support for joinads in asynchronous workflows can be used in much wider range of scenarios. In this section, we look at an example that implements a simple user-interface using async.

The user-interface implements a simple counter and consists of three elements. A label shows the current number of the counter and two buttons can be used to increment or decrement the number. The user-interface interaction can be implemented as a recursive asynchronous loop that keeps the current count as a function argument. In every iteration, it needs to wait until user clicks on one of the buttons.

Import necessary namespaces

/// Creates a label that shows the current count and
/// buttons that increment and decrement the number
let createUserInterface() = (...)

/// Runs the specified workflow on the main
/// user-interface thread of the F# console
let runUserInterface work = (...)

/// Main workflow of the widget - creates the user interface and then
/// starts a recursive async function that implements user interaction.
let guiWorkflow = async {
  let label, inc, dec = createUserInterface() 

  /// Recursive workflow that keeps the current count as an argument
  let rec counter n : Async<unit> = async {
    // Update the text on the label
    label.Text <- sprintf "Count: %d" n
    // Wait for click on one of the two buttons 
    match! Async.AwaitEvent inc.Click, Async.AwaitEvent dec.Click with
    | _, ? -> return! counter (n + 1)
    | ?, _ -> return! counter (n - 1) }

  // Start the counter user interaction
  return! counter 0 }

// Start the main computation on GUI thread
runUserInterface guiWorkflow

The snippet uses the App.Console property (from the FSharp.Console namespace) to access a canvas of the F# Interactive console. The runUserInterface function starts a specified asynchronous workflow on the main Silverlight GUI thread, so that the code running inside the workflow can access the Canvas object.

The main workflow first creates the user interface and gets three objects representing a label and two buttons. Then it starts the recursie asynchronous function counter to handle the interaction. The function uses match! to wait for the first of two events. The workflow created by Async.AwaitEvent completes when the specified event occurs for the first time (and ignores any further occurrences).

The two clauses of match! specify that when any of the events happens, the counter function should be called recursively with incremented or decremented count as the argument. Note that there is a difference between the standard F# pattern _ (underscore), which specifies that value is required, but is then discareded and a pattern ? added by joinads, which specifies that the workflow does not have to complete, in order for a clause to match.

Summary

When using joinads and the match! notation with asynchronous workflows, we get two additional ways of composing asynchronous computations - parallel composition similar to Async.Parallel can be written using match! with multiple arguments and a single clause; non-deterministic choic can be written using multiple clauses that match on distinct arguments. However, it is also possible to combine these two operations, so we can express fairly complex synchronization patterns, such as waiting for any two out of three computations or waiting for a value that matches a certain pattern.

namespace System
namespace System.Net
namespace System.Text
namespace System.Text.RegularExpressions
val extractTitle : string -> string

Full name: TryJoinads.extractTitle


 Extracts the content of the <title> element
val html : string
  type: string
val regTitle : Regex
type Regex =
  class
    new : string -> System.Text.RegularExpressions.Regex
    new : string * System.Text.RegularExpressions.RegexOptions -> System.Text.RegularExpressions.Regex
    member GetGroupNames : unit -> string []
    member GetGroupNumbers : unit -> int []
    member GroupNameFromNumber : int -> string
    member GroupNumberFromName : string -> int
    member IsMatch : string -> bool
    member IsMatch : string * int -> bool
    member Match : string -> System.Text.RegularExpressions.Match
    member Match : string * int -> System.Text.RegularExpressions.Match
    member Match : string * int * int -> System.Text.RegularExpressions.Match
    member Matches : string -> System.Text.RegularExpressions.MatchCollection
    member Matches : string * int -> System.Text.RegularExpressions.MatchCollection
    member Options : System.Text.RegularExpressions.RegexOptions
    member Replace : string * string -> string
    member Replace : string * System.Text.RegularExpressions.MatchEvaluator -> string
    member Replace : string * string * int -> string
    member Replace : string * System.Text.RegularExpressions.MatchEvaluator * int -> string
    member Replace : string * string * int * int -> string
    member Replace : string * System.Text.RegularExpressions.MatchEvaluator * int * int -> string
    member RightToLeft : bool
    member Split : string -> string []
    member Split : string * int -> string []
    member Split : string * int * int -> string []
    member ToString : unit -> string
    static member CacheSize : int with get, set
    static member Escape : string -> string
    static member IsMatch : string * string -> bool
    static member IsMatch : string * string * System.Text.RegularExpressions.RegexOptions -> bool
    static member Match : string * string -> System.Text.RegularExpressions.Match
    static member Match : string * string * System.Text.RegularExpressions.RegexOptions -> System.Text.RegularExpressions.Match
    static member Matches : string * string -> System.Text.RegularExpressions.MatchCollection
    static member Matches : string * string * System.Text.RegularExpressions.RegexOptions -> System.Text.RegularExpressions.MatchCollection
    static member Replace : string * string * string -> string
    static member Replace : string * string * System.Text.RegularExpressions.MatchEvaluator -> string
    static member Replace : string * string * string * System.Text.RegularExpressions.RegexOptions -> string
    static member Replace : string * string * System.Text.RegularExpressions.MatchEvaluator * System.Text.RegularExpressions.RegexOptions -> string
    static member Split : string * string -> string []
    static member Split : string * string * System.Text.RegularExpressions.RegexOptions -> string []
    static member Unescape : string -> string
  end

Full name: System.Text.RegularExpressions.Regex
Regex.Match(input: string) : Match
Regex.Match(input: string, startat: int) : Match
Regex.Match(input: string, beginning: int, length: int) : Match
val downloadTitle : string -> Async<string>

Full name: TryJoinads.downloadTitle


 Asynchronously downloads a page and extracts the title
 (uses a proxy to enable cross-domain downloads)
val url : string
  type: string
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val wc : WebClient
type WebClient =
  class
    new : unit -> System.Net.WebClient
    member AllowReadStreamBuffering : bool with get, set
    member AllowWriteStreamBuffering : bool with get, set
    member BaseAddress : string with get, set
    member CancelAsync : unit -> unit
    member Credentials : System.Net.ICredentials with get, set
    member DownloadStringAsync : System.Uri -> unit
    member DownloadStringAsync : System.Uri * obj -> unit
    member Encoding : System.Text.Encoding with get, set
    member Headers : System.Net.WebHeaderCollection with get, set
    member IsBusy : bool
    member OpenReadAsync : System.Uri -> unit
    member OpenReadAsync : System.Uri * obj -> unit
    member OpenWriteAsync : System.Uri -> unit
    member OpenWriteAsync : System.Uri * string -> unit
    member OpenWriteAsync : System.Uri * string * obj -> unit
    member ResponseHeaders : System.Net.WebHeaderCollection
    member UploadStringAsync : System.Uri * string -> unit
    member UploadStringAsync : System.Uri * string * string -> unit
    member UploadStringAsync : System.Uri * string * string * obj -> unit
    member UseDefaultCredentials : bool with get, set
  end

Full name: System.Net.WebClient
val proxy : string
  type: string
member WebClient.AsyncDownloadString : address:Uri -> Async<string>
type Uri =
  class
    new : string -> System.Uri
    new : string * System.UriKind -> System.Uri
    new : System.Uri * string -> System.Uri
    new : System.Uri * System.Uri -> System.Uri
    member AbsolutePath : string
    member AbsoluteUri : string
    member DnsSafeHost : string
    member Equals : obj -> bool
    member Fragment : string
    member GetComponents : System.UriComponents * System.UriFormat -> string
    member GetHashCode : unit -> int
    member Host : string
    member IsAbsoluteUri : bool
    member IsBaseOf : System.Uri -> bool
    member IsUnc : bool
    member LocalPath : string
    member MakeRelativeUri : System.Uri -> System.Uri
    member OriginalString : string
    member Port : int
    member Query : string
    member Scheme : string
    member ToString : unit -> string
    member UserEscaped : bool
    member UserInfo : string
    static val UriSchemeFile : string
    static val UriSchemeFtp : string
    static val UriSchemeGopher : string
    static val UriSchemeHttp : string
    static val UriSchemeHttps : string
    static val UriSchemeMailto : string
    static val UriSchemeNews : string
    static val UriSchemeNntp : string
    static val UriSchemeNetTcp : string
    static val UriSchemeNetPipe : string
    static val SchemeDelimiter : string
    static member CheckSchemeName : string -> bool
    static member Compare : System.Uri * System.Uri * System.UriComponents * System.UriFormat * System.StringComparison -> int
    static member EscapeDataString : string -> string
    static member EscapeUriString : string -> string
    static member FromHex : char -> int
    static member IsHexDigit : char -> bool
    static member IsWellFormedUriString : string * System.UriKind -> bool
    static member TryCreate : string * System.UriKind * System.Uri -> bool
    static member TryCreate : System.Uri * string * System.Uri -> bool
    static member TryCreate : System.Uri * System.Uri * System.Uri -> bool
    static member UnescapeDataString : string -> string
  end

Full name: System.Uri
namespace FSharp
namespace FSharp.Extensions
namespace FSharp.Extensions.Joinads
val fsharp : string

Full name: TryJoinads.fsharp
  type: string
val csharp : string

Full name: TryJoinads.csharp
  type: string
val titles : Async<unit>

Full name: TryJoinads.titles


 Download titles of two pages in parallel
val title1 : string
  type: string
val title2 : string
  type: string
val printfn : Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Multiple items
module Async

from FSharp.Extensions.Joinads.AsyncTopLevel

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------
type Async =
  class
    static member ParallelImmediate : works:Async<'T> [] -> Async<'T []>
    static member StartChildImmediate : work:Async<'a0> -> Async<Async<'a0>>
    static member WhenAny : works:Async<'T> [] -> Async<'T>
  end

Full name: FSharp.Extensions.Joinads.Async
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
val main : string

Full name: TryJoinads.main
  type: string
val backup : string

Full name: TryJoinads.backup
  type: string
val getFirst : Async<unit>

Full name: TryJoinads.getFirst


 Start two downloads and return the first available result
val res : string
  type: string
val good : string

Full name: TryJoinads.good
  type: string
val bad : string

Full name: TryJoinads.bad
  type: string
val tryDownloadTitle : string -> Async<string option>

Full name: TryJoinads.tryDownloadTitle


 Wraps 'downloadTitle' with an exception handler and returns
 None if an exception occurs (or Some when download succeeds)
async {
  try
    let! res = downloadTitle url
    return Some res
  with e -> return None }
val tryGetFirst : Async<string option>

Full name: TryJoinads.tryGetFirst


 Try to download first available title. If both downloads
 fail, then the value 'None' is returned.
union case Option.Some: 'T -> Option<'T>
union case Option.None: Option<'T>
val res : string option

Full name: TryJoinads.res
  type: string option
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
open System.Windows
open System.Windows.Controls
open FSharp.Console
open FSharp.Extensions.Joinads
val createUserInterface : unit -> TextBlock * Button * Button

Full name: TryJoinads.createUserInterface


 Creates a label that shows the current count and
 buttons that increment and decrement the number
let addControl (left, top) (ctrl:#UIElement) =
    App.Console.Canvas.Children.Add(ctrl)
    Canvas.SetTop(ctrl, top)
    Canvas.SetLeft(ctrl, left)
    ctrl

  let label = addControl (20.0, 20.0) (TextBlock(FontSize = 20.0))
  let incBtn = addControl (20.0, 60.0) (Button(Content="Increment", Width = 80.0))
  let decBtn = addControl (110.0, 60.0) (Button(Content="Decrement", Width = 80.0))
  label, incBtn, decBtn
val runUserInterface : Async<unit> -> unit

Full name: TryJoinads.runUserInterface


 Runs the specified workflow on the main
 user-interface thread of the F# console
val work : Async<unit>
App.Dispatch (fun() ->
    App.Console.ClearCanvas()
    Async.StartImmediate work
    App.Console.CanvasPosition <- CanvasPosition.Right )
val guiWorkflow : Async<unit>

Full name: TryJoinads.guiWorkflow


 Main workflow of the widget - creates the user interface and then
 starts a recursive async function that implements user interaction.
val label : TextBlock
  type: TextBlock
  inherits: FrameworkElement
  inherits: UIElement
  inherits: DependencyObject
val inc : Button
  type: Button
  inherits: Primitives.ButtonBase
  inherits: ContentControl
  inherits: Control
  inherits: FrameworkElement
  inherits: UIElement
  inherits: DependencyObject
val dec : Button
  type: Button
  inherits: Primitives.ButtonBase
  inherits: ContentControl
  inherits: Control
  inherits: FrameworkElement
  inherits: UIElement
  inherits: DependencyObject
val counter : (int -> Async<unit>)


 Recursive workflow that keeps the current count as an argument
val n : int
  type: int
  inherits: ValueType
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
  type: unit
property TextBlock.Text: string
val sprintf : Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
static member Async.AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
event Primitives.ButtonBase.Click: IEvent<RoutedEventHandler,RoutedEventArgs>