Code Listings

Chapter 19: Threading

Getting started:

class ThreadTest
{
  static void Main()
  {
    Thread t = new Thread (WriteY);          // Kick off a new thread
    t.Start();                               // running WriteY()

    // Simultaneously, do something on the main thread.
    for (int i = 0; i < 1000; i++) Console.Write ("x");
  }

  static void WriteY()
  {
    for (int i = 0; i < 1000; i++) Console.Write ("y");
  }
}

Passing data to a thread:

static void Main()
{
  Thread t = new Thread (Print);
  t.Start ("Hello from t!");
  Print ("Hello from the main thread!");
}

static void Print (object messageObj)
{
  string message = (string) messageObj;
  Console.WriteLine (message);
}

The outer variable trap:

static void Main()
{
  string text = "t1";
  Thread t1 = new Thread (delegate() { Print (text); });

  text = "t2";
  Thread t2 = new Thread (delegate() { Print (text); });

  t1.Start();
  t2.Start();
}

static void Print (string message) { Console.WriteLine (message); }

Separate local variable stacks:

static void Main()
{
  new Thread (Go).Start();      // Call Go() on a new thread
  Go();                         // Call Go() on the main thread
}
 
static void Go()
{
  // Declare and use a local variable - 'cycles'
  for (int cycles = 0; cycles < 5; cycles++) Console.Write (cycles);
}

Sharing data through a common field:

class ThreadTest
{
  static void Main()
  {
    Introducer intro = new Introducer();
    intro.Message = "Hello";

    new Thread (intro.Run).Start();

    Console.ReadLine();
    Console.WriteLine (intro.Reply);
  }
}

class Introducer
{
  public string Message;
  public string Reply;

  public void Run()
  {
    Console.WriteLine (Message);
    Reply = "Hi right back!";
  }
}

Thread pooling:

static void Main()
{
  ThreadPool.QueueUserWorkItem (Go);
  ThreadPool.QueueUserWorkItem (Go, 123);
  Console.ReadLine();
}

static void Go (object data)
{
  Console.WriteLine ("Hello from the thread pool! " + data);
}

Optimizing the pool:

static void Main()
{
  for (int i = 0; i < 50; i++) ThreadPool.QueueUserWorkItem (Go);
}

static void Go (object notUsed)
{
  // Compute a hash on a 100,000 byte random byte sequence:
  byte[] data = new byte [100000];
  new Random().NextBytes (data);
  System.Security.Cryptography.SHA1.Create().ComputeHash (data);
}

Exception handling:

public static void Main()
{
   new Thread (Go).Start();
}
 
static void Go()
{
  try
  {
    ...
    throw null;      // this exception will get caught below
    ...
  }
  catch (Exception ex)
  {
    // Typically log the exception, and/or signal another thread
    // that we've come unstuck
    ...
  }
}

Asynchronous delegates:

delegate int WorkInvoker (string text);

static void Main()
{
  WorkInvoker method = Work;
  IAsyncResult cookie = method.BeginInvoke ("test", null, null);
  //
  // ... here's where we can do other work in parallel...
  //
  int result = method.EndInvoke (cookie);
  Console.WriteLine ("String length is: " + result);
}

static int Work (string s) { return s.Length; }

Asynchronous delegates and callbacks:

static void Main()
{
  WorkInvoker method = Work;
  method.BeginInvoke ("test", Done, method);
  // ... 
  //
}

delegate int WorkInvoker (string text);

static int Work (string s) { return s.Length; }

static void Done (IAsyncResult cookie)
{
  WorkInvoker method = (WorkInvoker) cookie.AsyncState;
  int result = method.EndInvoke (cookie);
  Console.WriteLine ("String length is: " + result);
}

SimpleThreadState:

public static ThreadState SimpleThreadState (ThreadState ts)
{
  return ts & (ThreadState.Unstarted |
               ThreadState.WaitSleepJoin |
               ThreadState.Stopped);
}

Basic locking:

class ThreadSafe
{
  static object locker = new object();
  static int val1, val2;
 
  static void Go()
  {
    lock (locker)
    {
      if (val2 != 0) Console.WriteLine (val1 / val2);
      val2 = 0;
    }
  }
}

Nested locking:

static object x = new object();
 
static void Main()
{
  lock (x)
  {
     Console.WriteLine ("I have the lock");
     Nest();
     Console.WriteLine ("I still have the lock");
  }
  // Now the lock is released.
}
 
static void Nest()
{
  lock (x) { }
  // We still have the lock on x!
}

Using a Mutex to ensure only once instance of an application can run at once:

class OneAtATimePlease
{
  // Naming a Mutex makes it available computer-wide. Use a name that's
  // unique to your company and application (e.g., include your URL).

  static Mutex mutex = new Mutex (false, "oreilly.com OneAtATimeDemo");
  
  static void Main()
  {
    // Wait a few seconds if contended, in case another instance
    // of the program is still in the process of shutting down.
 
    if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
    {
      Console.WriteLine ("Another instance of the app is running. Bye!");
      return;
    }
    try
    {
      Console.WriteLine ("Running. Press Enter to exit");
      Console.ReadLine();
    }
    finally { mutex.ReleaseMutex(); }
  }
}

Semaphore:

class TheClub      // No door lists!
{
  static Semaphore s = new Semaphore (3, 3);   // Available=3; Capacity=3

  static void Main()
  {
    for (int i = 1; i <= 5; i++) new Thread (Enter).Start (i);
  }

  static void Enter (object id)
  {
    Console.WriteLine (id + " wants to enter");
    s.WaitOne();
    Console.WriteLine (id + " is in!");           // Only three threads
    Thread.Sleep (1000 * (int) id);               // can be here at
    Console.WriteLine (id + " is leaving");       // a time.
    s.Release();
  }
}

Thread safety and .NET Framework types:

class ThreadSafe
{
  static List <string> list = new List <string>();
 
  static void Main()
  {
    new Thread (AddItems).Start();
    new Thread (AddItems).Start();
  }
 
  static void AddItems()
  {
    for (int i = 0; i < 100; i++)
      lock (list)
        list.Add ("Item " + list.Count);
 
    string[] items;
    lock (list) items = list.ToArray();
    foreach (string s in items) Console.WriteLine (s);
  }
}

Thread safety in application servers:

static class UserCache
{
  static Dictionary <int, User> _users = new Dictionary <int, User>();

  internal static User GetUser (int id)
  {
    User u = null;

    lock (_users)
      if (_users.TryGetValue (id, out u))
        return u;

    u = RetrieveUser (id);           // Method to retrieve from database;
    lock (_users) _users [id] = u;
    return u;
  }
}

Atomicity and Interlocked:

class Program
{
  static long sum;
 
  static void Main()
  {                                                               // sum 
    // Simple increment/decrement operations:
    Interlocked.Increment (ref sum);                              // 1
    Interlocked.Decrement (ref sum);                              // 0
 
    // Add/subtract a value:
    Interlocked.Add (ref sum, 3);                                 // 3
 
    // Read a 64-bit field:
    Console.WriteLine (Interlocked.Read (ref sum));               // 3
 
    // Write a 64-bit field while reading previous value:
    // (This prints "3" while updating sum to 10)
    Console.WriteLine (Interlocked.Exchange (ref sum, 10));       // 10
 
    // Update a field only if it matches a certain value (10):
    Interlocked.CompareExchange (ref sum, 123, 10);               // 123
  }
}

Memory barriers and volatility:

class Unsafe
{
  static bool endIsNigh, repented;
 
  static void Main()
  {
    new Thread (Wait).Start();        // Start up the spinning waiter
    Thread.Sleep (1000);              // Give it a second to warm up!

    repented = true;
    endIsNigh = true;
  }
  
  static void Wait()
  {
    while (!endIsNigh);               // Spin until endIsNigh
    Console.Write (repented);
  }
}

Basic signaling with an event wait handle:

class BasicWaitHandle
{
  static EventWaitHandle wh = new AutoResetEvent (false);
 
  static void Main()
  {
    new Thread (Waiter).Start();
    Thread.Sleep (1000);                  // Pause for a second...
    wh.Set();                             // Wake up the Waiter.
  }
 
  static void Waiter()
  {
    Console.WriteLine ("Waiting...");
    wh.WaitOne();                        // Wait for notification
    Console.WriteLine ("Notified");
  }
}

Two-way signaling with EventWaitHandle:

class TwoWaySignaling
{
  static EventWaitHandle ready = new AutoResetEvent (false);
  static EventWaitHandle go = new AutoResetEvent (false);

  static volatile string message;         // We must either use volatile
                                          // or lock around this field
  static void Main()
  {
    new Thread (Work).Start();
 
    ready.WaitOne();            // First wait until worker is ready
    message = "ooo";
    go.Set();                   // Tell worker to go!

    ready.WaitOne();                
    message = "ahhh";           // Give the worker another message
    go.Set();

    ready.WaitOne();
    message = null;             // Signal the worker to exit
    go.Set();
  }
 
  static void Work()
  {
    while (true)
    {
      ready.Set();                          // Indicate that we're ready
      go.WaitOne();                         // Wait to be kicked off...
      if (message == null) return;          // Gracefully exit
      Console.WriteLine (message);
    }
  }
}

Pooling wait handles:

class Test
{
  static ManualResetEvent starter = new ManualResetEvent (false);
 
  public static void Main()
  {
    ThreadPool.RegisterWaitForSingleObject (starter, Go, "hello", -1, true);
    Thread.Sleep (5000);
    Console.WriteLine ("Signaling worker...");
    starter.Set();
    Console.ReadLine();
  }
 
  public static void Go (object data, bool timedOut)
  {
    Console.WriteLine ("Started " + data);
    // Perform task...
  }
}

Signaling with Wait and Pulse:

class SimpleWaitPulse
{
  static object locker = new object();
  static bool go;
 
  static void Main()
  {                                // The new thread will block
    new Thread (Work).Start();     // because go==false.

    Console.ReadLine();            // Wait for user to hit Enter

    lock (locker)                  // Let's now wake up the thread by        
    {                              // setting go=true and pulsing.
      go = true;
      Monitor.PulseAll (locker);
    }
  }
 
  static void Work()
  {
    lock (locker)
      while (!go)
        Monitor.Wait (locker);

    Console.WriteLine ("Woken!!!");
  }
}

Producer/Consumer queue:

using System;
using System.Threading;
using System.Collections.Generic;

public class TaskQueue : IDisposable
{
  object locker = new object();
  Thread[] workers;
  Queue<string> taskQ = new Queue<string>();

  public TaskQueue (int workerCount)
  {
    workers = new Thread [workerCount];

    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (workers [i] = new Thread (Consume)).Start();
  }

  public void Dispose()
  {
    // Enqueue one null task per worker to make each exit.
    foreach (Thread worker in workers) EnqueueTask (null);
  }

  public void EnqueueTask (string task)
  {
    lock (locker)
    {
      taskQ.Enqueue (task);            // We must pulse because we're
      Monitor.Pulse (locker);          // changing a blocking condition.
    }
  }

  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise
      string task;
      lock (locker)
      {
        while (taskQ.Count == 0) Monitor.Wait (locker);
        task = taskQ.Dequeue();
      }
      if (task == null) return;         // This signals our exit
      Console.Write (task);             // Perform task.
      Thread.Sleep (1000);              // Simulate time-consuming task
    }
  }
}
static void Main()
{
  using (TaskQueue q = new TaskQueue (2))
  {
    for (int i = 0; i < 10; i++)
    q.EnqueueTask (" Task" + i);
 
    Console.WriteLine ("Enqueued 10 tasks");
    Console.WriteLine ("Waiting for tasks to complete...");
  }

  // Exiting the using statement runs TaskQueue's Dispose method, which
  // shuts down the consumers, after all outstanding tasks are completed.

  Console.WriteLine ("\r\nAll tasks done!");
}

Two-way signaling with Wait/Pulse:

class Solved
{
  static object locker = new object();
  static bool ready, go;

  static void Main()
  {
    new Thread (SaySomething).Start();

    for (int i = 0; i < 5; i++)
      lock (locker)
      {
        while (!ready) Monitor.Wait (locker);
        ready = false;
        go = true;
        Monitor.PulseAll (locker);
      }
  }

  static void SaySomething()
  {
    for (int i = 0; i < 5; i++)
      lock (locker)
      {
        ready = true;
        Monitor.PulseAll (locker);              // Remember that calling
        while (!go) Monitor.Wait (locker);      // Monitor.Wait releases
        go = false;                             // and reacquires the lock.
        Console.WriteLine ("Wassup?");
      }
  }
}

Interrupt:

static void Main()
{
  Thread t = new Thread (delegate()
  {
    try
    {
      Thread.Sleep (Timeout.Infinite);
    }
    catch (ThreadInterruptedException)
    {
      Console.Write ("Forcibly ");
    }
    Console.WriteLine ("Woken!");
  });
 
  t.Start();
  t.Interrupt();
}

Safe cancellation:

class ProLife
{
  public static void Main()
  {
    RulyWorker w = new RulyWorker();
    Thread t = new Thread (w.Work);
    t.Start();
    Thread.Sleep (1000);

    Console.WriteLine ("aborting");
    w.Abort();                       // Safely abort the worker.
    Console.WriteLine ("aborted");
  }
 
  public class RulyWorker
  {
    volatile bool abort;   
    public void Abort() { abort = true; }
 
    public void Work()
    {
      while (true)
      {
        CheckAbort();
        // Do stuff...
        try      { OtherMethod(); }
        finally  { /* any required cleanup */ }
      }
    }
 
    void OtherMethod()
    {
      // Do stuff...
      CheckAbort();
    }
 
    void CheckAbort() { if (abort) Thread.CurrentThread.Abort(); }
  }
}

Local storage:

class Test
{
  // The same LocalDataStoreSlot object can be used across all threads.
  LocalDataStoreSlot secSlot = Thread.GetNamedDataSlot ("securityLevel");
 
  // This property has a separate value on each thread.
  int SecurityLevel
  {
    get
    {
      object data = Thread.GetData (secSlot);
      return data == null ? 0 : (int) data;    // null == uninitialized
    }
    set { Thread.SetData (secSlot, value); }
  }
  ...

Using BackgroundWorker:

using System;
using System.Threading;
using System.ComponentModel;
 
class Program
{
  static BackgroundWorker bw;

  static void Main()
  {
    bw = new BackgroundWorker();
    bw.WorkerReportsProgress = true;
    bw.WorkerSupportsCancellation = true;
    bw.DoWork += bw_DoWork;
    bw.ProgressChanged += bw_ProgressChanged;
    bw.RunWorkerCompleted += bw_RunWorkerCompleted;
 
    bw.RunWorkerAsync ("Hello to worker");
    
    Console.WriteLine ("Press Enter in the next 5 seconds to cancel");
    Console.ReadLine();
    if (bw.IsBusy) bw.CancelAsync();
    Console.ReadLine();
  }
 
  static void bw_DoWork (object sender, DoWorkEventArgs e)
  {
    for (int i = 0; i <= 100; i += 20)
    {
      if (bw.CancellationPending) { e.Cancel = true; return; }
      bw.ReportProgress (i);
      Thread.Sleep (1000);      // Just for the demo... don't go sleeping
    }                           // for real in pooled threads!
                                  
    e.Result = 123;    // This gets passed to RunWorkerCompleted
  }
 
  static void bw_RunWorkerCompleted (object sender,
                                     RunWorkerCompletedEventArgs e)
  {
    if (e.Cancelled)
      Console.WriteLine ("You cancelled!");
    else if (e.Error != null)
      Console.WriteLine ("Worker exception: " + e.Error.ToString());
    else
      Console.WriteLine ("Complete: " + e.Result);      // from DoWork
  }
 
  static void bw_ProgressChanged (object sender,
                                  ProgressChangedEventArgs e)
  {
    Console.WriteLine ("Reached " + e.ProgressPercentage + "%");
  }
}

ReaderWriterLockSlim:

class SlimDemo
{
  static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();
  static List<int> items = new List<int>();
  static Random rand = new Random();

  static void Main()
  {
    new Thread (Read).Start();
    new Thread (Read).Start();
    new Thread (Read).Start();

    new Thread (Write).Start ("A");
    new Thread (Write).Start ("B");
  }

  static void Read()
  {
    while (true)
    {
      rw.EnterReadLock();
      foreach (int i in items) Thread.Sleep (10);
      rw.ExitReadLock();
    }
  }

  static void Write (object threadID)
  {
    while (true)
    {				
      int newNumber = GetRandNum (100);
      rw.EnterWriteLock();
      items.Add (newNumber);
      rw.ExitWriteLock();
      Console.WriteLine ("Thread " + threadID + " added " + newNumber);
      Thread.Sleep (100);
    }
  }

  static int GetRandNum (int max) { lock (rand) return rand.Next (max); }
}

Upgradeable locks with ReaderWriterLockSlim:

class SlimDemo
{
  static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();
  static List<int> items = new List<int>();
  static Random rand = new Random();

  static void Main()
  {
    new Thread (Read).Start();
    new Thread (Read).Start();
    new Thread (Read).Start();

    new Thread (Write).Start ("A");
    new Thread (Write).Start ("B");
  }

  static void Read()
  {
    while (true)
    {
      rw.EnterReadLock();
      foreach (int i in items) Thread.Sleep (10);
      rw.ExitReadLock();
    }
  }

  static void Write (object threadID)
  {
    while (true)
    {
      int newNumber = GetRandNum (100);
      rw.EnterUpgradeableReadLock();
      if (!items.Contains (newNumber))
      {
        rw.EnterWriteLock();
        items.Add (newNumber);
        rw.ExitWriteLock();
        Console.WriteLine ("Thread " + threadID + " added " + newNumber);
      }
      rw.ExitUpgradeableReadLock();
      Thread.Sleep (100);
    }
  }

  static int GetRandNum (int max) { lock (rand) return rand.Next (max); }
}

Lock recursion:

var rw = new ReaderWriterLockSlim();

rw.EnterReadLock();
rw.EnterReadLock();        // Exception thrown
rw.ExitReadLock();
rw.ExitReadLock();
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);

rw.EnterWriteLock();
rw.EnterReadLock();
Console.WriteLine (rw.IsReadLockHeld);     // True
Console.WriteLine (rw.IsWriteLockHeld);    // True
rw.ExitReadLock();
rw.ExitWriteLock();

Using the Threading Timer:

using System;
using System.Threading;
 
class Program
{
  static void Main()
  {
    // First interval = 5000ms; subsequent intervals = 1000ms
    Timer tmr = new Timer (Tick, "tick...", 5000, 1000);
    Console.ReadLine();
    tmr.Dispose();                     // Ends the timer
  }
 
  static void Tick (object data)
  {
    // This runs on a pooled thread
    Console.WriteLine (data);          // Writes "tick..."
  }
}

Using the System Timer:

using System;
using System.Timers;   // Timers namespace rather than Threading
 
class SystemTimer
{
  static void Main()
  {
    Timer tmr = new Timer();       // Doesn't require any args
    tmr.Interval = 500;
    tmr.Elapsed += tmr_Elapsed;    // Uses an event instead of a delegate
    tmr.Start();                   // Start the timer
    Console.ReadLine();
    tmr.Stop();                    // Stop the timer
    Console.ReadLine();
    tmr.Start();                   // Re-start the timer
    Console.ReadLine();
    tmr.Dispose();                 // Permanently stop the timer
  }
 
  static void tmr_Elapsed (object sender, EventArgs e)
  {
    Console.WriteLine ("Tick");
  }
}

© 2007, O'Reilly Media, Inc. All rights reserved

C# 3.0 in a Nutshell
Buy from amazon.com Available now