Chapter 22 - Parallel Programming

PLINQ

Calculating prime numbers

```// Calculate prime numbers using a simple (unoptimized) algorithm.
// This calculates prime numbers between 3 and a million, using all available cores:

IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);

var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;

int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();```

Calculating prime numbers - ordered

```// Calculate prime numbers with ordered output.

IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);

var parallelQuery =
from n in numbers.AsParallel().AsOrdered()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;

int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();

// In this example, we could alternatively call OrderBy at the end of the query.```

Parallel spell checker

```string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
File.WriteAllText (wordLookupFile,
await new HttpClient().GetStringAsync (
"http://www.albahari.com/ispell/allwords.txt"));

var wordLookup = new HashSet<string> (
StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var query = wordsToTest
.AsParallel()
.Select  ((word, index) => (word, index))
.Where (iword => !wordLookup.Contains (iword.word))
.OrderBy (iword => iword.index);

query.Dump();```

Parallel spell checker - enhanced

```void Main()
{
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

var wordLookup = new HashSet<string> (
StringComparer.InvariantCultureIgnoreCase);

string[] wordList = wordLookup.ToArray();

// Here, we're using ThreadLocal to generate a thread-safe random number generator,
// so we can parallelize the building of the wordsToTest array.
( () => new Random (Guid.NewGuid().GetHashCode()) );

string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
.Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
.ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var query = wordsToTest
.AsParallel()
.Select  ((word, index) => new IndexedWord { Word=word, Index=index })
.Where   (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);

query.Dump();
}

struct IndexedWord { public string Word; public int Index; }```

Functional purity

```{
int i = 0;
(from n in Enumerable.Range(0,999).AsParallel() select n * i++).Dump ("unsafe");
}

{
Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i).Dump ("safe");
}```

Changing degree of parallelism

```"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
.Select (c => char.ToUpper (c))```

Cancellation

```IEnumerable<int> million = Enumerable.Range (3, 1000000);

var cancelSource = new CancellationTokenSource();

from n in million.AsParallel().WithCancellation (cancelSource.Token)
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;

Thread.Sleep (100);      // Cancel query after
cancelSource.Cancel();   // 100 milliseconds.
}
).Start();
try
{
// Start query running:
// We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}```

Output-side optimization

`"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);`

Input-side optimization - forcing chunk partitioning

```int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };

var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (n => n % 2 == 0);

parallelQuery.Dump();```

Optimizing aggregations - simple use of Aggregate

```int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6
sum.Dump();```

Optimizing aggregations - seed factory function (contrived)

```new[] { 1, 2, 3 }.AsParallel().Aggregate (
() => 0,                                     // seedFactory
(localTotal, n) => localTotal + n,           // updateAccumulatorFunc
(mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
finalResult => finalResult)                  // resultSelector```

Optimizing aggregations - letter frequencies imperative

```string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
};
letterFrequencies.Dump();```

Optimizing aggregations - letter frequencies functional

```string text = "Let’s suppose this is a really long string";

int[] result =
text.Aggregate (
new int[26],                // Create the "accumulator"
(letterFrequencies, c) =>   // Aggregate a letter into the accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
return letterFrequencies;
});

result.Dump();```

Optimizing aggregations - letter frequencies parallel

```string text = "Let’s suppose this is a really long string";

int[] result =
text.AsParallel().Aggregate (
() => new int[26],             // Create a new local accumulator

(localFrequencies, c) =>       // Aggregate into the local accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) localFrequencies [index]++;
return localFrequencies;
},
// Aggregate local->main accumulator
(mainFreq, localFreq) =>
mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),

finalResult => finalResult     // Perform any final transformation
);                                 // on the end result.

result.Dump();```
The Parallel Class

Parallel.Invoke

```Parallel.Invoke (

Parallel.For

```var keyPairs = new string[6];

Parallel.For (0, keyPairs.Length,
i => keyPairs[i] = RSA.Create().ToXmlString (true));

keyPairs.Dump();```

PLINQ version

```ParallelEnumerable.Range (0, 6)
.Select (i => RSA.Create().ToXmlString (true))
.ToArray()```

Parallel.Foreach - indexed

```Parallel.ForEach ("Hello, world", (c, state, i) =>
{
Console.WriteLine (c.ToString() + i);
});```

Parallel Spellchecker with TPL

```string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

var wordLookup = new HashSet<string> (
StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var misspellings = new ConcurrentBag<Tuple<int,string>>();

Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
});

misspellings.Dump();```

Breaking early out of loops

```Parallel.ForEach ("Hello, world", (c, loopState) =>
{
if (c == ',')
loopState.Break();
else
Console.Write (c);
});```

Optimization with local values - problem

```object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
i => { lock (locker) total += Math.Sqrt (i); });
total.Dump();```

Optimization with local values - solution

```object locker = new object();
double grandTotal = 0;

Parallel.For (1, 10000000,

() => 0.0,                         // Initialize the local value.

(i, state, localTotal) =>          // Body delegate. Notice that it
localTotal + Math.Sqrt (i),    // returns the new local total.

localTotal =>                                      // Add the local value
{ lock (locker) grandTotal += localTotal; }    // to the master value.
);

grandTotal.Dump();```

PLINQ version sum

```ParallelEnumerable.Range (1, 10000000)
.Sum (i => Math.Sqrt (i))```

```// Note: see Chapter 14 for a basic introduction to tasks.

```// You can create "cold" (unstarted) tasks with Task's constructor:

"We can do something else here...".Dump();

Specifying a state object

```static void Main()
{
}

static void Greet (object state) { Console.Write (state); }   // Hello```

Putting the state object to better use

```static void Main()
{
}

static void Greet (string message) { Console.WriteLine (message); }```

```Task parent = Task.Factory.StartNew (() =>
{
Console.WriteLine ("I am a parent");

{
Console.WriteLine ("I am detached");
});

{
Console.WriteLine ("I am a child");
});

parent.Wait();
Console.WriteLine ("Parent completed");```

```TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
{
Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
}, atp);
});

// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();```

```var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);

{
token.ThrowIfCancellationRequested();  // Check for cancellation request
}, token);

catch (AggregateException ex)
{
Console.WriteLine (ex.InnerException is TaskCanceledException);  // True
}```

Continuations

```Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));

Continuations with return values

```Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqrt (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result));   // 4```

Continuations and exceptions

```Task task1 = Task.Factory.StartNew (() => { throw null; });

Continuations - rethrowing antecedent exceptions

```Task continuation = Task.Factory.StartNew     (()  => { throw null; })
.ContinueWith (ant =>
{
if (ant.Exception != null) throw ant.Exception;
// Continue processing...
});

continuation.Wait();    // Exception is now thrown back to caller.```

```Task task1 = Task.Factory.StartNew (() => { throw null; });

error.Wait();```

Continuations - extension to swallow exceptions

```void Main()
{
Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
}

static class Extensions
{
{
// This could be improved by adding code to log the exception
task.ContinueWith (t => { var ignore = t.Exception; },
}
}```

```TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
{
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),

.Wait(); // throws AggregateException containing three NullReferenceExceptions```

Continuations - conditional

```Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));    // This executes```

Continuations - conditional (solution)

```Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),

Continuations with multiple antecedents

```var task1 = Task.Factory.StartNew (() => Console.Write ("X"));

Continuations with multiple antecedents - collating data

```// task1 and task2 would call complex functions in real life:

Multiple continuations on a single antecedents - collating data

```var t = Task.Factory.StartNew (() => Thread.Sleep (1000));

var c1 = t.ContinueWith (ant => Console.Write ("X"));
var c2 = t.ContinueWith (ant => Console.Write ("Y"));

```void Main()
{
new MyWindow().ShowDialog();
}

public partial class MyWindow : System.Windows.Window
{
Label lblResult = new Label();
TaskScheduler _uiScheduler;   // Declare this as a field so we can use
// it throughout our class.
public MyWindow()
{
InitializeComponent();
}

protected override void OnActivated (EventArgs e)
{
// Get the UI scheduler for the thread that created the form:

.ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
}

void InitializeComponent()
{
lblResult.FontSize = 20;
Content = lblResult;
}

string SomeComplexWebService() { Thread.Sleep (1000); return "Foo"; }
}```

```var factory = new TaskFactory (

Working with AggregateException

AggregateException

```try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
query.Dump();
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine (ex.Message);
}```

Flatten

```try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
query.Dump();
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.Flatten().InnerExceptions)
ex.Dump();
}```

Handle

```var parent = Task.Factory.StartNew (() =>
{
// We’ll throw 3 exceptions at once using 3 child tasks:

int[] numbers = { 0 };

childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
childFactory.StartNew (() => numbers [1]);      // Index out of range
childFactory.StartNew (() => { throw null; });  // Null reference
});

try { parent.Wait(); }
catch (AggregateException aex)
{
aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
{
if (ex is DivideByZeroException)
{
Console.WriteLine ("Divide by zero");
return true;                           // This exception is "handled"
}
if (ex is IndexOutOfRangeException)
{
Console.WriteLine ("Index out of range");
return true;                           // This exception is "handled"
}
return false;    // All other exceptions will get rethrown
});
}```
Concurrent Collections

Producer-Consumer Queue

```void Main()
{
using (var q = new PCQueue(1))
{
}
}

public class PCQueue : IDisposable
{

public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
}

void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.

}
}```

```void Main()
{
using (var pcQ = new PCQueue(1))
{

}
}

public class PCQueue : IDisposable
{

public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
}

public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
{
}

CancellationToken cancelToken = default (CancellationToken))
{
}

void Consume()
{
try
{
}
catch (InvalidOperationException) { }  // Race condition
}

}```
EXTRA - Channels

Single Producer - Multiple Consumers

```// The consumer is half as fast as the producer. We compensate by starting two consumers.

Channel<string> channel =
Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
{
// allows the Channel to make optimizing assumptions
SingleWriter = true,
});

var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer1 = Consume(1);
var consumer2 = Consume(2);

{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync (\$"Msg {i}");
}
Console.WriteLine ("Producer done.");
}

async Task Consume(int id) // We add an ID just to visualize which one processed a given message
{
{
{
Console.WriteLine (\$"Processed on {id}: {data}");
// Simulate processing takes twice as long as producing
}
}
Console.WriteLine (\$"Consumer {id} done.");
}```

Single Producer - Single Consumer

```// The consumer is half as fast as the producer. The producer will finish first.

Channel<string> channel =
Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
{
// allows the Channel to make optimizing assumptions
SingleWriter = true,
});
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer = Consume();

{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync (\$"Msg {i}");
}
Console.WriteLine("Producer done.");
}

{
{
{
Console.WriteLine(\$"Processed: {data}");
// Simulate processing takes twice as long as producing
}
}
Console.WriteLine("Consumer done.");
}```
EXTRA - SpinLock and SpinWait

SpinLock

```// See http://www.albahari.com/threading/part5.aspx for the accompanying text on SpinLock and SpinWait

var spinLock = new SpinLock (true);   // Enable owner tracking
bool lockTaken = false;
try
{
spinLock.Enter (ref lockTaken);
// Do stuff...
}
finally
{
if (lockTaken) spinLock.Exit();
}```

SpinWait - SpinUntil

```bool _proceed;

void Main()
{
_proceed = true;
}

void Test()
{
SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
"Done!".Dump();
}```

SpinWait - SpinOnce

```bool _proceed;

void Main()
{
_proceed = true;
}

void Test()
{
var spinWait = new SpinWait();
while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
"Done!".Dump();
}```

SpinWait - Lock-free updates with CompareExchange

```int x = 2;

void Main()
{
// We can perform three multiplications on the same variable using 3 concurrent threads
// safely without locks by using SpinWait with Interlocked.CompareExchange.

x.Dump();
}

void MultiplyXBy (int factor)
{
var spinWait = new SpinWait();
while (true)
{
int snapshot1 = x;