Chapter 22 - Parallel Programming

PLINQ

Calculating prime numbers

// Calculate prime numbers using a simple (unoptimized) algorithm.
// This calculates prime numbers between 3 and a million, using all available cores:

IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);

var parallelQuery = 
  from n in numbers.AsParallel()
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();

Calculating prime numbers - ordered

// Calculate prime numbers with ordered output.

IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);

var parallelQuery = 
  from n in numbers.AsParallel().AsOrdered()
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();

// In this example, we could alternatively call OrderBy at the end of the query.

Parallel spell checker

string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
  File.WriteAllText (wordLookupFile,
    await new HttpClient().GetStringAsync (
      "http://www.albahari.com/ispell/allwords.txt"));

var wordLookup = new HashSet<string> (
  File.ReadAllLines (wordLookupFile),
  StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var query = wordsToTest
  .AsParallel()
  .Select ((word, index) => new IndexedWord { Word = word, Index = index })
  .Where (iword => !wordLookup.Contains (iword.Word))
  .OrderBy (iword => iword.Index);

query.Dump();

struct IndexedWord { public string Word; public int Index; }

Parallel spell checker - enhanced

void Main()
{
  string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");
  
  if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
    new WebClient().DownloadFile (
      "http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
  
  var wordLookup = new HashSet<string> (
    File.ReadAllLines (wordLookupFile),
    StringComparer.InvariantCultureIgnoreCase);
  
  string[] wordList = wordLookup.ToArray();

  // Here, we're using ThreadLocal to generate a thread-safe random number generator,
  // so we can parallelize the building of the wordsToTest array.
  var localRandom = new ThreadLocal<Random>
    ( () => new Random (Guid.NewGuid().GetHashCode()) );

  string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
    .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
    .ToArray();
  
  wordsToTest [12345] = "woozsh";     // Introduce a couple
  wordsToTest [23456] = "wubsie";     // of spelling mistakes.
  
  var query = wordsToTest
    .AsParallel()
    .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
    .Where   (iword => !wordLookup.Contains (iword.Word))
    .OrderBy (iword => iword.Index);
  
  query.Dump();
}

struct IndexedWord { public string Word; public int Index; }

Functional purity

{
  int i = 0;
  (from n in Enumerable.Range(0,999).AsParallel() select n * i++).Dump ("unsafe");
}

{
  Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i).Dump ("safe");
}

Changing degree of parallelism

"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
  .Select (c => char.ToUpper (c))

Cancellation

IEnumerable<int> million = Enumerable.Range (3, 1000000);

var cancelSource = new CancellationTokenSource();

var primeNumberQuery = 
  from n in million.AsParallel().WithCancellation (cancelSource.Token)
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

new Thread (() => {
          Thread.Sleep (100);      // Cancel query after
          cancelSource.Cancel();   // 100 milliseconds.
          }
       ).Start();
try 
{
  // Start query running:
  int[] primes = primeNumberQuery.ToArray();
  // We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}

Output-side optimization

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);

Input-side optimization - forcing chunk partitioning

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };

var parallelQuery =
  Partitioner.Create (numbers, true).AsParallel()
  .Where (n => n % 2 == 0);
  
parallelQuery.Dump();

Optimizing aggregations - simple use of Aggregate

int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6
sum.Dump();

Optimizing aggregations - seed factory function (contrived)

new[] { 1, 2, 3 }.AsParallel().Aggregate (
  () => 0,                                     // seedFactory
  (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
  (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
  finalResult => finalResult)                  // resultSelector

Optimizing aggregations - letter frequencies imperative

string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  int index = char.ToUpper (c) - 'A';
  if (index >= 0 && index < 26) letterFrequencies [index]++;
};
letterFrequencies.Dump();

Optimizing aggregations - letter frequencies functional

string text = "Let’s suppose this is a really long string";

int[] result =
  text.Aggregate (
    new int[26],                // Create the "accumulator"
    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index < 26) letterFrequencies [index]++;
      return letterFrequencies;
    });

result.Dump();

Optimizing aggregations - letter frequencies parallel

string text = "Let’s suppose this is a really long string";

int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator
    
    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
      int index = char.ToUpper (c) - 'A';
      if (index >= 0 && index < 26) localFrequencies [index]++;
      return localFrequencies;
    },
                    // Aggregate local->main accumulator
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
    
    finalResult => finalResult     // Perform any final transformation
  );                                 // on the end result.

result.Dump();
The Parallel Class

Parallel.Invoke

Parallel.Invoke (
  () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
  () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));

Parallel.For

var keyPairs = new string[6];

Parallel.For (0, keyPairs.Length,
        i => keyPairs[i] = RSA.Create().ToXmlString (true));

keyPairs.Dump();

PLINQ version

ParallelEnumerable.Range (0, 6)
  .Select (i => RSA.Create().ToXmlString (true))
  .ToArray()

Parallel.Foreach - indexed

Parallel.ForEach ("Hello, world", (c, state, i) =>
{
  Console.WriteLine (c.ToString() + i);
});

Parallel Spellchecker with TPL

string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

var wordLookup = new HashSet<string> (
  File.ReadAllLines (wordLookupFile),
  StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var misspellings = new ConcurrentBag<Tuple<int,string>>();

Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

misspellings.Dump();

Breaking early out of loops

Parallel.ForEach ("Hello, world", (c, loopState) =>
{
  if (c == ',')
    loopState.Break();
  else
    Console.Write (c);
});

Optimization with local values - problem

object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
        i => { lock (locker) total += Math.Sqrt (i); });
total.Dump();

Optimization with local values - solution

object locker = new object();
double grandTotal = 0;

Parallel.For (1, 10000000,

  () => 0.0,                         // Initialize the local value.
  
  (i, state, localTotal) =>          // Body delegate. Notice that it
    localTotal + Math.Sqrt (i),    // returns the new local total.
  
  localTotal =>                                      // Add the local value
    { lock (locker) grandTotal += localTotal; }    // to the master value.
);

grandTotal.Dump();

PLINQ version sum

ParallelEnumerable.Range (1, 10000000)
          .Sum (i => Math.Sqrt (i))
Task Parallelism

Creating and starting tasks

// Note: see Chapter 14 for a basic introduction to tasks.

var task = Task.Run (() => Console.WriteLine ("Hello from a task!"));
task.Wait();  // Wait for task to finish

Decoupling task creation and execution

// You can create "cold" (unstarted) tasks with Task's constructor:
var task = new Task (() => Console.Write ("Hello"));

"We can do something else here...".Dump();

task.Start();

Specifying a state object

static void Main()
{
  var task = Task.Factory.StartNew (Greet, "Hello");
}

static void Greet (object state) { Console.Write (state); }   // Hello

Putting the state object to better use

static void Main()
{
  var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
  Console.WriteLine (task.AsyncState);   // Greeting
}

static void Greet (string message) { Console.WriteLine (message); }

Child tasks

Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");
  
  Task.Factory.StartNew (() =>        // Detached task
  {
    Console.WriteLine ("I am detached");
  });
  
  Task.Factory.StartNew (() =>        // Child task
  {
    Console.WriteLine ("I am a child");
  }, TaskCreationOptions.AttachedToParent);
});

parent.Wait();
Console.WriteLine ("Parent completed");

Exception-handling child tasks

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() => 
{
  Task.Factory.StartNew (() =>   // Child
  {
    Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
  }, atp);
});

// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();

Canceling tasks

var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);

Task task = Task.Factory.StartNew (() => 
{
  Thread.Sleep (1000);
  token.ThrowIfCancellationRequested();  // Check for cancellation request
}, token);

try { task.Wait(); }
catch (AggregateException ex)
{
  Console.WriteLine (ex.InnerException is TaskCanceledException);  // True
  Console.WriteLine (task.IsCanceled);                             // True
  Console.WriteLine (task.Status);                             // Canceled
}

Continuations

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

Continuations with return values

Task.Factory.StartNew<int> (() => 8)
  .ContinueWith (ant => ant.Result * 2)
  .ContinueWith (ant => Math.Sqrt (ant.Result))
  .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

Continuations and exceptions

Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

task2.Wait();  // throws an AggregateException

Continuations - rethrowing antecedent exceptions

Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                .ContinueWith (ant =>
  {
    if (ant.Exception != null) throw ant.Exception;
    // Continue processing...
  });

continuation.Wait();    // Exception is now thrown back to caller.

Continuations - exceptions and TaskContinuationOptions

Task task1 = Task.Factory.StartNew (() => { throw null; });

Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                 TaskContinuationOptions.OnlyOnFaulted);

Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                TaskContinuationOptions.NotOnFaulted);

error.Wait();

Continuations - extension to swallow exceptions

void Main()
{
  Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
}

static class Extensions
{
  public static void IgnoreExceptions (this Task task)
  {
    // This could be improved by adding code to log the exception
    task.ContinueWith (t => { var ignore = t.Exception; },
      TaskContinuationOptions.OnlyOnFaulted);
  } 
}

Continuations and child tasks

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
          TaskContinuationOptions.OnlyOnFaulted)

.Wait(); // throws AggregateException containing three NullReferenceExceptions

Continuations - conditional

Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                TaskContinuationOptions.OnlyOnFaulted);

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));    // This executes

Continuations - conditional (solution)

Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                TaskContinuationOptions.OnlyOnFaulted);

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                TaskContinuationOptions.NotOnCanceled);  // Does not execute

Continuations with multiple antecedents

var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
var task2 = Task.Factory.StartNew (() => Console.Write ("Y"));

var continuation = Task.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

Continuations with multiple antecedents - collating data

// task1 and task2 would call complex functions in real life:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);

Task<int> task3 = Task<int>.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result));

Console.WriteLine (task3.Result);           // 579

Multiple continuations on a single antecedents - collating data

var t = Task.Factory.StartNew (() => Thread.Sleep (1000));

var c1 = t.ContinueWith (ant => Console.Write ("X"));
var c2 = t.ContinueWith (ant => Console.Write ("Y"));

Task.WaitAll (c1, c2);

Task Schedulers and UIs

void Main()
{
  new MyWindow().ShowDialog();  
}

public partial class MyWindow : System.Windows.Window
{
  Label lblResult = new Label();
  TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                  // it throughout our class.
  public MyWindow()
  {    
    InitializeComponent();    
  }
  
  protected override void OnActivated (EventArgs e)
  {
    // Get the UI scheduler for the thread that created the form:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
  
    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);    
  }
  
  void InitializeComponent()
  {
    lblResult.FontSize = 20;
    Content = lblResult;
  }
  
  string SomeComplexWebService() { Thread.Sleep (1000); return "Foo"; }
}

Creating your own Task Factories

var factory = new TaskFactory (
  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
  TaskContinuationOptions.None);

Task task1 = factory.StartNew (() => "foo".Dump());
Task task2 = factory.StartNew (() => "far".Dump());
Working with AggregateException

AggregateException

try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
        select 100 / i;
  // Enumerate query
  query.Dump();
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}

Flatten

try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
        select 100 / i;
  // Enumerate query
  query.Dump();
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    ex.Dump();
}

Handle

var parent = Task.Factory.StartNew (() => 
{
  // We’ll throw 3 exceptions at once using 3 child tasks:
  
  int[] numbers = { 0 };
  
  var childFactory = new TaskFactory
  (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
  
  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
  childFactory.StartNew (() => numbers [1]);      // Index out of range
  childFactory.StartNew (() => { throw null; });  // Null reference
});

try { parent.Wait(); }
catch (AggregateException aex)
{
  aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine ("Divide by zero");
      return true;                           // This exception is "handled"
    }
    if (ex is IndexOutOfRangeException)
    {
      Console.WriteLine ("Index out of range");
      return true;                           // This exception is "handled"   
    }
    return false;    // All other exceptions will get rethrown
  });
}
Concurrent Collections

Producer-Consumer Queue

void Main()
{
  using (var q = new PCQueue(1))
  {
    q.EnqueueTask (() => "Foo".Dump());
    q.EnqueueTask (() => "Far".Dump());
  }  
}

public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
  
  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }
  
  public void Dispose() { _taskQ.CompleteAdding(); }
  
  public void EnqueueTask (Action action) { _taskQ.Add (action); }
  
  void Consume()
  {
    // This sequence that we’re enumerating will block when no elements
    // are available and will end when CompleteAdding is called.
  
    foreach (Action action in _taskQ.GetConsumingEnumerable())
      action();     // Perform task.
  }  
}

Producer-Consumer Queue - with Tasks

void Main()
{
  using (var pcQ = new PCQueue(1))
  {
    Task task1 = pcQ.Enqueue (() => Console.WriteLine ("Too"));
    Task task2 = pcQ.Enqueue (() => Console.WriteLine ("Easy!"));
    
    task1.ContinueWith (_ => "Task 1 complete".Dump());
    task2.ContinueWith (_ => "Task 2 complete".Dump());
  }  
}

public class PCQueue : IDisposable
{
  BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
  
  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
    Task.Factory.StartNew (Consume);
  }
  
  public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
  {
    var task = new Task (action, cancelToken);
    _taskQ.Add (task);
    return task;
  }
  
  public Task<TResult> Enqueue<TResult> (Func<TResult> func, 
    CancellationToken cancelToken = default (CancellationToken))
  {
    var task = new Task<TResult> (func, cancelToken);
    _taskQ.Add (task);
    return task;
  }
  
  void Consume()
  {
    foreach (var task in _taskQ.GetConsumingEnumerable())
    try 
    {
      if (!task.IsCanceled) task.RunSynchronously();
    } 
    catch (InvalidOperationException) { }  // Race condition
  }
  
  public void Dispose() { _taskQ.CompleteAdding(); }
}
EXTRA - Channels

Single Producer - Multiple Consumers

// The consumer is half as fast as the producer. We compensate by starting two consumers.

Channel<string> channel =
  Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
  {
    // Specifying SingleReader and/or SingleWriter 
    // allows the Channel to make optimizing assumptions
    SingleReader = false,
    SingleWriter = true,
  });
  
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer1 = Consume(1);
var consumer2 = Consume(2);

await Task.WhenAll(consumer1, consumer2);

async Task Produce()
{
  for (int i = 0; i < 10; i++)
  {
    await channel.Writer.WriteAsync ($"Msg {i}");
    await Task.Delay (1000);
  }
  Console.WriteLine ("Producer done.");
}

async Task Consume(int id) // We add an ID just to visualize which one processed a given message
{
  while (await channel.Reader.WaitToReadAsync())
  {
    if (channel.Reader.TryRead (out string data))
    {
      Console.WriteLine ($"Processed on {id}: {data}");
      // Simulate processing takes twice as long as producing
      await Task.Delay (2000);
    }
  }
  Console.WriteLine ($"Consumer {id} done.");
}

Single Producer - Single Consumer

// The consumer is half as fast as the producer. The producer will finish first.

Channel<string> channel =
  Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
  {
    // Specifying SingleReader and/or SingleWriter 
    // allows the Channel to make optimizing assumptions
    SingleReader = true,
    SingleWriter = true,
  });
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer = Consume();

async Task Produce()
{
  for (int i = 0; i < 10; i++)
  {
    await channel.Writer.WriteAsync ($"Msg {i}");
    await Task.Delay(1000);
  }
  Console.WriteLine("Producer done.");
}

async Task Consume()
{
  while (await channel.Reader.WaitToReadAsync())
  {
    if (channel.Reader.TryRead(out string data))
    {
      Console.WriteLine($"Processed: {data}");
      // Simulate processing takes twice as long as producing
      await Task.Delay(2000);
    }
  }
  Console.WriteLine("Consumer done.");
}
EXTRA - SpinLock and SpinWait

SpinLock

// See http://www.albahari.com/threading/part5.aspx for the accompanying text on SpinLock and SpinWait

var spinLock = new SpinLock (true);   // Enable owner tracking
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Do stuff...
}
finally
{
  if (lockTaken) spinLock.Exit();
}

SpinWait - SpinUntil

bool _proceed;

void Main()
{
  var task = Task.Factory.StartNew (Test);
  Thread.Sleep(1000);
  _proceed = true;
  task.Wait();
}

void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
  "Done!".Dump();
}

SpinWait - SpinOnce

bool _proceed;

void Main()
{
  var task = Task.Run (Test);
  Thread.Sleep(1000);
  _proceed = true;
  task.Wait();
}

void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
  "Done!".Dump();
}

SpinWait - Lock-free updates with CompareExchange

int x = 2;

void Main()
{
  // We can perform three multiplications on the same variable using 3 concurrent threads
  // safely without locks by using SpinWait with Interlocked.CompareExchange.
  
  var task1 = Task.Factory.StartNew (() => MultiplyXBy (3));
  var task2 = Task.Factory.StartNew (() => MultiplyXBy (4));
  var task3 = Task.Factory.StartNew (() => MultiplyXBy (5));
  
  Task.WaitAll (task1, task2, task3);
  x.Dump();
}

void MultiplyXBy (int factor)
{
  var spinWait = new SpinWait();
  while (true)
  {
    int snapshot1 = x;
    Thread.MemoryBarrier();
    int calc = snapshot1 * factor;
    int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
    if (snapshot1 == snapshot2) return;   // No one preempted us.
    spinWait.SpinOnce();
  }
}
C# 12 in a Nutshell
Buy from amazon.com Buy print or Kindle edition
Buy from ebooks.com Buy PDF edition
Buy from O'Reilly Read via O'Reilly subscription