Chapter 22 - Advanced Threading

Exclusive Locking

Simple use of lock

static readonly object _locker = new object();
static int _val1, _val2;

static void Main()
{
  for (int i = 1; i <= 1000; i++)
  {
    if (i % 100 == 0) Console.WriteLine ($"Tried {i} times to get DivideByZeroException");
    
    var t1 = new Thread (Go); t1.Start();
    var t2 = new Thread (Go); t2.Start();
    var t3 = new Thread (Go); t3.Start();
    
    t1.Join(); t2.Join(); t3.Join();
  }
}

static void Go()
{
  lock (_locker)  // Threadsafe: will never get DivideByZeroException
  {
    if (_val2 != 0) Console.WriteLine (_val1 / _val2);
    _val2 = 0;
  }
}

Nested locking

static readonly object _locker = new object();

static void Main()
{
  lock (_locker)
  {
   AnotherMethod();
   // We still have the lock - because locks are reentrant.
  }
}

static void AnotherMethod()
{
  lock (_locker) { Console.WriteLine ("Another method"); }
}

Deadlocks

object locker1 = new object();
object locker2 = new object();

new Thread (() => {
          lock (locker1)
          {
            Thread.Sleep (1000);
            lock (locker2) { }      // Deadlock
          }
          }).Start();
lock (locker2)
{
  Thread.Sleep (1000);
  lock (locker1) { }                          // Deadlock
}

Mutex

// To test this in LINQPad, run the query then clone the query (Shift+Ctrl+C) and run the copy at the same time.

static void Main()
{
  // Naming a Mutex makes it available computer-wide. Use a name that's
  // unique to your company and application (e.g., include your URL).
  
  using (var mutex = new Mutex (false, "oreilly.com OneAtATimeDemo"))
  {
    // Wait a few seconds if contended, in case another instance
    // of the program is still in the process of shutting down.
  
    if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
    {
      Console.WriteLine ("Another instance of the app is running. Bye!");
      return;
    }
    
    RunProgram();
  }
}

static void RunProgram()
{
  Console.WriteLine ("Running. Press Enter to exit");
  Console.ReadLine();
}
Locking and Thread Safety

Thread safety and Framework types

static List <string> _list = new List <string>();

static void Main()
{
  new Thread (AddItem).Start();
  new Thread (AddItem).Start();
}
  
static void AddItem()
{
  lock (_list) _list.Add ("Item " + _list.Count);
  
  string[] items;
  lock (_list) items = _list.ToArray();
  foreach (string s in items) Console.WriteLine (s);
}

// Note: In LINQPad, press Shift+F5 to clear static variables.

Thread safety in application servers

void Main()
{  
  new Thread (() => UserCache.GetUser (1).Dump()).Start();
  new Thread (() => UserCache.GetUser (1).Dump()).Start();
  new Thread (() => UserCache.GetUser (1).Dump()).Start();
}

static class UserCache
{
  static Dictionary <int, User> _users = new Dictionary <int, User>();
  
  internal static User GetUser (int id)
  {
    User u = null;
    
    lock (_users)
      if (_users.TryGetValue (id, out u))
        return u;
    
    u = RetrieveUser (id);           // Method to retrieve from database;
    lock (_users) _users [id] = u;
    return u;
  }

  static User RetrieveUser (int id)
  {
    Thread.Sleep(1000);  // simulate a time-consuming operation
    return new User { ID = id };
  }
}

class User { public int ID; }

Thread safety in application servers - enhanced

async Task Main()
{  
  new Thread (() => UserCache.GetUserAsync (1).Dump()).Start();
  new Thread (() => UserCache.GetUserAsync (1).Dump()).Start();
  new Thread (() => UserCache.GetUserAsync (1).Dump()).Start();
  
  // You can also await this method:
  User user = await UserCache.GetUserAsync (1);
  user.Dump();
}

static class UserCache
{
  static Dictionary <int, Task<User>> _userTasks = 
     new Dictionary <int, Task<User>>();
  
  internal static Task<User> GetUserAsync (int id)
  {
    lock (_userTasks)
      if (_userTasks.TryGetValue (id, out var userTask))
        return userTask;
      else
        return _userTasks [id] = Task.Run (() => RetrieveUser (id));
  }

  static User RetrieveUser (int id)
  {
    Thread.Sleep(1000);  // simulate a time-consuming operation
    return new User { ID = id };
  }

}

class User { public int ID; }
Non-exclusive Locking

Semaphore

static SemaphoreSlim _sem = new SemaphoreSlim (3);    // Capacity of 3

static void Main()
{
  for (int i = 1; i <= 5; i++) new Thread (Enter).Start (i);
}

static void Enter (object id)
{
  Console.WriteLine (id + " wants to enter");
  _sem.Wait();
  Console.WriteLine (id + " is in!");           // Only three threads
  Thread.Sleep (1000 * (int) id);               // can be here at
  Console.WriteLine (id + " is leaving");       // a time.
  _sem.Release();
}

Async semaphores and locks

SemaphoreSlim _semaphore = new SemaphoreSlim (4);  // 4 downloads at a time

void Main()
{
  Util.AutoScrollResults = true;
  for (int i = 0; i < 50; i++)
  {
    int local = i;
    DownloadWithSemaphoreAsync ("http://someinvaliduri/" + i)
      .ContinueWith (c => ("Finished download " + local).Dump());
  }
}

async Task<byte[]> DownloadWithSemaphoreAsync (string uri)
{
  using (await _semaphore.EnterAsync())
    return await new WebClient().DownloadDataTaskAsync (uri);
}

static class Extensions
{
  public static async Task<IDisposable> EnterAsync (this SemaphoreSlim ss)
  {
    await ss.WaitAsync().ConfigureAwait (false);
    return Disposable.Create (() => ss.Release());
  }
}

ReaderWriterLockSlim

static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static List<int> _items = new List<int>();
static Random _rand = new Random();
  
static void Main()
{
  new Thread (Read).Start();
  new Thread (Read).Start();
  new Thread (Read).Start();
  
  new Thread (Write).Start ("A");
  new Thread (Write).Start ("B");
}
  
static void Read()
{
  while (true)
  {
    _rw.EnterReadLock();
    foreach (int i in _items) Thread.Sleep (10);
    _rw.ExitReadLock();
  }
}
  
static void Write (object threadID)
{
  while (true)
  {
    int newNumber = GetRandNum (100);
    _rw.EnterWriteLock();
    _items.Add (newNumber);
    _rw.ExitWriteLock();
    Console.WriteLine ("Thread " + threadID + " added " + newNumber);
    Thread.Sleep (100);
  }
}
  
static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }

ReaderWriterLockSlim - upgradeable locks

static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static List<int> _items = new List<int>();
static Random _rand = new Random();
  
static void Main()
{
  new Thread (Read).Start();
  new Thread (Read).Start();
  new Thread (Read).Start();
  
  new Thread (Write).Start ("A");
  new Thread (Write).Start ("B");
}
  
static void Read()
{
  while (true)
  {
    _rw.EnterReadLock();
    foreach (int i in _items) Thread.Sleep (10);
    _rw.ExitReadLock();
  }
}
  
static void Write (object threadID)
{
  while (true)
  {
    int newNumber = GetRandNum (100);
    _rw.EnterUpgradeableReadLock();
    if (!_items.Contains (newNumber))
    {
      _rw.EnterWriteLock();
      _items.Add (newNumber);
      _rw.ExitWriteLock();
      Console.WriteLine ("Thread " + threadID + " added " + newNumber);
    }
    _rw.ExitUpgradeableReadLock();
    Thread.Sleep (100);
  }
}
  
static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }

ReaderWriterLockSlim - lock recursion

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

rw.EnterWriteLock();
rw.EnterReadLock();
Console.WriteLine (rw.IsReadLockHeld);     // True
Console.WriteLine (rw.IsWriteLockHeld);    // True
rw.ExitReadLock();
rw.ExitWriteLock();
Signaling with Event Wait Handles

AutoResetEvent

static EventWaitHandle _waitHandle = new AutoResetEvent (false);

static void Main()
{
  new Thread (Waiter).Start();
  Thread.Sleep (1000);                  // Pause for a second...
  _waitHandle.Set();                    // Wake up the Waiter.
}
  
static void Waiter()
{
  Console.WriteLine ("Waiting...");
  _waitHandle.WaitOne();                // Wait for notification
  Console.WriteLine ("Notified");
}

Two-way signaling

static EventWaitHandle _ready = new AutoResetEvent (false);
static EventWaitHandle _go = new AutoResetEvent (false);
static readonly object _locker = new object();
static string _message;
  
static void Main()
{
  new Thread (Work).Start();
  
  _ready.WaitOne();                  // First wait until worker is ready
  lock (_locker) _message = "ooo";
  _go.Set();                         // Tell worker to go
  
  _ready.WaitOne();
  lock (_locker) _message = "ahhh";  // Give the worker another message
  _go.Set();
  
  _ready.WaitOne();
  lock (_locker) _message = null;    // Signal the worker to exit
  _go.Set();
}
  
static void Work()
{
  while (true)
  {
    _ready.Set();                          // Indicate that we're ready
    _go.WaitOne();                         // Wait to be kicked off...
    lock (_locker)
    {
      if (_message == null) return;        // Gracefully exit
      Console.WriteLine (_message);
    }
  }
}

CountdownEvent

static CountdownEvent _countdown = new CountdownEvent (3);

static void Main()
{
  new Thread (SaySomething).Start ("I am thread 1");
  new Thread (SaySomething).Start ("I am thread 2");
  new Thread (SaySomething).Start ("I am thread 3");
  _countdown.Wait();   // Blocks until Signal has been called 3 times
  Console.WriteLine ("All threads have finished speaking!");
}

static void SaySomething (object thing)
{
  Thread.Sleep (1000);
  Console.WriteLine (thing);
  _countdown.Signal();
}

Wait Handles and continuations

static ManualResetEvent _starter = new ManualResetEvent (false);

public static void Main()
{
  RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject (_starter, Go, "Some Data", -1, true);
  Thread.Sleep (5000);
  Console.WriteLine ("Signaling worker...");
  _starter.Set();
  Console.ReadLine();
  reg.Unregister (_starter);    // Clean up when we’re done.
}

public static void Go (object data, bool timedOut)
{
  Console.WriteLine ("Started - " + data);
  // Perform task...
}
The Barrier Class

Barrier

static Barrier _barrier = new Barrier (3);

static void Main()
{
  new Thread (Speak).Start();
  new Thread (Speak).Start();
  new Thread (Speak).Start();
}

static void Speak()
{
  for (int i = 0; i < 5; i++)
  {
    Console.Write (i + " ");
    _barrier.SignalAndWait();
  }
}

Barrier - post-phase action

static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine());

static void Main()
{
  new Thread (Speak).Start();
  new Thread (Speak).Start();
  new Thread (Speak).Start();
}

static void Speak()
{
  for (int i = 0; i < 5; i++)
  {
    Console.Write (i + " ");
    _barrier.SignalAndWait();
  }
}
Lazy Initialization

Intro

void Main()
{
  new Foo().Expensive.Dump();
}

class Foo
{
  Expensive _expensive;
  public Expensive Expensive         // Lazily instantiate Expensive
  {
    get
    {
      if (_expensive == null) _expensive = new Expensive();
      return _expensive;
    }
  }
}

class Expensive {  /* Suppose this is expensive to construct */  }

Intro - with lock

void Main()
{
  new Foo().Expensive.Dump();
}

class Foo
{
  Expensive _expensive;
  readonly object _expenseLock = new object();
    
  public Expensive Expensive
  {
    get
    {
      lock (_expenseLock)
      {
        if (_expensive == null) _expensive = new Expensive();
        return _expensive;
      }
    }
  }
}

class Expensive {  /* Suppose this is expensive to construct */  }

Lazy of T

void Main()
{
  new Foo().Expensive.Dump();
}

class Foo
{
  Lazy<Expensive> _expensive = new Lazy<Expensive> (() => new Expensive(), true);
  public Expensive Expensive { get { return _expensive.Value; } }
}

class Expensive {  /* Suppose this is expensive to construct */  }

LazyInitializer

void Main()
{
  new Foo().Expensive.Dump();
}

class Foo
{
  Expensive _expensive;
  public Expensive Expensive
  {                    // Implement double-checked locking
    get 
    { 
      LazyInitializer.EnsureInitialized (ref _expensive, () => new Expensive());
      return _expensive;
    }
  }
}

class Expensive {  /* Suppose this is expensive to construct */  }
Thread-local Storage

ThreadStatic

[ThreadStatic] static int _x;

void Main()
{
  new Thread (() => { Thread.Sleep(1000); _x++; _x.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(2000); _x++; _x.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(3000); _x++; _x.Dump(); }).Start();
}

ThreadLocal

static ThreadLocal<int> _x = new ThreadLocal<int> (() => 3);

void Main()
{
  new Thread (() => { Thread.Sleep(1000); _x.Value++; _x.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(2000); _x.Value++; _x.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(3000); _x.Value++; _x.Dump(); }).Start();
}

GetData and SetData

void Main()
{
  var test = new Test();
  new Thread (() => { Thread.Sleep(1000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(2000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start();
  new Thread (() => { Thread.Sleep(3000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start();
}

class Test
{
  // The same LocalDataStoreSlot object can be used across all threads.
  LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
  
  // This property has a separate value on each thread.
  public int SecurityLevel
  {
    get
    {
      object data = Thread.GetData (_secSlot);
      return data == null ? 0 : (int) data;    // null == uninitialized
    }
    set 
    {
      Thread.SetData (_secSlot, value);
    }
  }
}

AsyncLocal

static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

async Task Main()
{
  Thread.CurrentThread.ManagedThreadId.Dump ("Current Thread ID");
  _asyncLocalTest.Value = "test";
  
  await Task.Delay (1000);
  
  Thread.CurrentThread.ManagedThreadId.Dump ("Current Thread ID");
  Console.WriteLine (_asyncLocalTest.Value);
}

AsyncLocal - concurrent

static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

void Main()
{
  new Thread (() => Test ("one")).Start();
  new Thread (() => Test ("two")).Start();
}

async void Test (string value)
{
  _asyncLocalTest.Value = value;
  await Task.Delay (1000);
  Console.WriteLine (value + " " + _asyncLocalTest.Value);
}

AsyncLocal - inherited value

static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

void Main()
{
  _asyncLocalTest.Value = "test";
  new Thread (AnotherMethod).Start();
}

void AnotherMethod() => Console.WriteLine (_asyncLocalTest.Value);  // test

AsyncLocal - inherited value - copy

static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

void Main()
{
  _asyncLocalTest.Value = "test";
  var t = new Thread (AnotherMethod);
  t.Start(); t.Join();
  Console.WriteLine (_asyncLocalTest.Value);   // test  (not ha-ha!)
}

void AnotherMethod() => _asyncLocalTest.Value = "ha-ha!";

AsyncLocal - inherited value - copy - limitation

static AsyncLocal<StringBuilder> _asyncLocalTest = new AsyncLocal<StringBuilder>();

void Main()
{
  _asyncLocalTest.Value = new StringBuilder ("test");
  var t = new Thread (AnotherMethod);
  t.Start(); t.Join();
  Console.WriteLine (_asyncLocalTest.Value.ToString());   // test haha!
}

void AnotherMethod() => _asyncLocalTest.Value.Append (" ha-ha!");
Timers

Multithreaded timers - Threading Timer

static void Main()
{
  // First interval = 5000ms; subsequent intervals = 1000ms
  Timer tmr = new Timer (Tick, "tick...", 5000, 1000);
  Console.WriteLine ("Press Enter to stop");
  Console.ReadLine();
  tmr.Dispose();         // This both stops the timer and cleans up.
}
  
static void Tick (object data)
{
  // This runs on a pooled thread
  Console.WriteLine (data);          // Writes "tick..."
}

Multithreaded timers - System.Timer

static void Main()
{
  var tmr = new System.Timers.Timer();       // Doesn't require any args
  tmr.Interval = 500;
  tmr.Elapsed += tmr_Elapsed;    // Uses an event instead of a delegate
  tmr.Start();                   // Start the timer
  Console.ReadLine();
  tmr.Stop();                    // Stop the timer
  Console.ReadLine();
  tmr.Start();                   // Restart the timer
  Console.ReadLine();
  tmr.Dispose();                 // Permanently stop the timer
}
  
static void tmr_Elapsed (object sender, EventArgs e)
{
  Console.WriteLine ("Tick");
}
EXTRA - Wait and Pulse

Signaling with Wait and Pulse

// See http://www.albahari.com/threading/part4.aspx ("Signaling with Wait and Pulse") for the accompanying text.

static readonly object _locker = new object();
static bool _go;

static void Main()
{                                // The new thread will block because _go==false.
  new Thread (Work).Start();   
  Console.WriteLine ("Press Enter to signal");
  Console.ReadLine();          // Wait for user to hit Enter

  lock (_locker)                 // Let's now wake up the thread by
  {                              // setting _go=true and pulsing.
    _go = true;
    Monitor.Pulse (_locker);
  }
}

static void Work()
{
  lock (_locker)
  while (!_go)
  Monitor.Wait (_locker);    // Lock is released while we’re waiting
  
  Console.WriteLine ("Woken!!!");
}

Now not to use Wait and Pulse

// Non-deterministic!

static readonly object _locker = new object();

static void Main()
{
  new Thread (Work).Start();
  lock (_locker) Monitor.Pulse (_locker);
}
 
static void Work()
{
  lock (_locker) Monitor.Wait (_locker);
  Console.WriteLine ("Woken!!!");
}

Producer-consumer queue

static void Main()
{
  PCQueue q = new PCQueue (2);
  
  Console.WriteLine ("Enqueuing 10 items...");
  
  for (int i = 0; i < 10; i++)
  {
    int itemNumber = i;      // To avoid the captured variable trap
    q.EnqueueItem (() =>
    {
      Thread.Sleep (1000);          // Simulate time-consuming work
      Console.Write (" Task" + itemNumber);
    });
  }
  
  q.Shutdown (true);
  Console.WriteLine();
  Console.WriteLine ("Workers complete!");
}

public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();
  
  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];
  
    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }
    
  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
    EnqueueItem (null);
    
    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }
  
  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }
  
  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

Two-way signaling and races

static readonly object _locker = new object();
static bool _go;

static void Main()
{
  new Thread (SaySomething).Start();
  
  for (int i = 0; i < 5; i++)
    lock (_locker) 
    {
      _go = true;
      Monitor.PulseAll (_locker); }
    }

static void SaySomething()
{
  for (int i = 0; i < 5; i++)
    lock (_locker)
    {
      while (!_go) Monitor.Wait (_locker);
      _go = false;
      Console.WriteLine ("Wassup?");
    }
}

Two-way signaling and races - solution

static readonly object _locker = new object();
static bool _ready, _go;

static void Main()
{
  new Thread (SaySomething).Start();
  
  for (int i = 0; i < 5; i++)
    lock (_locker)
    {
      while (!_ready) Monitor.Wait (_locker);
      _ready = false;
      _go = true;
      Monitor.PulseAll (_locker);
    }
}

static void SaySomething()
{
  for (int i = 0; i < 5; i++)
    lock (_locker)
    {
      _ready = true;
      Monitor.PulseAll (_locker);           // Remember that calling
      while (!_go) Monitor.Wait (_locker);  // Monitor.Wait releases
      _go = false;                          // and reacquires the lock.
      Console.WriteLine ("Wassup?");
    }
}

Simulating a ManualResetEvent

void Main()
{
  new Thread (() => { Thread.Sleep (2000); Set(); }).Start();
  Console.WriteLine ("Waiting...");
  WaitOne();
  Console.WriteLine ("Signaled");
}

readonly object _locker = new object();
bool _signal;
 
void WaitOne()
{
  lock (_locker)
  {
    while (!_signal) Monitor.Wait (_locker);
  }
}
 
void Set()
{
  lock (_locker) { _signal = true; Monitor.PulseAll (_locker); }
}
 
void Reset() { lock (_locker) _signal = false; }

Writing a CountdownEvent

void Main()
{
  var cd = new Countdown(5);
  new Thread (() =>
  {
    for (int i = 0; i < 5; i++)
    {
      Thread.Sleep(1000);
      cd.Signal();
      Console.WriteLine ("Signal " + i);
    }
  }).Start();
  Console.WriteLine ("Waiting");
  cd.Wait();
  Console.WriteLine ("Unblocked");
}

public class Countdown
{
  object _locker = new object ();
  int _value;
  
  public Countdown() { }
  public Countdown (int initialCount) { _value = initialCount; }
  
  public void Signal() { AddCount (-1); }
  
  public void AddCount (int amount)
  {
    lock (_locker) 
    { 
      _value += amount;
      if (_value <= 0) Monitor.PulseAll (_locker);
    }
  }
  
  public void Wait()
  {
    lock (_locker)
      while (_value > 0)
        Monitor.Wait (_locker);
  }
}

Thread rendezvous

static object _locker = new object();
 
static CountdownEvent _countdown = new CountdownEvent(2);

public static void Main()
{
  // Get each thread to sleep a random amount of time.
  Random r = new Random();
  new Thread (Mate).Start (r.Next (10000));
  Thread.Sleep (r.Next (10000));
  
  _countdown.Signal();
  _countdown.Wait();
  
  Console.Write ("Mate! ");
}

static void Mate (object delay)
{
  Thread.Sleep ((int) delay);
  
  _countdown.Signal();
  _countdown.Wait();
  
  Console.Write ("Mate! ");
}
C# 12 in a Nutshell
Buy from amazon.com Buy print or Kindle edition
Buy from ebooks.com Buy PDF edition
Buy from O'Reilly Read via O'Reilly subscription