Using joinads for Task-based parallelism

The implementation of joinad operations for the Task<'T> type is quite similar to the implementation of Async<'T>, because the two types have similar properties. They both produce at most one value (or an exception) and they both take some time to complete.

Just like for asynchronous workflows, pattern matching on multiple computations using match! gives us a parallel composition (with the two tasks running in parallel) and choice between clauses is non-deterministic, depending on which clause completes first.

Unlike asynchronous workflows, the Task<'T> type does not require any support for aliasing. A value of type Task<'T> represents a running computation that can be accessed from multiple parts of a program. In this sense, the type Async<'T> is more similar to a function unit -> Task<'T> than to the type Task<'T> itself.
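The difference can be observed with the standard library alone. The following is a minimal sketch (the names taskRuns, asyncRuns, taskBody, sharedTask and workflow are hypothetical and chosen only for illustration) that counts how often each computation body runs when its result is read twice:

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Counters recording how many times each computation body has run
let taskRuns = ref 0
let asyncRuns = ref 0

let taskBody () =
  Interlocked.Increment(taskRuns) |> ignore
  42

// A Task<'T> is started once; both reads observe the same computation
let sharedTask = Task.Run(Func<int>(taskBody))
let t1 = sharedTask.Result
let t2 = sharedTask.Result

// An Async<'T> is only a description; every start runs the body again
let workflow = async {
  Interlocked.Increment(asyncRuns) |> ignore
  return 42 }
let a1 = Async.RunSynchronously workflow
let a2 = Async.RunSynchronously workflow

printfn "task body runs: %d, async body runs: %d" taskRuns.Value asyncRuns.Value
```

Reading the task twice shares a single run, while starting the workflow twice runs its body twice, which is why Async<'T> behaves more like a function that produces a task.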

The key difference between tasks and asynchronous workflows is that the latter provide better support for writing non-blocking computations that involve long-running asynchronous operations such as I/O or waiting for a certain event. Tasks are more suitable for high-performance CPU-intensive computations.

Parallel list processing

The main example in this article is a tree processing function that can be used to test whether all values in the leaves satisfy a given predicate. However, we'll build the example step-by-step, exploring several other examples along the way.

To generate inputs for testing, we will calculate a list containing several large prime numbers. We will use the functional map and filter combinators, but we first implement a parallel version of map (in practice, this is already available in F# libraries or in the F# PowerPack, but it is an interesting example of using joinads):

open System.Threading.Tasks
open FSharp.Extensions.Joinads

/// Applies the specified function to all 
/// elements of the input list in parallel.
let parallelMap (f:'T -> 'R) input = 
  // Recursively process list and spawn tasks
  let rec loop input = future {
    match input with 
    | [] -> return []
    | x::xs -> 
       // Process current element & the rest of the list
       match! future { return f x }, loop xs with
       | y, ys -> return y::ys }

  // Start the work and wait until it completes
  (loop input).Result

The function parallelMap contains a nested recursive function loop that returns a task, which is processing a part of the list in parallel. The declaration of loop uses a computation builder future { ... } (which is defined in the FSharp.Extensions.Joinads namespace).

The loop function uses ordinary match to handle the end of the list. If the list is non-empty, it uses match! to run two tasks in parallel: one that applies f to the current element and one that recursively processes the rest of the list.

When both computations complete, the body of the clause constructs a list to be returned. The return type of loop is Task<list<'R>>, so the body of parallelMap uses the Result property to obtain the processed list.
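For comparison, a similar effect can be approximated without joinads. The sketch below is not the joinads implementation; it is a hypothetical function (named parallelMapTpl here) that uses only the standard Task API, starting one task per element and then collecting the results in order:

```fsharp
open System
open System.Threading.Tasks

/// Hypothetical counterpart of 'parallelMap' that
/// relies only on the standard Task API.
let parallelMapTpl (f: 'T -> 'R) (input: 'T list) : 'R list =
  // List.map is eager, so all tasks are started here
  let tasks =
    input |> List.map (fun x -> Task.Run(Func<'R>(fun () -> f x)))
  // Block on each task in turn; this keeps the input order
  tasks |> List.map (fun (t: Task<'R>) -> t.Result)

// Small usage example
let doubled = parallelMapTpl (fun x -> x * 2) [ 1; 2; 3 ]
```

Unlike the recursive version, this sketch does not nest tasks, but it shows the same two steps: spawn the work, then wait for all results.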

The following snippet defines a function that tests whether a number is prime and compares the performance of sequential and parallel map function:

// Create a list containing 1001 big numbers
let nums = [ for i in 0L .. 1000L -> i + 5000000000000L ]

/// Tests whether the specified 64 bit int is a prime
let isPrime num = 
  seq { 2L .. int64 (sqrt (float num)) } 
  |> Seq.forall (fun div -> num % div <> 0L)

// Turn on the timing and compare the performance
#time
List.map isPrime nums
parallelMap isPrime nums

When you load the code in F# Interactive, you can select and run the #time directive to enable simple performance measuring. F# Interactive then prints the result of every entered command, together with the time it took to calculate it. On the author's machine, the time needed to run the sequential version is about 8 seconds and the time of the parallel version is about 4.5 seconds.
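Outside F# Interactive, where the #time directive is not available, the same comparison can be made with a stopwatch. The following is a minimal sketch; the helper name time and the stand-in workload are assumptions made for illustration:

```fsharp
open System.Diagnostics

/// Hypothetical helper: runs 'f', prints the
/// elapsed time and returns the result of 'f'.
let time (label: string) (f: unit -> 'T) : 'T =
  let sw = Stopwatch.StartNew()
  let result = f ()
  sw.Stop()
  printfn "%s: %d ms" label sw.ElapsedMilliseconds
  result

// Stand-in workload; with the definitions from the article one would
// write, for example: time "sequential" (fun () -> List.map isPrime nums)
let total = time "sum" (fun () -> List.sum [ 1 .. 1000 ])
```

Wrapping the work in a function delays its evaluation until the stopwatch has started.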

Building a balanced tree

Before we can look at tree processing, we need to define a tree type and we need to write a function for constructing trees. The following snippet shows a standard binary tree declaration together with a function ballancedOfList that creates a balanced tree from a non-empty list:

type Tree<'T> = 
  | Leaf of 'T 
  | Node of Tree<'T> * Tree<'T>

/// Creates a balanced tree from a non-empty list
/// (elements at even indices go to the left, odd indices to the right)
let rec ballancedOfList list =
  match list with 
  | [] -> failwith "Cannot create tree of empty list"
  | [n] -> Leaf n
  | _ -> 
      // Split the elements into odd and even using their index
      let left, right =
        list |> List.mapi (fun i v -> i, v)
             |> List.partition (fun (i, v) -> i%2 = 0)
      // Drop the indices, then build balanced trees for both parts
      let left, right = List.map snd left, List.map snd right
      Node(ballancedOfList left, ballancedOfList right)

The function is quite simple and it is only shown to make the sample complete. We use it to construct two trees that we'll later want to process. The first tree is generated by taking all primes from the nums list shown earlier and the other contains several additional non-prime numbers:

// Create a list with large prime numbers
let primes = 
  nums |> parallelMap (fun v -> isPrime v, v) 
       |> List.filter fst |> List.map snd
// Create a list with some additional non-primes
let mixed = primes @ [ 2L .. 20L ]

// Create balanced trees from both lists
let primeTree = ballancedOfList primes
let mixedTree = ballancedOfList mixed

The primeTree tree contains only large prime numbers, so checking that all of its values are prime takes relatively long. The mixedTree contains several additional numbers, some of which are not primes. Running isPrime on all of its values would take even longer, but if we can return the result immediately after we find a non-prime, the processing is likely to complete quite quickly.

Parallel tree processing

Let's now implement a forall function that takes a tree and a predicate and tests whether the predicate holds for all leaves. We use the future { ... } computation builder and we use match! to handle the Node case by checking both sub-trees in parallel:

/// Checks whether the specified predicate 'f'
/// holds for all Leaf elements of the tree.
let forall f tree = 
  let rec loop tree = future { 
    match tree with
    | Leaf v -> return f v 
    | Node(left, right) ->
        // Process left and right branch in parallel
        match! loop left, loop right with
        | l, r -> return l && r }
  // Start the recursive processing & wait for the result
  (loop tree).Result

// Test processing on two sample trees
forall isPrime mixedTree
forall isPrime primeTree

If you run the processing on both mixedTree and primeTree, they will take a similarly long time to complete. However, a sequential version of the function would be faster for mixedTree, because it would return false immediately after finding the first non-prime number.
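The sequential behaviour mentioned here comes from the && operator, which evaluates its right operand only when the left one is true. A minimal sketch follows; the name forallSeq and the counting predicate are hypothetical, and the Tree type is repeated only so that the snippet runs on its own:

```fsharp
type Tree<'T> =
  | Leaf of 'T
  | Node of Tree<'T> * Tree<'T>

/// Hypothetical sequential counterpart of 'forall'
let rec forallSeq f tree =
  match tree with
  | Leaf v -> f v
  // '&&' evaluates the right branch only if the left one returned 'true'
  | Node(left, right) -> forallSeq f left && forallSeq f right

// Count predicate calls to observe the short-circuiting
let calls = ref 0
let isEven n =
  calls.Value <- calls.Value + 1
  n % 2 = 0

let sample = Node(Node(Leaf 1, Leaf 2), Node(Leaf 4, Leaf 6))
let allEven = forallSeq isEven sample
printfn "result: %b, predicate calls: %d" allEven calls.Value
```

Because the first leaf already fails the test, the predicate is called only once and the remaining three leaves are never visited.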

Adding short-circuiting

Implementing this short-circuiting behaviour using Task<'T> sounds difficult, but using joinads, the problem becomes quite simple. We add two additional clauses that handle the case when one branch completes returning false.

Aside from this simple change, we also need to make sure that all remaining tasks, which are not required to complete, get cancelled. The cancellation is implemented by creating a .NET CancellationTokenSource before starting the recursive processing. In the body of loop we then check if the processing has completed and we throw an exception if it has:

open System.Threading

/// Behaves like previous 'forall' function, but returns
/// immediately when one of the branches returns 'false'
let forall f tree = 
  // Create cancellation token for checking
  let cts = new CancellationTokenSource()
  let rec loop tree = future { 
    // Stop processing if the function already returned
    if cts.Token.IsCancellationRequested then 
        failwith "cancelled"
    match tree with
    | Leaf v -> return f v 
    | Node(left, right) ->
        match! loop left, loop right with
        | false, ? -> return false
        | ?, false -> return false
        | l, r -> return l && r }

  // Wait for the result & cancel all pending work
  let res = (loop tree).Result
  cts.Cancel()
  res

// Processing 'mixedTree' is significantly faster,
// because it returns after first non-prime is found!
forall isPrime mixedTree
forall isPrime primeTree

The changes required to implement short-circuiting are quite small. As already mentioned, we added two clauses with patterns false, ? and ?, false. These will match when one of the computations completes and returns false while the other is still running. When that happens, the function loop can return the final result, but the other task may still continue running.
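To make the behaviour of the two extra clauses concrete, here is a rough approximation using only the standard Task API. It is a sketch under assumptions rather than how joinads are implemented: the helper shortCircuitAnd is hypothetical, it blocks the calling thread, and it does not treat exceptions the way the non-deterministic choice does.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: waits for two Boolean tasks, but returns
/// 'false' as soon as the first one to finish produces 'false'.
let shortCircuitAnd (left: Task<bool>) (right: Task<bool>) : bool =
  // Task.WhenAny completes when the first of the tasks finishes
  let first = Task.WhenAny([| left; right |]).Result
  if not first.Result then false
  else left.Result && right.Result

// The slow task blocks until the gate is opened
let gate = new ManualResetEventSlim(false)
let slowBody () =
  gate.Wait()
  true
let slow = Task.Run(Func<bool>(slowBody))
let fast = Task.FromResult false

// Returns without waiting for 'slow'
let result = shortCircuitAnd fast slow
let slowStillRunning = not slow.IsCompleted

// Release the slow task so that it can finish
gate.Set()
```

As in the joinads version, the result is available while the other task has not yet completed, so something still has to stop that task.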

To actually save CPU power, we need to cancel the other task. This is done using the standard .NET mechanism. After the task that processes the entire tree completes, we call cts.Cancel() to trigger the cancellation. All tasks that are started from that point will throw an exception (which is okay, because non-deterministic choice ignores exceptions if the first computation succeeds).
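The effect of the check at the start of loop can be seen in isolation. In the following minimal sketch, the helper guarded is hypothetical and stands in for the body of loop; work started before cts.Cancel() runs normally, while work started afterwards fails immediately:

```fsharp
open System
open System.Threading
open System.Threading.Tasks

let cts = new CancellationTokenSource()

/// Hypothetical helper: starts 'work' as a task, using
/// the same check that appears at the start of 'loop'.
let guarded (work: unit -> 'T) : Task<'T> =
  let run () =
    if cts.Token.IsCancellationRequested then
      failwith "cancelled"
    work ()
  Task.Run(Func<'T>(run))

// Started before cancellation: completes normally
let before = (guarded (fun () -> 1 + 1)).Result

cts.Cancel()

// Started after cancellation: the task fails instead of doing the work
let after = guarded (fun () -> 1 + 1)
let afterFailed =
  try
    after.Wait()
    false
  with :? AggregateException -> true
```

This style of cancellation is cooperative: a predicate that is already running is not interrupted, and only work that has not yet started is skipped.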

As a result, the processing of mixedTree is now significantly faster than the processing of primeTree. On the author's machine, the first one requires about 0.3s, while the second takes 4 seconds to complete. You can easily test the performance for different inputs yourself using the #time directive.

Summary

In principle, the implementation of joinad operations for the Task<'T> type is very similar to the implementation for asynchronous workflows as discussed in the previous article. The main difference lies in the underlying type: tasks are designed for CPU-intensive computations. Therefore the applications in this article were quite different. We used tasks to write a parallel map operation for lists and then to implement a forall function for trees. The second was particularly interesting, because joinads make it very easy to implement short-circuiting behaviour thanks to the non-deterministic choice between clauses.
