# Code Listings

## Chapter 22: Parallel Programming

PLINQ: AsParallel

```csharp
// Calculate prime numbers using a simple (unoptimized) algorithm.

IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);

var parallelQuery =
  from n in numbers.AsParallel()
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

int[] primes = parallelQuery.ToArray();
```

When (not) to call .AsParallel

```csharp
mySequence.AsParallel()           // Wraps sequence in ParallelQuery<int>
          .Where (n => n > 100)   // Outputs another ParallelQuery<int>
          .AsParallel()           // Unnecessary - and inefficient!
          .Select (n => n * n)
```

Parallel Spellchecker

```csharp
if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");

var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var query = wordsToTest
  .AsParallel()
  .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
  .Where   (iword => !wordLookup.Contains (iword.Word))
  .OrderBy (iword => iword.Index);

foreach (var mistake in query)
  Console.WriteLine (mistake.Word + " - index = " + mistake.Index);
```
`struct IndexedWord { public string Word; public int Index; }`

Using ThreadLocal to parallelize creation of wordlist

```csharp
var localRandom = new ThreadLocal<Random>
  ( () => new Random (Guid.NewGuid().GetHashCode()) );

string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
  .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
  .ToArray();
```

Functional purity - wrong

```csharp
// The following query multiplies each element by its position.
// Given an input of Enumerable.Range(0,999), it should output squares.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
```

Functional purity - right

`var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);`

Calling Blocking or I/O-Intensive Functions

```csharp
from site in new[]
{
  "www.albahari.com",
  "www.oreilly.com",
  "www.takeonit.com",
  "stackoverflow.com",
  "www.rebeccarey.com"
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)
select new
{
  site,
  Result = p.Status,
  Time = p.RoundtripTime
}
```

Camera class

```csharp
class Camera
{
  public readonly int CameraID;
  public Camera (int cameraID) { CameraID = cameraID; }

  // Get image from camera: return a simple string rather than an image
  public string GetNextFrame()
  {
    Thread.Sleep (123);       // Simulate time taken to get snapshot
    return "Frame from camera " + CameraID;
  }
}
```

Frame composition - with PLINQ

```csharp
Camera[] cameras = Enumerable.Range (0, 4)    // Create 4 camera objects.
  .Select (i => new Camera (i))
  .ToArray();

while (true)
{
  string[] data = cameras
    .AsParallel().AsOrdered().WithDegreeOfParallelism (4)
    .Select (c => c.GetNextFrame()).ToArray();

  Console.WriteLine (string.Join (", ", data));   // Display data...
}
```

Changing the degree of parallelism

```csharp
"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
  .Select (c => char.ToUpper (c))
```

Cancellation

```csharp
IEnumerable<int> million = Enumerable.Range (3, 1000000);

var cancelSource = new CancellationTokenSource();

var primeNumberQuery =
  from n in million.AsParallel().WithCancellation (cancelSource.Token)
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

new Thread (() =>
  {
    Thread.Sleep (100);      // Cancel query after
    cancelSource.Cancel();   // 100 milliseconds.
  }
).Start();
try
{
  // Start query running:
  int[] primes = primeNumberQuery.ToArray();
  // We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}
```

Optimizing PLINQ: output-side optimization

`"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);`

Forcing chunk partitioning

```csharp
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
  Partitioner.Create (numbers, true).AsParallel()
  .Where (...)
```

Aggregate operator - recap

```csharp
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6
```

Calculating letter frequencies - sequential imperative

```csharp
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  int index = char.ToUpper (c) - 'A';
  // Valid indexes are 0..25; testing "<= 26" would overrun the array.
  if (index >= 0 && index < 26) letterFrequencies [index]++;
}
```

Calculating letter frequencies - sequential with Aggregate

```csharp
int[] result =
  text.Aggregate (
    new int[26],                // Create the "accumulator"
    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index < 26) letterFrequencies [index]++;
      return letterFrequencies;
    });
```

Calculating letter frequencies - parallel with Aggregate

```csharp
int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator

    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index < 26) localFrequencies [index]++;
      return localFrequencies;
    },
                                   // Aggregate local->main accumulator
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),

    finalResult => finalResult     // Perform any final transformation
  );                               // on the end result.
```

TPL Parallel Class - Parallel.Invoke

```csharp
Parallel.Invoke (
```
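
A minimal sketch of a complete `Parallel.Invoke` call. The two delegates, the `results` array and the `SumTo` helper are hypothetical stand-ins for real work and are not part of the original listing. `Parallel.Invoke` runs the delegates, potentially in parallel, and returns only when all of them have finished.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical work items: each delegate fills its own slot of the array.
// Because the slots are distinct, no locking is needed.
var results = new int[2];

Parallel.Invoke (
  () => results[0] = SumTo (1000),
  () => results[1] = SumTo (2000));

// Parallel.Invoke blocks until every delegate has completed,
// so both results are available here.
Console.WriteLine (results[0] + ", " + results[1]);   // 500500, 2001000

// Illustrative helper (an assumption): sums the integers 1..n.
static int SumTo (int n)
{
  int total = 0;
  for (int i = 1; i <= n; i++) total += i;
  return total;
}
```

Having each delegate write to a separate array element keeps the example free of shared mutable state, which is the usual concern when delegates run concurrently.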

Parallel.For

```csharp
var keyPairs = new string[6];

Parallel.For (0, keyPairs.Length,
              i => keyPairs[i] = RSA.Create().ToXmlString (true));
```

PLINQ solution

```csharp
string[] keyPairs =
  ParallelEnumerable.Range (0, 6)
  .Select (i => RSA.Create().ToXmlString (true))
  .ToArray();
```

Parallel.For - outer vs inner loops

```csharp
Parallel.For (0, 100, i =>
{
  Parallel.For (0, 50, j => Foo (i, j));   // Sequential would be better
});                                        // for the inner loop.
```

Indexed Parallel.ForEach

```csharp
Parallel.ForEach ("Hello, world", (c, state, i) =>
{
  Console.WriteLine (c.ToString() + i);
});
```

Parallel Spellchecker with TPL

```csharp
if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");

var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var misspellings = new ConcurrentBag<Tuple<int,string>>();

Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));   // i is a long
});
```

ParallelLoopState: Breaking early out of loops

```csharp
Parallel.ForEach ("Hello, world", (c, loopState) =>
{
  if (c == ',')
    loopState.Break();
  else
    Console.Write (c);
});
```

Optimization with local values

```csharp
object locker = new object();
double grandTotal = 0;

Parallel.For (1, 10000000,

  () => 0.0,                        // Initialize the local value.

  (i, state, localTotal) =>         // Body delegate. Notice that it
    localTotal + Math.Sqrt (i),     // returns the new local total.

  localTotal =>                                    // Add the local value
    { lock (locker) grandTotal += localTotal; }    // to the master value.
);
```

`Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!"));`

Getting data back:

```csharp
Task<string> task = Task.Factory.StartNew<string> (() =>    // Begin task
{
  using (var wc = new System.Net.WebClient())
    return wc.DownloadString ("http://www.linqpad.net");
});

RunSomeOtherMethod();         // We can do other work in parallel...

string result = task.Result;  // Wait for task to finish and fetch result.
```

Specifying a state object

```csharp
static void Main()
{
}
```

Putting it to better use:

```csharp
static void Main()
{
}

static void Greet (string message) { Console.Write (message); }
```
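
A minimal sketch of how a state object can be passed to `Task.Factory.StartNew` and read back through `Task.AsyncState`. The `Greet (object)` signature and the strings `"Hello"`, `"Hi"` and `"Greeting"` are illustrative assumptions, not taken from the listings above.

```csharp
using System;
using System.Threading.Tasks;

// Pass "Hello" as the state object; StartNew hands it to the delegate.
Task task = Task.Factory.StartNew (Greet, "Hello");
task.Wait();

// The state object is also exposed through AsyncState, so it can serve
// as a label for the task. "Greeting" is an illustrative name.
Task named = Task.Factory.StartNew (state => Greet ("Hi"), "Greeting");
named.Wait();

Console.WriteLine();
Console.WriteLine (named.AsyncState);   // Greeting

// Hypothetical callback: simply writes whatever it is given.
static void Greet (object state) { Console.Write (state); }
```

Using the state object as a name in this way can make tasks easier to identify while debugging, since the value travels with the task.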

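```csharp
Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");

  Task.Factory.StartNew (() =>        // Detached task
  {
    Console.WriteLine ("I am detached");
  });

  Task.Factory.StartNew (() =>        // Child task
  {
    Console.WriteLine ("I am a child");
  }, TaskCreationOptions.AttachedToParent);
});
```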

```csharp
// Assume t1, t2 and t3 are tasks:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);
```

```csharp
int x = 0;
Task<int> calc = Task.Factory.StartNew (() => 7 / x);
try
{
  Console.WriteLine (calc.Result);
}
catch (AggregateException aex)
{
  Console.Write (aex.InnerException.Message);  // Attempted to divide by 0
}
```

Child exceptions bubbling

```csharp
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() =>   // Child
  {
    Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
  }, atp);
});

// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
```

```csharp
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;

Task task = Task.Factory.StartNew (() =>
{
  // Do some stuff...
  token.ThrowIfCancellationRequested();  // Check for cancellation request
  // Do some stuff...
}, token);
...
cancelSource.Cancel();
```

```csharp
try
{
  task.Wait();
}
catch (AggregateException ex)
{
  if (ex.InnerException is OperationCanceledException)
    Console.Write ("Task canceled!");
}
```

Propagating cancellations

```csharp
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;

Task task = Task.Factory.StartNew (() =>
{
  // Pass our cancellation token into a PLINQ query:
  var query = someSequence.AsParallel().WithCancellation (token)...
  ... enumerate query ...
});
```

```csharp
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedent.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
```

```csharp
Task.Factory.StartNew<int> (() => 8)
  .ContinueWith (ant => ant.Result * 2)
  .ContinueWith (ant => Math.Sqrt (ant.Result))
  .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4
```

Continuations and exceptions

```csharp
Task task1 = Task.Factory.StartNew (() => { throw null; });
```

Re-throwing

```csharp
Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                                .ContinueWith (ant =>
  {
    if (ant.Exception != null) throw ant.Exception;
    // Continue processing...
  });

continuation.Wait();    // Exception is now thrown back to caller.
```

```csharp
Task task1 = Task.Factory.StartNew (() => { throw null; });
```
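
One way to deal with a faulted antecedent is to attach separate continuations for the failure and success paths. The following is a minimal sketch under that assumption; the names `error` and `ok` and the printed messages are illustrative, while `TaskContinuationOptions.OnlyOnFaulted` and `NotOnFaulted` are the standard options.

```csharp
using System;
using System.Threading.Tasks;

Task task1 = Task.Factory.StartNew (() => { throw null; });

// Runs only if task1 faulted. Reading ant.Exception "observes" the fault.
Task error = task1.ContinueWith (
  ant => Console.WriteLine (ant.Exception.InnerException.GetType().Name),
  TaskContinuationOptions.OnlyOnFaulted);

// Runs only if task1 did not fault. Here task1 faults,
// so this continuation is canceled instead of executed.
Task ok = task1.ContinueWith (
  ant => Console.WriteLine ("Success!"),
  TaskContinuationOptions.NotOnFaulted);

error.Wait();                        // Prints NullReferenceException

try { ok.Wait(); }
catch (AggregateException) { }       // Waiting on a canceled task throws.

Console.WriteLine (ok.IsCanceled);   // True
```

A continuation whose condition is not met ends up in the canceled state, which is why the example waits on `ok` inside a `try` block before inspecting it.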

Ignoring exceptions

```csharp
public static void IgnoreExceptions (this Task task)
{
  // This could be improved by adding code to log the exception
  task.ContinueWith (t => { var ignore = t.Exception; },
    TaskContinuationOptions.OnlyOnFaulted);
}
```

```csharp
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);
```

Conditional continuations

```csharp
Task t1 = Task.Factory.StartNew (...);

Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));
```

Continuations with multiple antecedents

```csharp
var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
```

```csharp
// task1 and task2 would call complex functions in real life:
```
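
A minimal sketch of a continuation with two antecedents, using `TaskFactory.ContinueWhenAll`. The values 123 and 456 and the name `task3` are placeholders chosen for illustration; in practice the antecedents would perform real work.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical antecedents; real ones would do substantial work.
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);

// The continuation starts only after both antecedents have finished.
// The type arguments are <antecedent result type, continuation result type>.
Task<int> task3 = Task.Factory.ContinueWhenAll<int, int> (
  new[] { task1, task2 },
  tasks => tasks.Sum (t => t.Result));

Console.WriteLine (task3.Result);   // 579
```

Spelling out the type arguments is optional, but it makes clear which overload is intended when both the antecedents and the continuation return values.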

Multiple continuations on a single antecedent

```csharp
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));
```

```csharp
public partial class MyWindow : Window
{
  TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                                // it throughout our class.
  public MyWindow()
  {
    InitializeComponent();

    // Get the UI scheduler for the thread that created the form:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
  }

  string SomeComplexWebService() { ... }
}
```

```csharp
var factory = new TaskFactory (
```
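
A minimal sketch of creating a custom `TaskFactory` so that every task it starts shares the same default options. The specific options passed here (`LongRunning` and `None`) and the computed value are assumptions chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;

// A factory whose tasks are all created with the same default options.
// The options shown are illustrative, not prescribed.
var factory = new TaskFactory (
  TaskCreationOptions.LongRunning,
  TaskContinuationOptions.None);

// Tasks started through the factory pick up those defaults.
Task<int> task = factory.StartNew (() => 6 * 7);

Console.WriteLine (task.Result);   // 42
```

A custom factory mainly saves repetition when many tasks need the same creation or continuation options.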

```csharp
var source = new TaskCompletionSource<int>();

.Start();
```
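
A minimal sketch of driving a task manually with `TaskCompletionSource<int>`. The plain thread, the 100 ms delay and the value 123 are illustrative assumptions; the point is that code outside the TPL decides when the task completes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var source = new TaskCompletionSource<int>();

// A plain thread, not the TPL, decides when the task completes.
// The delay and the result value are illustrative.
new Thread (() => { Thread.Sleep (100); source.SetResult (123); })
  .Start();

Task<int> task = source.Task;      // The task controlled by 'source'
Console.WriteLine (task.Result);   // Blocks until SetResult is called: 123
```

Consumers see an ordinary `Task<int>`, so they can wait on it or attach continuations without knowing how the result is produced.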

Working with AggregateException

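```csharp
try
{
  // The query must be parallel for its exceptions to arrive
  // wrapped in an AggregateException:
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;
  // Enumerate query
  ...
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}
```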

Working with AggregateException - Flatten

```csharp
catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    myLogWriter.LogException (ex);
}
```

Working with AggregateException - Handle

```csharp
var parent = Task.Factory.StartNew (() =>
{
  // We’ll throw 3 exceptions at once using 3 child tasks:

  int[] numbers = { 0 };

  var childFactory = new TaskFactory
    (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);

  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
  childFactory.StartNew (() => numbers [1]);      // Index out of range
  childFactory.StartNew (() => { throw null; });  // Null reference
});

try { parent.Wait(); }
catch (AggregateException aex)
{
  aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine ("Divide by zero");
      return true;                           // This exception is "handled"
    }
    if (ex is IndexOutOfRangeException)
    {
      Console.WriteLine ("Index out of range");
      return true;                           // This exception is "handled"
    }
    return false;    // All other exceptions will get rethrown
  });
}
```

Concurrent collections - spellchecker example

```csharp
var misspellings = new ConcurrentBag<Tuple<int,string>>();

Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));   // i is a long
});
```

Concurrent collections - BlockingCollection

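```csharp
public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }

  public void Dispose() { _taskQ.CompleteAdding(); }

  public void EnqueueTask (Action action) { _taskQ.Add (action); }

  void Consume()
  {
    // This sequence that we’re enumerating will block when no elements
    // are available and will end when CompleteAdding is called.
    foreach (Action action in _taskQ.GetConsumingEnumerable())
      action();     // Perform task.
  }
}
```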

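```csharp
public class PCQueue : IDisposable
{
  class WorkItem
  {
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly Action Action;
    public readonly CancellationToken? CancelToken;

    public WorkItem (
      TaskCompletionSource<object> taskSource,
      Action action,
      CancellationToken? cancelToken)
    {
      TaskSource = taskSource;
      Action = action;
      CancelToken = cancelToken;
    }
  }

  BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }

  public void Dispose() { _taskQ.CompleteAdding(); }

  public Task EnqueueTask (Action action)
  {
    return EnqueueTask (action, null);
  }

  public Task EnqueueTask (Action action, CancellationToken? cancelToken)
  {
    var tcs = new TaskCompletionSource<object>();
    _taskQ.Add (new WorkItem (tcs, action, cancelToken));
    return tcs.Task;
  }

  void Consume()
  {
    foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
      if (workItem.CancelToken.HasValue &&
          workItem.CancelToken.Value.IsCancellationRequested)
      {
        workItem.TaskSource.SetCanceled();
      }
      else
        try
        {
          workItem.Action();
          workItem.TaskSource.SetResult (null);   // Indicate completion
        }
        catch (Exception ex)
        {
          workItem.TaskSource.SetException (ex);
        }
  }
}
```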

SpinLock

```csharp
var spinLock = new SpinLock (true);   // Enable owner tracking
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Do stuff...
}
finally
{
  if (lockTaken) spinLock.Exit();
}
```

SpinWait

```csharp
bool _proceed;
void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
  ...
}
```

Better:

```csharp
bool _proceed;
void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
  ...
}
```

Lock-free updates with SpinWait and Interlocked.CompareExchange

```csharp
int x;

void MultiplyXBy (int factor)
{
  var spinWait = new SpinWait();
  while (true)
  {
    int snapshot1 = x;
    int calc = snapshot1 * factor;
    int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
    if (snapshot1 == snapshot2) return;   // No one preempted us.
    spinWait.SpinOnce();
  }
}
```

Generic version

```csharp
static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction)
  where T : class
{
  var spinWait = new SpinWait();
  while (true)
  {
    T snapshot1 = field;
    T calc = updateFunction (snapshot1);
    T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1);
    if (snapshot1 == snapshot2) return;
    spinWait.SpinOnce();
  }
}
```

With immutable class

```class Test
{
ProgressStatus _status = new ProgressStatus (0, "Starting");

class ProgressStatus    // Immutable class
{
```LockFreeUpdate (ref _status,