Code Listings
Chapter 22: Parallel Programming
PLINQ: AsParallel
// Find the primes in the range 3..99999 by trial division (deliberately
// unoptimized), letting PLINQ spread the per-number tests across cores.
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
var parallelQuery = numbers
  .AsParallel()
  // Keep n when none of the candidate divisors divides it evenly.
  .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0));
// ToArray executes the query. AsOrdered is not requested, so the order of
// the results is not guaranteed.
int[] primes = parallelQuery.ToArray();
When (not) to call .AsParallel
// Only the first AsParallel() call is needed: each PLINQ operator already
// returns a ParallelQuery, so the chain stays parallel from that point on.
mySequence.AsParallel()           // Wraps sequence in ParallelQuery<int>
          .Where (n => n > 100)   // Outputs another ParallelQuery<int>
          .AsParallel()           // Unnecessary - and inefficient!
          .Select (n => n * n)
Parallel Spellchecker
// Parallel spellchecker: looks up one million randomly chosen words in a
// dictionary with PLINQ and reports the misspelled ones in index order.
if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
{
  // WebClient is IDisposable; release it as soon as the download finishes.
  using (var client = new WebClient())
    client.DownloadFile (
      "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
}
// Case-insensitive set of every known word.
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);
  
// Build the test data sequentially: a single Random instance is shared here,
// and Random is not thread-safe.
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.
// Tag each word with its index, keep those missing from the dictionary, and
// sort by index so the mistakes are reported in input order.
var query = wordsToTest
  .AsParallel()
  .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
  .Where   (iword => !wordLookup.Contains (iword.Word))
  .OrderBy (iword => iword.Index);
foreach (var mistake in query)
  Console.WriteLine (mistake.Word + " - index = " + mistake.Index);
// A word paired with its position in the input array.
struct IndexedWord
{
  public string Word;
  public int Index;
}
Using ThreadLocal to parallelize creation of wordlist
// Give each thread its own Random instance (Random is not thread-safe).
// The seed comes from a fresh Guid, so threads are unlikely to share one.
var localRandom = new ThreadLocal<Random> (() =>
{
  int seed = Guid.NewGuid().GetHashCode();
  return new Random (seed);
});
// Build the test list in parallel; every pick uses the calling thread's Random.
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .AsParallel()
  .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
  .ToArray();
Functional purity - wrong
// The following query multiplies each element by its position.
// Given an input of Enumerable.Range(0,999), it should output squares.
// WRONG: i is a captured variable that the parallel query increments with no
// synchronization, so its value need not match the position of n.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
Functional purity - right
// The indexed overload of Select supplies each element's position, so the
// query does not need to mutate any shared state.
var query = Enumerable.Range (0,999)
  .AsParallel()
  .Select ((value, position) => value * position);
Calling Blocking or I/O-Intensive Functions
// Ping six sites through a PLINQ query. Each Send call blocks until the reply
// (or a timeout) arrives, so the degree of parallelism is set explicitly to 6,
// matching the number of sites.
from site in new[]
{
  "www.albahari.com",
  "www.linqpad.net",
  "www.oreilly.com",
  "www.takeonit.com",
  "stackoverflow.com",
  "www.rebeccarey.com"  
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)   // Synchronous ping; p is the PingReply.
select new
{
  site,
  Result = p.Status,
  Time = p.RoundtripTime
}
Camera class
// Simulated camera, identified by a numeric ID.
class Camera
{
  public readonly int CameraID;

  public Camera (int cameraID)
  {
    CameraID = cameraID;
  }

  // Gets the next "image" from the camera. A simple string stands in for the
  // image, and a 123 ms sleep stands in for the time taken to capture it.
  public string GetNextFrame()
  {
    Thread.Sleep (123);
    string frame = "Frame from camera " + CameraID;
    return frame;
  }
}
Frame composition - with PLINQ
// Create 4 camera objects, with IDs 0 to 3.
Camera[] cameras = Enumerable.Range (0, 4)
  .Select (id => new Camera (id))
  .ToArray();

// Poll all cameras forever. AsOrdered keeps the frames in camera order, and
// the degree of parallelism equals the number of cameras.
for (;;)
{
  ParallelQuery<Camera> orderedCameras = cameras
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism (4);
  string[] data = orderedCameras
    .Select (camera => camera.GetNextFrame())
    .ToArray();
  Console.WriteLine (string.Join (", ", data));   // Display data...
}
Changing the degree of parallelism
// Calling AsParallel() a second time within the query lets a different degree
// of parallelism be requested for the operators that follow it.
"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
  .Select (c => char.ToUpper (c))
Cancellation
IEnumerable<int> million = Enumerable.Range (3, 1000000);
var cancelSource = new CancellationTokenSource();

// Unoptimized prime test over a million numbers, cancelable via cancelSource.
var primeNumberQuery = million
  .AsParallel()
  .WithCancellation (cancelSource.Token)
  .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0));

// Cancel the query from another thread after 100 milliseconds.
var canceller = new Thread (() =>
{
  Thread.Sleep (100);
  cancelSource.Cancel();
});
canceller.Start();

try 
{
  // ToArray starts the query running.
  int[] primes = primeNumberQuery.ToArray();
  // Not expected to get here: the other thread cancels the query first.
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}
Optimizing PLINQ: output-side optimization
// ForAll runs the delegate on each result as it is produced, without first
// merging the results, so the letters may appear in any order.
"abcdef"
  .AsParallel()
  .Select (letter => char.ToUpper (letter))
  .ForAll (Console.Write);
Forcing chunk partitioning
// Wrap the array in a partitioner before handing it to PLINQ. The second
// argument to Partitioner.Create is its loadBalance flag.
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
  Partitioner.Create (numbers, true).AsParallel()
  .Where (...)
Aggregate operator - recap
// Aggregate folds the sequence into a single value: the first argument seeds
// the running total and the lambda adds each element to it in turn.
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (
  0,
  (runningTotal, item) => runningTotal + item);   // 6
Calculating letter frequencies - sequential imperative
// Count how often each letter A-Z occurs in text, ignoring case.
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  int index = char.ToUpper (c) - 'A';
  // Valid slots are 0..25. The upper bound must be exclusive: '[' maps to
  // index 26, which would overrun the array.
  if (index >= 0 && index < letterFrequencies.Length) letterFrequencies [index]++;
}
Calculating letter frequencies - sequential with Aggregate
// Same letter count, expressed as a fold: the int[26] seed is the accumulator
// and is handed from one step to the next.
int[] result =
  text.Aggregate (
    new int[26],                // Create the "accumulator"
    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
    {
      int index = char.ToUpper (c) - 'A';
      // Exclusive upper bound: index 26 ('[') is outside the 26-slot array.
      if (index >= 0 && index < letterFrequencies.Length) letterFrequencies [index]++;
      return letterFrequencies;
    });
Calculating letter frequencies - parallel with Aggregate
// Parallel letter count. A seed factory gives each thread its own local
// accumulator; the local arrays are then summed into the main one.
int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator
    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
      int index = char.ToUpper (c) - 'A';
      // Exclusive upper bound: index 26 ('[') is outside the 26-slot array.
      if (index >= 0 && index < localFrequencies.Length) localFrequencies [index]++;
      return localFrequencies;
    },
                                   // Aggregate local->main accumulator
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
    finalResult => finalResult     // Perform any final transformation
  );                               // on the end result.
TPL Parallel Class - Parallel.Invoke
// Download two pages concurrently; Invoke returns once both delegates have
// finished. Each WebClient is disposed when its download completes.
Parallel.Invoke (
 () =>
 {
   using (var wc = new WebClient())
     wc.DownloadFile ("http://www.linqpad.net", "lp.html");
 },
 () =>
 {
   using (var wc = new WebClient())
     wc.DownloadFile ("http://www.jaoo.dk", "jaoo.html");
 });
Parallel.For
// Generate six RSA key pairs in parallel, one per array slot.
var keyPairs = new string[6];
Parallel.For (0, keyPairs.Length, slot =>
{
  // Passing true to ToXmlString includes the private parameters.
  keyPairs [slot] = RSA.Create().ToXmlString (true);
});
PLINQ solution
// PLINQ version of the same task: project six range elements to RSA key pairs.
// The range value itself is not used.
string[] keyPairs = ParallelEnumerable
  .Range (0, 6)
  .Select (unused => RSA.Create().ToXmlString (true))
  .ToArray();
Parallel.For - outer vs inner loops
// Nested parallel loops: the outer loop already provides 100 units of work to
// spread across cores. Foo is defined elsewhere.
Parallel.For (0, 100, i =>
{
  Parallel.For (0, 50, j => Foo (i, j));   // Sequential would be better
});                                        // for the inner loop.
Indexed Parallel.ForEach
// The three-argument body overload also receives each element's index (a long).
// Writes each character followed by its index, one per line.
Parallel.ForEach ("Hello, world", (letter, loopState, position) =>
  Console.WriteLine (letter.ToString() + position));
Parallel Spellchecker with TPL
// Parallel spellchecker using Parallel.ForEach: misspelled words are collected,
// with their indexes, in a thread-safe ConcurrentBag.
if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
{
  // WebClient is IDisposable; release it as soon as the download finishes.
  using (var client = new WebClient())
    client.DownloadFile (
      "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
}
// Case-insensitive set of every known word.
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);
// Build the test data sequentially: a single Random instance is shared here,
// and Random is not thread-safe.
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.
var misspellings = new ConcurrentBag<Tuple<int,string>>();
// i is the element's index, supplied as a long; it is narrowed to int here.
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});
ParallelLoopState: Breaking early out of loops 
// Write characters until a comma is encountered, then ask the loop to break.
Parallel.ForEach ("Hello, world", (ch, loopState) =>
{
  if (ch != ',')
  {
    Console.Write (ch);
    return;
  }
  loopState.Break();
});
Optimization with local values
// Sum the square roots of 1..9999999. Each worker accumulates into its own
// local total, so the lock is taken only when a local total is folded into
// the grand total rather than once per iteration.
object locker = new object();
double grandTotal = 0;
Parallel.For (1, 10000000,
  () => 0.0,                                   // Initial local total.
  (n, loopState, subtotal) =>                  // Body: returns the new
  {                                            // local total.
    return subtotal + Math.Sqrt (n);
  },
  subtotal =>                                  // Fold the local total into
  {                                            // the master value.
    lock (locker)
    {
      grandTotal += subtotal;
    }
  });
Tasks - creating and starting tasks
// Create and start a task that writes a greeting.
Action sayHello = () => Console.WriteLine ("Hello from a task!");
Task.Factory.StartNew (sayHello);
Getting data back:
// Begin a task that downloads a page and returns its content as a string.
Task<string> task = Task.Factory.StartNew<string> (() =>
{
  using (var wc = new System.Net.WebClient())
  {
    string page = wc.DownloadString ("http://www.linqpad.net");
    return page;
  }
});
// Other work can proceed while the download is in progress.
RunSomeOtherMethod();
// Reading Result blocks until the task has finished.
string result = task.Result;
Specifying a state object
// Passes "Hello" to the task as its state object. This StartNew overload takes
// an Action<object>, so the target method must declare an object parameter.
static void Main()
{
  var task = Task.Factory.StartNew (Greet, "Hello");
  task.Wait();  // Wait for task to complete.
}
// Receives the state object that was supplied to StartNew and writes it out.
static void Greet (object state) { Console.Write (state); }   // Hello
Putting it to better use:
// The lambda ignores its state argument; "Greeting" is supplied only so that
// it can be read back through the task's AsyncState property.
static void Main()
{
  Task task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
  object label = task.AsyncState;
  Console.WriteLine (label);   // Greeting
  task.Wait();
}

// Writes the message to the console without a trailing newline.
static void Greet (string message)
{
  Console.Write (message);
}
Child tasks
// A task started from within another task is detached unless it is created
// with TaskCreationOptions.AttachedToParent.
Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");

  Action detachedWork = () => Console.WriteLine ("I am detached");
  Action childWork    = () => Console.WriteLine ("I am a child");

  Task.Factory.StartNew (detachedWork);                                      // Detached task
  Task.Factory.StartNew (childWork, TaskCreationOptions.AttachedToParent);   // Child task
});
Waiting on tasks
// Assume t1, t2 and t3 are tasks.
// Wait on each in turn, collecting failures instead of stopping at the first,
// then rethrow everything collected as a single AggregateException.
var exceptions = new List<Exception>();
foreach (Task pending in new Task[] { t1, t2, t3 })
{
  try
  {
    pending.Wait();
  }
  catch (AggregateException ex)
  {
    exceptions.Add (ex);
  }
}
if (exceptions.Count > 0) throw new AggregateException (exceptions);
Exception-handling tasks
// An exception thrown inside a task is rethrown, wrapped in an
// AggregateException, when the task's Result is read.
int x = 0;
Task<int> calc = Task.Factory.StartNew (() => 7 / x);
try
{
  int quotient = calc.Result;
  Console.WriteLine (quotient);
}
catch (AggregateException aex)
{
  Exception cause = aex.InnerException;
  Console.Write (cause.Message);  // Attempted to divide by 0
}
Child exceptions bubbling
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;

// Three generations of tasks, each attached to the one that starts it.
Action grandchildBody = () => { throw null; };
Action childBody      = () => { Task.Factory.StartNew (grandchildBody, atp); };
Action parentBody     = () => { Task.Factory.StartNew (childBody, atp); };

var parent = Task.Factory.StartNew (parentBody);

// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
Canceling tasks
// Cooperative cancellation: the task body polls the token, and the same token
// is also passed to StartNew.
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() => 
{
  // Do some stuff...
  token.ThrowIfCancellationRequested();  // Check for cancellation request
  // Do some stuff...
}, token);
...
cancelSource.Cancel();
try 
{
  task.Wait();
}
catch (AggregateException ex)
{
  // Wait wraps the cancellation exception in an AggregateException.
  if (ex.InnerException is OperationCanceledException)
    Console.Write ("Task canceled!");
}
Propagating cancellations
// The token captured by the task body is handed on to a PLINQ query, so one
// CancellationTokenSource governs the query as well.
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() =>
{
  // Pass our cancellation token into a PLINQ query:
  var query = someSequence.AsParallel().WithCancellation (token)...
  ... enumerate query ...
});
Task continuations
// task2 is a continuation: it starts only after task1 has finished.
Task task1 = Task.Factory.StartNew (() =>
{
  Console.Write ("antecedant..");
});
Task task2 = task1.ContinueWith (antecedent =>
{
  Console.Write ("..continuation");
});
Continuations and Task<TResult>
// Each continuation reads the previous task's Result and produces its own:
// 8 -> 16 -> 4, which the last continuation writes out.
Task<int> eight = Task.Factory.StartNew<int> (() => 8);
Task<int> doubled = eight.ContinueWith (ant => ant.Result * 2);
Task<double> root = doubled.ContinueWith (ant => Math.Sqrt (ant.Result));
root.ContinueWith (ant => Console.WriteLine (ant.Result));   // 4
Continuations and exceptions
// The continuation reads the antecedent's Exception property and writes it out.
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant =>
{
  AggregateException failure = ant.Exception;
  Console.Write (failure);
});
Re-throwing
// The continuation rethrows the antecedent's exception, so waiting on the
// continuation surfaces the original failure.
Task continuation = Task.Factory
  .StartNew (() => { throw null; })
  .ContinueWith (ant =>
  {
    AggregateException failure = ant.Exception;
    if (failure != null) throw failure;
    // Continue processing...
  });
continuation.Wait();    // Exception is now thrown back to caller.
TaskContinuationOptions
// Two continuations on the same antecedent: one is scheduled only if task1
// faults, the other only if it does not.
Task task1 = Task.Factory.StartNew (() => { throw null; });

Action<Task> reportFault   = ant => Console.Write (ant.Exception);
Action<Task> reportSuccess = ant => Console.Write ("Success!");

Task error = task1.ContinueWith (reportFault, TaskContinuationOptions.OnlyOnFaulted);
Task ok    = task1.ContinueWith (reportSuccess, TaskContinuationOptions.NotOnFaulted);
Ignoring exceptions
// Attaches a continuation, run only if the task faults, that reads the
// task's Exception property and discards it.
// This could be improved by adding code to log the exception.
public static void IgnoreExceptions (this Task task)
{
  Action<Task> observe = faulted => { var ignore = faulted.Exception; };
  task.ContinueWith (observe, TaskContinuationOptions.OnlyOnFaulted);
}
Continuations and child tasks
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Action fail = () => { throw null; };

// The parent starts three attached children that all throw; the continuation
// runs only if the parent ends up faulted, and writes the parent's exception.
Task.Factory.StartNew (() =>
{
  for (int started = 0; started < 3; started++)
    Task.Factory.StartNew (fail, atp);
})
.ContinueWith (
  p => Console.WriteLine (p.Exception),
  TaskContinuationOptions.OnlyOnFaulted);
Conditional continuations
// fault is conditional on t1 faulting; t3 is in turn a continuation of fault,
// with no condition of its own.
Task t1 = Task.Factory.StartNew (...);
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));
Continuations with multiple antecedents
// The continuation is scheduled once both antecedent tasks have completed.
Task task1 = Task.Factory.StartNew (() => Console.Write ("X"));
Task task2 = Task.Factory.StartNew (() => Console.Write ("Y"));
Task[] antecedents = { task1, task2 };
var continuation = Task.Factory.ContinueWhenAll (
  antecedents,
  completed => Console.WriteLine ("Done"));
// task1 and task2 would call complex functions in real life.
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);
// Once both have completed, add their results together.
Task<int>[] inputs = { task1, task2 };
Task<int> task3 = Task.Factory.ContinueWhenAll (inputs, completed =>
{
  return completed.Sum (finished => finished.Result);
});
Console.WriteLine (task3.Result);           // 579
Multiple continuations on a single antecedent
// Two independent continuations attached to the same antecedent.
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
Action<Task> writeX = ant => Console.Write ("X");
Action<Task> writeY = ant => Console.Write ("Y");
t.ContinueWith (writeX);
t.ContinueWith (writeY);
Task Schedulers and UIs
// Window that runs SomeComplexWebService as a task and then assigns the result
// to lblResult through a continuation scheduled on _uiScheduler.
public partial class MyWindow : Window
{
  TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                                // it throughout our class.
  public MyWindow()
  {    
    InitializeComponent();
    // Get the UI scheduler for the thread that created the form:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    // Passing _uiScheduler to ContinueWith makes the label update run on that
    // scheduler rather than on the default one.
    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
  }
  string SomeComplexWebService() { ... }
}
TaskFactory - creating your own
// A factory whose tasks are created as LongRunning and AttachedToParent by
// default, with no special continuation options.
TaskCreationOptions creationDefaults =
  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
TaskContinuationOptions continuationDefaults = TaskContinuationOptions.None;
var factory = new TaskFactory (creationDefaults, continuationDefaults);
TaskCompletionSource
// The task exposed by a TaskCompletionSource completes when SetResult is
// called - here, by another thread after a five-second delay.
var source = new TaskCompletionSource<int>();
var producer = new Thread (() =>
{
  Thread.Sleep (5000);
  source.SetResult (123);
});
producer.Start();
Task<int> task = source.Task;      // Task controlled by the source.
Console.WriteLine (task.Result);   // 123 
Working with AggregateException
try
{
  // The range must be a PLINQ source for this example to work: a sequential
  // query would throw DivideByZeroException directly, which the
  // AggregateException handler below would not catch.
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;
  // Enumerate query
  ...
}
catch (AggregateException aex)
{
  // Report every failure that the parallel query collected.
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}
Working with AggregateException - Flatten
// Flatten collapses nested AggregateExceptions into a single level, so one
// loop over InnerExceptions reaches every underlying exception.
catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    myLogWriter.LogException (ex);
}
Working with AggregateException - Handle
var parent = Task.Factory.StartNew (() => 
{
  // Throw 3 exceptions at once using 3 attached child tasks:
  int[] numbers = { 0 };
  var childFactory = new TaskFactory (
    TaskCreationOptions.AttachedToParent,
    TaskContinuationOptions.None);
  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
  childFactory.StartNew (() => numbers [1]);      // Index out of range
  childFactory.StartNew (() => { throw null; });  // Null reference
});

try
{
  parent.Wait();
}
catch (AggregateException aex)
{
  // Flatten is still required before Handle. Handle calls the predicate for
  // each inner exception; those for which it returns false get rethrown.
  aex.Flatten().Handle (ex =>
  {
    string report = null;
    if (ex is DivideByZeroException)
      report = "Divide by zero";
    else if (ex is IndexOutOfRangeException)
      report = "Index out of range";

    if (report == null) return false;   // Not handled here.

    Console.WriteLine (report);
    return true;                        // This exception is "handled".
  });
}
Concurrent collections - spellchecker example
// Collect (index, word) pairs for every word missing from the dictionary.
// ConcurrentBag lets the loop body add from several threads at once.
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (candidate, loopState, position) =>
{
  if (wordLookup.Contains (candidate)) return;
  misspellings.Add (Tuple.Create ((int) position, candidate));
});
Concurrent collections - BlockingCollection
// Producer/consumer queue: callers enqueue delegates, and a fixed number of
// consumer tasks take them from a BlockingCollection and run them.
public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

  // Starts workerCount consumers, each as a separate Task.
  public PCQueue (int workerCount)
  {
    int remaining = workerCount;
    while (remaining-- > 0)
      Task.Factory.StartNew (Consume);
  }

  // Marks the queue as complete, which lets the consumers' loops end.
  public void Dispose()
  {
    _taskQ.CompleteAdding();
  }

  // Queues a delegate for execution by one of the consumers.
  public void EnqueueTask (Action action)
  {
    _taskQ.Add (action);
  }

  void Consume()
  {
    // The consuming sequence blocks while no elements are available and
    // ends once CompleteAdding has been called.
    var pending = _taskQ.GetConsumingEnumerable();
    foreach (Action work in pending)
    {
      work();
    }
  }
}
BlockingCollection - leveraging TaskCompletionSource
// Producer/consumer queue in which every enqueued action is represented by a
// Task, so callers can wait on it and observe completion, cancellation or
// failure.
public class PCQueue : IDisposable
{
  // An enqueued action, the completion source behind its Task, and an
  // optional cancellation token.
  class WorkItem
  {
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly Action Action;
    public readonly CancellationToken? CancelToken;

    public WorkItem (
      TaskCompletionSource<object> taskSource,
      Action action,
      CancellationToken? cancelToken)
    {
      TaskSource = taskSource;
      Action = action;
      CancelToken = cancelToken;
    }
  }

  BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

  // Starts workerCount consumers, each as a separate Task.
  public PCQueue (int workerCount)
  {
    int remaining = workerCount;
    while (remaining-- > 0)
      Task.Factory.StartNew (Consume);
  }

  // Marks the queue as complete, which lets the consumers' loops end.
  public void Dispose()
  {
    _taskQ.CompleteAdding();
  }

  // Enqueues an action with no cancellation token.
  public Task EnqueueTask (Action action) 
  {
    return EnqueueTask (action, null);
  }

  // Enqueues an action and returns the Task that tracks its outcome.
  public Task EnqueueTask (Action action, CancellationToken? cancelToken)
  {
    var completion = new TaskCompletionSource<object>();
    var item = new WorkItem (completion, action, cancelToken);
    _taskQ.Add (item);
    return completion.Task;
  }

  void Consume()
  {
    foreach (WorkItem item in _taskQ.GetConsumingEnumerable())
    {
      CancellationToken? token = item.CancelToken;

      // Cancellation requested before the item started: don't run it.
      if (token.HasValue && token.Value.IsCancellationRequested)
      {
        item.TaskSource.SetCanceled();
        continue;
      }

      try
      {
        item.Action();
        item.TaskSource.SetResult (null);   // Indicate completion
      }
      catch (Exception ex)
      {
        item.TaskSource.SetException (ex);  // Fault the item's Task
      }
    }
  }
}
SpinLock
// Standard SpinLock pattern: lockTaken records whether Enter acquired the
// lock, so the finally block releases it only if it was actually taken.
var spinLock = new SpinLock (enableThreadOwnerTracking: true);
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Do stuff...
}
finally
{
  if (lockTaken)
  {
    spinLock.Exit();
  }
}
SpinWait
// Spins until _proceed is observed to be true. The memory barrier is issued
// before each read of the field.
bool _proceed;
void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
  ...
}
Better:
// Same wait written as an explicit loop: a single SpinWait instance is reused
// and SpinOnce is called on each pass until _proceed is true.
bool _proceed;
void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
  ...
}
Lock-free updates with SpinWait and Interlocked.CompareExchange
int x;

// Multiplies x by factor without taking a lock: read x, compute the product,
// and publish it only if x still holds the value that was read; otherwise
// spin once and try again.
void MultiplyXBy (int factor)
{
  var spinner = new SpinWait();
  for (;;)
  {
    int before = x;
    Thread.MemoryBarrier();
    int after = before * factor;
    // CompareExchange returns the value x held just before the call.
    int seen = Interlocked.CompareExchange (ref x, after, before);
    if (seen == before) return;   // No one preempted us.
    spinner.SpinOnce();
  }
}
Generic version
// Replaces field with updateFunction (field) without taking a lock. If the
// field was changed by someone else between the read and the swap, the update
// is recomputed from the new value, so updateFunction may run more than once.
static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction)
  where T : class
{
  var spinner = new SpinWait();
  for (;;)
  {
    T current = field;
    T replacement = updateFunction (current);
    // CompareExchange returns the reference the field held before the call.
    T seen = Interlocked.CompareExchange (ref field, replacement, current);
    if (current == seen) return;
    spinner.SpinOnce();
  }
}
With immutable class
class Test
{
  // Immutable class: both fields are readonly and set only in the constructor.
  class ProgressStatus
  {
    public readonly int PercentComplete;
    public readonly string StatusMessage;

    public ProgressStatus (int percentComplete, string statusMessage)
    {
      PercentComplete = percentComplete;
      StatusMessage = statusMessage;
    }
  }

  // Current status; starts at 0 percent with the message "Starting".
  ProgressStatus _status = new ProgressStatus (0, "Starting");
}
// Advance the percentage by one by swapping in a new ProgressStatus that
// keeps the current message.
LockFreeUpdate (ref _status, current =>
  new ProgressStatus (current.PercentComplete + 1, current.StatusMessage));