Code Listings
Chapter 19: Threading
Getting started:
// Runs WriteY on a second thread while the main thread prints "x", so the
// two outputs can interleave on the console.
class ThreadTest
{
    static void Main()
    {
        // Hand WriteY to a new thread and set it running.
        Thread worker = new Thread (WriteY);
        worker.Start();

        // Meanwhile the main thread does its own printing.
        for (int remaining = 1000; remaining > 0; remaining--)
            Console.Write ("x");
    }

    // Body of the second thread: writes "y" 1000 times.
    static void WriteY()
    {
        for (int remaining = 1000; remaining > 0; remaining--)
            Console.Write ("y");
    }
}
Passing data to a thread:
// Passes an argument to a thread's entry method via Thread.Start (object).
static void Main()
{
    // The argument given to Start is handed to Print on the new thread.
    Thread worker = new Thread (Print);
    worker.Start ("Hello from t!");

    // The same method is also called directly on the main thread.
    Print ("Hello from the main thread!");
}

// Writes the message to the console. The parameter is typed object so the
// method can be used as a thread entry point; it is cast to string here.
static void Print (object messageObj)
{
    Console.WriteLine ((string) messageObj);
}
The outer variable trap:
// Demonstrates the captured-variable trap: both anonymous methods capture the
// same local variable 'text', not a copy of the value it had when the
// delegate was created.
static void Main()
{
string text = "t1";
Thread t1 = new Thread (delegate() { Print (text); });
// Reassigned before either thread is started, so t1's delegate sees it too.
text = "t2";
Thread t2 = new Thread (delegate() { Print (text); });
// Both delegates read 'text' only when they run, i.e. after it became "t2".
t1.Start();
t2.Start();
}
// Writes the message to the console on its own line.
static void Print (string message) { Console.WriteLine (message); }
Separate local variable stacks:
// Runs Go on two threads to show that each call gets its own copy of the
// local variable 'cycles'.
static void Main()
{
    Thread second = new Thread (Go);
    second.Start();      // Go() on a new thread
    Go();                // Go() on the main thread
}

// Writes the numbers 0 to 4 to the console using a local counter.
static void Go()
{
    int cycles = 0;
    while (cycles < 5)
    {
        Console.Write (cycles);
        cycles++;
    }
}
Sharing data through a common field:
// Shows two threads sharing data through the fields of one Introducer instance.
class ThreadTest
{
    static void Main()
    {
        Introducer shared = new Introducer();
        shared.Message = "Hello";

        // Run() executes on a new thread against the same instance.
        Thread runner = new Thread (shared.Run);
        runner.Start();

        // Wait for Enter, then print whatever Reply holds at that point.
        Console.ReadLine();
        string reply = shared.Reply;
        Console.WriteLine (reply);
    }
}
// Holds the data exchanged between the thread that creates it and the thread
// that executes Run().
class Introducer
{
// Text that Run() writes to the console.
public string Message;
// Assigned by Run(); null until Run() has assigned it.
public string Reply;
// Writes Message to the console, then stores a fixed reply in Reply.
public void Run()
{
Console.WriteLine (Message);
Reply = "Hi right back!";
}
}
Thread pooling:
// Queues Go on the thread pool twice: once without a state object and once
// with 123, then waits for Enter.
static void Main()
{
    WaitCallback work = Go;
    ThreadPool.QueueUserWorkItem (work);         // data will be null
    ThreadPool.QueueUserWorkItem (work, 123);    // data will be 123
    Console.ReadLine();
}

// Work item: writes a greeting followed by the state object (if any).
static void Go (object data)
{
    string greeting = "Hello from the thread pool! " + data;
    Console.WriteLine (greeting);
}
Optimizing the pool:
// Queues 50 CPU-bound work items on the thread pool.
static void Main()
{
    for (int i = 0; i < 50; i++) ThreadPool.QueueUserWorkItem (Go);

    // Pool threads are background threads, so the process would end as soon
    // as Main returned - possibly before the queued items have run. Keep the
    // process alive until the user presses Enter.
    Console.ReadLine();
}

// Work item: computes a hash on a 100,000 byte random byte sequence.
// The state argument is ignored and the hash itself is discarded.
static void Go (object notUsed)
{
    byte[] data = new byte [100000];
    new Random().NextBytes (data);

    // The hash algorithm is disposable; release it as soon as we're done.
    using (System.Security.Cryptography.SHA1 sha =
           System.Security.Cryptography.SHA1.Create())
    {
        sha.ComputeHash (data);
    }
}
Exception handling:
// Starts Go on a new thread. Main itself has no exception handling for Go;
// Go handles its own exceptions.
public static void Main()
{
new Thread (Go).Start();
}
// Thread entry point. The whole body sits inside try/catch so that an
// exception raised on this thread is handled on this thread.
static void Go()
{
try
{
...
throw null; // this exception will get caught below
...
}
catch (Exception ex)
{
// Typically log the exception, and/or signal another thread
// that we've come unstuck
...
}
}
Asynchronous delegates:
// Signature of the work to run asynchronously: takes text, returns an int.
delegate int WorkInvoker (string text);

// Starts Work ("test") through the delegate's BeginInvoke, then collects the
// result with EndInvoke and prints it.
static void Main()
{
    WorkInvoker worker = Work;

    // No completion callback and no state object are supplied.
    IAsyncResult pending = worker.BeginInvoke ("test", null, null);

    //
    // ... here's where we can do other work in parallel...
    //

    int length = worker.EndInvoke (pending);
    Console.WriteLine ("String length is: " + length);
}

// Returns the number of characters in s.
static int Work (string s)
{
    int length = s.Length;
    return length;
}
Asynchronous delegates and callbacks:
// Starts Work ("test") asynchronously and registers Done as the completion
// callback. The delegate is passed as the state object so that Done can
// call EndInvoke on it.
static void Main()
{
    WorkInvoker worker = Work;
    AsyncCallback whenFinished = Done;
    worker.BeginInvoke ("test", whenFinished, worker);
    // ...
    //
}

// Signature of the work to run asynchronously: takes text, returns an int.
delegate int WorkInvoker (string text);

// Returns the number of characters in s.
static int Work (string s)
{
    return s.Length;
}

// Completion callback: recovers the delegate from AsyncState, fetches the
// result with EndInvoke and prints it.
static void Done (IAsyncResult cookie)
{
    WorkInvoker worker = (WorkInvoker) cookie.AsyncState;
    int length = worker.EndInvoke (cookie);
    Console.WriteLine ("String length is: " + length);
}
SimpleThreadState:
// Reduces a ThreadState flags value to just the Unstarted, WaitSleepJoin and
// Stopped bits; every other flag is masked out.
public static ThreadState SimpleThreadState (ThreadState ts)
{
    const ThreadState keptFlags = ThreadState.Unstarted
                                | ThreadState.WaitSleepJoin
                                | ThreadState.Stopped;
    ThreadState simplified = ts & keptFlags;
    return simplified;
}
Basic locking:
// Guards val1 and val2 with a single lock so that the test of val2, the
// division and the reset of val2 happen as one unit.
class ThreadSafe
{
    static object locker = new object();
    static int val1, val2;

    // Prints val1 / val2 when val2 is non-zero, then resets val2 to zero.
    static void Go()
    {
        lock (locker)
        {
            if (val2 != 0)
            {
                int quotient = val1 / val2;
                Console.WriteLine (quotient);
            }
            val2 = 0;
        }
    }
}
Nested locking:
// Demonstrates nested locking: Main takes the lock on x and, while holding
// it, calls Nest, which locks x again.
static object x = new object();
static void Main()
{
lock (x)
{
Console.WriteLine ("I have the lock");
Nest();
Console.WriteLine ("I still have the lock");
}
// Now the lock is released.
}
// Called by Main from inside its own lock (x) block.
static void Nest()
{
lock (x) { }
// We still have the lock on x! (Only the inner lock statement has
// finished; Main's outer lock block has not exited yet.)
}
Using a Mutex to ensure only one instance of an application can run at once:
// Uses a named Mutex so that a second copy of the program detects the first
// and exits.
class OneAtATimePlease
{
    // Naming a Mutex makes it available computer-wide. Use a name that's
    // unique to your company and application (e.g., include your URL).
    static Mutex mutex = new Mutex (false, "oreilly.com OneAtATimeDemo");

    static void Main()
    {
        // Wait a few seconds if contended, in case another instance
        // of the program is still in the process of shutting down.
        TimeSpan patience = TimeSpan.FromSeconds (3);
        bool acquired = mutex.WaitOne (patience, false);

        if (acquired)
        {
            try
            {
                Console.WriteLine ("Running. Press Enter to exit");
                Console.ReadLine();
            }
            finally
            {
                // Always hand the mutex back once we own it.
                mutex.ReleaseMutex();
            }
        }
        else
        {
            Console.WriteLine ("Another instance of the app is running. Bye!");
        }
    }
}
Semaphore:
// Five threads compete for a semaphore that admits three at a time.
class TheClub      // No door lists!
{
    static Semaphore s = new Semaphore (3, 3);   // Available=3; Capacity=3

    static void Main()
    {
        // Start visitors 1 to 5, each with its number as the thread argument.
        for (int guest = 1; guest <= 5; guest++)
        {
            Thread visitor = new Thread (Enter);
            visitor.Start (guest);
        }
    }

    // Thread body: waits for a free slot, stays for 'id' seconds, then leaves.
    static void Enter (object id)
    {
        Console.WriteLine (id + " wants to enter");
        s.WaitOne();

        // Only three threads can be between WaitOne and Release at a time.
        Console.WriteLine (id + " is in!");
        int stayMilliseconds = 1000 * (int) id;
        Thread.Sleep (stayMilliseconds);
        Console.WriteLine (id + " is leaving");

        s.Release();
    }
}
Thread safety and .NET Framework types:
// Two threads add to one shared List<string>; every access to the list is
// made while holding a lock on the list object itself.
class ThreadSafe
{
    static List <string> list = new List <string>();

    static void Main()
    {
        // Start two threads that both run AddItems.
        for (int n = 0; n < 2; n++)
            new Thread (AddItems).Start();
    }

    // Adds 100 items to the shared list, then prints a snapshot of it.
    static void AddItems()
    {
        int added = 0;
        while (added < 100)
        {
            lock (list)
            {
                // Count is read and Add is called under the same lock.
                string label = "Item " + list.Count;
                list.Add (label);
            }
            added++;
        }

        // Copy under the lock, then enumerate the copy outside it.
        string[] snapshot;
        lock (list)
        {
            snapshot = list.ToArray();
        }

        foreach (string entry in snapshot)
            Console.WriteLine (entry);
    }
}
Thread safety in application servers:
// Cache of User objects keyed by id, held in a static dictionary. Every
// access to the dictionary is made while holding a lock on it.
static class UserCache
{
static Dictionary <int, User> _users = new Dictionary <int, User>();
// Returns the cached User for id; on a miss, retrieves it, stores it in the
// cache and returns it.
internal static User GetUser (int id)
{
User u = null;
// First lock: look the user up in the cache.
lock (_users)
if (_users.TryGetValue (id, out u))
return u;
// The lock is not held during retrieval, so two threads that miss on the
// same id can both call RetrieveUser; the later store overwrites the earlier.
u = RetrieveUser (id); // Method to retrieve from database;
// Second lock: store the result.
lock (_users) _users [id] = u;
return u;
}
}
Atomicity and Interlocked:
// Walks through the Interlocked operations on a 64-bit field. The trailing
// comment on each statement is the value of 'sum' after that statement runs.
class Program
{
static long sum;
static void Main()
{ // sum
// Simple increment/decrement operations:
Interlocked.Increment (ref sum); // 1
Interlocked.Decrement (ref sum); // 0
// Add/subtract a value:
Interlocked.Add (ref sum, 3); // 3
// Read a 64-bit field:
Console.WriteLine (Interlocked.Read (ref sum)); // 3
// Write a 64-bit field while reading previous value:
// (This prints "3" while updating sum to 10)
Console.WriteLine (Interlocked.Exchange (ref sum, 10)); // 10
// Update a field only if it matches a certain value (10):
// (sum is 10 at this point, so it is replaced with 123)
Interlocked.CompareExchange (ref sum, 123, 10); // 123
}
}
Memory barriers and volatility:
// Deliberately unsafe example: endIsNigh and repented are written by the main
// thread and read by the Wait thread with no lock, no volatile modifier and
// no memory barrier.
// NOTE(review): without one of those, presumably nothing guarantees that Wait
// ever observes endIsNigh == true, or that it observes repented == true when
// it does - confirm against the memory-model documentation.
class Unsafe
{
static bool endIsNigh, repented;
static void Main()
{
new Thread (Wait).Start(); // Start up the spinning waiter
Thread.Sleep (1000); // Give it a second to warm up!
// repented is written before endIsNigh.
repented = true;
endIsNigh = true;
}
// Spins until endIsNigh reads true, then writes the value of repented.
static void Wait()
{
while (!endIsNigh); // Spin until endIsNigh
Console.Write (repented);
}
}
Basic signaling with an event wait handle:
// One thread blocks on an AutoResetEvent until the main thread signals it.
class BasicWaitHandle
{
    // Created unsignaled, so the first WaitOne blocks until Set is called.
    static EventWaitHandle wh = new AutoResetEvent (false);

    static void Main()
    {
        Thread waiter = new Thread (Waiter);
        waiter.Start();

        // Pause for a second, then wake up the Waiter.
        Thread.Sleep (1000);
        wh.Set();
    }

    // Thread body: announces that it is waiting, blocks on the handle,
    // then reports that it was notified.
    static void Waiter()
    {
        Console.WriteLine ("Waiting...");
        wh.WaitOne();
        Console.WriteLine ("Notified");
    }
}
Two-way signaling with EventWaitHandle:
// Two threads hand control back and forth with a pair of auto-reset events:
// 'ready' is set by the worker, 'go' is set by the main thread.
class TwoWaySignaling
{
static EventWaitHandle ready = new AutoResetEvent (false);
static EventWaitHandle go = new AutoResetEvent (false);
static volatile string message; // We must either use volatile
// or lock around this field
static void Main()
{
new Thread (Work).Start();
// Each round below: wait for 'ready', assign 'message', then set 'go'.
ready.WaitOne(); // First wait until worker is ready
message = "ooo";
go.Set(); // Tell worker to go!
ready.WaitOne();
message = "ahhh"; // Give the worker another message
go.Set();
ready.WaitOne();
message = null; // Signal the worker to exit
go.Set();
}
// Worker loop: sets 'ready', waits for 'go', then prints the current message.
// A null message ends the loop.
static void Work()
{
while (true)
{
ready.Set(); // Indicate that we're ready
go.WaitOne(); // Wait to be kicked off...
if (message == null) return; // Gracefully exit
Console.WriteLine (message);
}
}
}
Pooling wait handles:
// Registers a callback with the thread pool that runs when a wait handle is
// signaled, instead of dedicating a blocked thread to the wait.
class Test
{
    static ManualResetEvent starter = new ManualResetEvent (false);

    public static void Main()
    {
        // Arguments: handle to wait on, callback, state passed to the
        // callback, timeout in milliseconds (Timeout.Infinite is -1), and
        // whether the callback should run only once.
        ThreadPool.RegisterWaitForSingleObject (
            starter, Go, "hello", Timeout.Infinite, true);

        Thread.Sleep (5000);
        Console.WriteLine ("Signaling worker...");
        starter.Set();

        Console.ReadLine();
    }

    // Callback: 'data' is the state object given at registration; 'timedOut'
    // is not examined here.
    public static void Go (object data, bool timedOut)
    {
        string announcement = "Started " + data;
        Console.WriteLine (announcement);
        // Perform task...
    }
}
Signaling with Wait and Pulse:
// Signaling with Monitor.Wait and Monitor.PulseAll: the worker waits on
// 'locker' until the 'go' flag becomes true.
class SimpleWaitPulse
{
    static object locker = new object();
    static bool go;

    static void Main()
    {
        // The new thread will block because go == false.
        Thread worker = new Thread (Work);
        worker.Start();

        // Wait for the user to hit Enter.
        Console.ReadLine();

        // Wake the worker: set the flag and pulse, both under the lock.
        lock (locker)
        {
            go = true;
            Monitor.PulseAll (locker);
        }
    }

    // Thread body: waits under the lock until 'go' is true, then (after
    // leaving the lock) reports that it was woken.
    static void Work()
    {
        lock (locker)
        {
            while (!go)
            {
                Monitor.Wait (locker);
            }
        }

        Console.WriteLine ("Woken!!!");
    }
}
Producer/Consumer queue:
using System;
using System.Threading;
using System.Collections.Generic;
// A producer/consumer queue. Callers enqueue string tasks; a fixed set of
// worker threads dequeue and process them. A null task is the exit marker
// that makes the worker which dequeues it return.
public class TaskQueue : IDisposable
{
    object locker = new object();
    Thread[] workers;
    Queue<string> taskQ = new Queue<string>();

    // Creates the queue and starts 'workerCount' consumer threads.
    public TaskQueue (int workerCount)
    {
        workers = new Thread [workerCount];

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
            (workers [i] = new Thread (Consume)).Start();
    }

    // Shuts the workers down and waits for them: every task enqueued before
    // this call is processed before Dispose returns.
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        foreach (Thread worker in workers) EnqueueTask (null);

        // Block until every worker has drained its share of the queue and
        // returned. Without this, Dispose would return while tasks were
        // still being processed.
        foreach (Thread worker in workers) worker.Join();
    }

    // Adds a task to the queue and wakes one waiting worker.
    // A null task tells one worker to exit.
    public void EnqueueTask (string task)
    {
        lock (locker)
        {
            taskQ.Enqueue (task);           // We must pulse because we're
            Monitor.Pulse (locker);         // changing a blocking condition.
        }
    }

    // Worker loop: waits for a task, dequeues it under the lock, then
    // processes it outside the lock.
    void Consume()
    {
        while (true)                        // Keep consuming until
        {                                   // told otherwise
            string task;
            lock (locker)
            {
                while (taskQ.Count == 0) Monitor.Wait (locker);
                task = taskQ.Dequeue();
            }
            if (task == null) return;       // This signals our exit
            Console.Write (task);           // Perform task.
            Thread.Sleep (1000);            // Simulate time-consuming task
        }
    }
}
// Creates a TaskQueue with two workers, enqueues ten tasks, and disposes the
// queue on leaving the using block.
static void Main()
{
    using (TaskQueue queue = new TaskQueue (2))
    {
        int next = 0;
        while (next < 10)
        {
            queue.EnqueueTask (" Task" + next);
            next++;
        }

        Console.WriteLine ("Enqueued 10 tasks");
        Console.WriteLine ("Waiting for tasks to complete...");
    }

    // Leaving the using block has called TaskQueue.Dispose, which queues one
    // null exit marker per worker behind the tasks enqueued above.
    Console.WriteLine ("\r\nAll tasks done!");
}
Two-way signaling with Wait/Pulse:
// Two-way signaling with Monitor.Wait/PulseAll and two flags guarded by one
// lock: 'ready' is set by SaySomething, 'go' is set by Main. Each side runs
// five rounds.
class Solved
{
static object locker = new object();
static bool ready, go;
static void Main()
{
new Thread (SaySomething).Start();
for (int i = 0; i < 5; i++)
lock (locker)
{
// Wait until the other thread has set 'ready', then consume that flag,
// set 'go' and wake any waiter.
while (!ready) Monitor.Wait (locker);
ready = false;
go = true;
Monitor.PulseAll (locker);
}
}
// Thread body: each round sets 'ready', waits for 'go', consumes that flag
// and prints a line.
static void SaySomething()
{
for (int i = 0; i < 5; i++)
lock (locker)
{
ready = true;
Monitor.PulseAll (locker); // Remember that calling
while (!go) Monitor.Wait (locker); // Monitor.Wait releases
go = false; // and reacquires the lock.
Console.WriteLine ("Wassup?");
}
}
}
Interrupt:
// Starts a thread that sleeps with no timeout, then interrupts it.
static void Main()
{
    // Thread body: sleep indefinitely; if the sleep is ended by an
    // interrupt, write "Forcibly " before the final "Woken!".
    ThreadStart sleeper = delegate()
    {
        try
        {
            Thread.Sleep (Timeout.Infinite);
        }
        catch (ThreadInterruptedException)
        {
            Console.Write ("Forcibly ");
        }

        Console.WriteLine ("Woken!");
    };

    Thread t = new Thread (sleeper);
    t.Start();
    t.Interrupt();
}
Safe cancellation:
// Cancellation by flag: the main thread asks the worker to stop by setting a
// flag; the worker polls the flag and aborts its own thread when it is set.
class ProLife
{
public static void Main()
{
RulyWorker w = new RulyWorker();
Thread t = new Thread (w.Work);
t.Start();
Thread.Sleep (1000);
Console.WriteLine ("aborting");
// Abort() only sets the flag and returns, so "aborted" below is printed once
// the request has been made, not necessarily after the worker has stopped.
w.Abort(); // Safely abort the worker.
Console.WriteLine ("aborted");
}
public class RulyWorker
{
// Written by Abort() on the caller's thread, read by CheckAbort() on the
// worker's thread.
volatile bool abort;
// Requests cancellation by setting the flag.
public void Abort() { abort = true; }
// Worker loop: polls the flag at the top of every iteration.
public void Work()
{
while (true)
{
CheckAbort();
// Do stuff...
try { OtherMethod(); }
finally { /* any required cleanup */ }
}
}
// Stand-in for further work; polls the flag as well.
void OtherMethod()
{
// Do stuff...
CheckAbort();
}
// Aborts the calling thread if cancellation has been requested.
// NOTE(review): Thread.Abort is reportedly unsupported on .NET Core / .NET 5+
// - confirm against the target runtime.
void CheckAbort() { if (abort) Thread.CurrentThread.Abort(); }
}
}
Local storage:
// Stores a per-thread value in a named data slot and exposes it through a
// property. (The rest of the class is elided in this listing.)
class Test
{
// The same LocalDataStoreSlot object can be used across all threads.
LocalDataStoreSlot secSlot = Thread.GetNamedDataSlot ("securityLevel");
// This property has a separate value on each thread.
int SecurityLevel
{
get
{
// Reads the slot's value via Thread.GetData; 0 is returned when nothing
// has been stored.
object data = Thread.GetData (secSlot);
return data == null ? 0 : (int) data; // null == uninitialized
}
// Stores the value in the slot via Thread.SetData.
set { Thread.SetData (secSlot, value); }
}
...
Using BackgroundWorker:
using System;
using System.Threading;
using System.ComponentModel;
// Runs a job on a BackgroundWorker with progress reporting and cancellation.
class Program
{
    static BackgroundWorker bw;

    static void Main()
    {
        bw = new BackgroundWorker();

        // Both features must be switched on before they can be used.
        bw.WorkerReportsProgress = true;
        bw.WorkerSupportsCancellation = true;

        bw.DoWork += new DoWorkEventHandler (bw_DoWork);
        bw.ProgressChanged += new ProgressChangedEventHandler (bw_ProgressChanged);
        bw.RunWorkerCompleted += new RunWorkerCompletedEventHandler (bw_RunWorkerCompleted);

        bw.RunWorkerAsync ("Hello to worker");

        Console.WriteLine ("Press Enter in the next 5 seconds to cancel");
        Console.ReadLine();

        // Only request cancellation if the worker is still running.
        if (bw.IsBusy)
        {
            bw.CancelAsync();
        }

        Console.ReadLine();
    }

    // The job itself: reports progress 0, 20, ... 100, pausing a second after
    // each report, and gives up as soon as cancellation is pending.
    static void bw_DoWork (object sender, DoWorkEventArgs e)
    {
        int percent = 0;
        while (percent <= 100)
        {
            if (bw.CancellationPending)
            {
                e.Cancel = true;
                return;
            }

            bw.ReportProgress (percent);
            Thread.Sleep (1000);     // Just for the demo... don't go sleeping
                                     // for real in pooled threads!
            percent += 20;
        }

        e.Result = 123;              // This gets passed to RunWorkerCompleted
    }

    // Reports how the job ended: cancelled, failed with an exception, or
    // completed with the result set by bw_DoWork.
    static void bw_RunWorkerCompleted (object sender,
                                       RunWorkerCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
            Console.WriteLine ("You cancelled!");
            return;
        }

        if (e.Error != null)
        {
            Console.WriteLine ("Worker exception: " + e.Error.ToString());
            return;
        }

        Console.WriteLine ("Complete: " + e.Result);      // from DoWork
    }

    // Prints each progress report.
    static void bw_ProgressChanged (object sender,
                                    ProgressChangedEventArgs e)
    {
        int percent = e.ProgressPercentage;
        Console.WriteLine ("Reached " + percent + "%");
    }
}
ReaderWriterLockSlim:
// Three reader threads and two writer threads share a List<int> protected by
// a ReaderWriterLockSlim. All threads loop forever.
class SlimDemo
{
    static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();
    static List<int> items = new List<int>();
    static Random rand = new Random();

    static void Main()
    {
        new Thread (Read).Start();
        new Thread (Read).Start();
        new Thread (Read).Start();

        new Thread (Write).Start ("A");
        new Thread (Write).Start ("B");
    }

    // Reader loop: enumerates the list under the read lock, sleeping 10ms
    // per item.
    static void Read()
    {
        while (true)
        {
            rw.EnterReadLock();
            try
            {
                foreach (int i in items) Thread.Sleep (10);
            }
            finally
            {
                // Release even if the body throws, so the lock is never
                // left held by a thread that has stopped using it.
                rw.ExitReadLock();
            }
        }
    }

    // Writer loop: adds a random number under the write lock, reports it,
    // then sleeps 100ms. 'threadID' is only used in the report.
    static void Write (object threadID)
    {
        while (true)
        {
            int newNumber = GetRandNum (100);

            rw.EnterWriteLock();
            try
            {
                items.Add (newNumber);
            }
            finally
            {
                rw.ExitWriteLock();
            }

            Console.WriteLine ("Thread " + threadID + " added " + newNumber);
            Thread.Sleep (100);
        }
    }

    // Returns a random number below 'max'. The shared Random instance is
    // used under a lock because two writer threads call this method.
    static int GetRandNum (int max) { lock (rand) return rand.Next (max); }
}
Upgradeable locks with ReaderWriterLockSlim:
// Variant of the reader/writer demo in which writers take an upgradeable
// read lock to check for duplicates, and upgrade to a write lock only when
// the number actually has to be added. All threads loop forever.
class SlimDemo
{
    static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();
    static List<int> items = new List<int>();
    static Random rand = new Random();

    static void Main()
    {
        new Thread (Read).Start();
        new Thread (Read).Start();
        new Thread (Read).Start();

        new Thread (Write).Start ("A");
        new Thread (Write).Start ("B");
    }

    // Reader loop: enumerates the list under the read lock, sleeping 10ms
    // per item.
    static void Read()
    {
        while (true)
        {
            rw.EnterReadLock();
            try
            {
                foreach (int i in items) Thread.Sleep (10);
            }
            finally
            {
                // Release even if the body throws.
                rw.ExitReadLock();
            }
        }
    }

    // Writer loop: adds a random number if the list does not contain it yet,
    // reports the addition, then sleeps 100ms. 'threadID' is only used in
    // the report.
    static void Write (object threadID)
    {
        while (true)
        {
            int newNumber = GetRandNum (100);

            rw.EnterUpgradeableReadLock();
            try
            {
                if (!items.Contains (newNumber))
                {
                    // Upgrade to a write lock just for the modification.
                    rw.EnterWriteLock();
                    try
                    {
                        items.Add (newNumber);
                    }
                    finally
                    {
                        rw.ExitWriteLock();
                    }
                    Console.WriteLine ("Thread " + threadID + " added " + newNumber);
                }
            }
            finally
            {
                // Release even if the body throws.
                rw.ExitUpgradeableReadLock();
            }

            Thread.Sleep (100);
        }
    }

    // Returns a random number below 'max'. The shared Random instance is
    // used under a lock because two writer threads call this method.
    static int GetRandNum (int max) { lock (rand) return rand.Next (max); }
}
Lock recursion:
// A ReaderWriterLockSlim created with the default constructor does not allow
// recursive locking: the second EnterReadLock on the same thread throws.
var rw = new ReaderWriterLockSlim();
rw.EnterReadLock();
rw.EnterReadLock();        // Exception thrown
rw.ExitReadLock();
rw.ExitReadLock();
// With LockRecursionPolicy.SupportsRecursion a thread that holds the write
// lock may also enter the read lock; both are then reported as held.
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterWriteLock();
rw.EnterReadLock();
Console.WriteLine (rw.IsReadLockHeld);     // True
Console.WriteLine (rw.IsWriteLockHeld);    // True
rw.ExitReadLock();
rw.ExitWriteLock();
Using the Threading Timer:
using System;
using System.Threading;
// Uses System.Threading.Timer to call Tick repeatedly until Enter is pressed.
class Program
{
    static void Main()
    {
        // First interval = 5000ms; subsequent intervals = 1000ms
        const int firstDelayMs = 5000;
        const int repeatMs = 1000;

        Timer ticker = new Timer (Tick, "tick...", firstDelayMs, repeatMs);

        Console.ReadLine();
        ticker.Dispose();         // Ends the timer
    }

    // Timer callback: 'data' is the state object given to the Timer
    // constructor ("tick..." here).
    static void Tick (object data)
    {
        // This runs on a pooled thread
        Console.WriteLine (data);
    }
}
Using the System Timer:
using System;
using System.Timers; // Timers namespace rather than Threading
// Uses System.Timers.Timer: started, stopped, restarted and finally disposed,
// with an Enter key press between each step.
class SystemTimer
{
    static void Main()
    {
        // Doesn't require any constructor args; configured through members.
        Timer tmr = new Timer();
        tmr.Interval = 500;

        // Uses an event instead of a delegate
        tmr.Elapsed += new ElapsedEventHandler (tmr_Elapsed);

        tmr.Start();              // Start the timer
        Console.ReadLine();

        tmr.Stop();               // Stop the timer
        Console.ReadLine();

        tmr.Start();              // Re-start the timer
        Console.ReadLine();

        tmr.Dispose();            // Permanently stop the timer
    }

    // Elapsed handler: writes "Tick" each time the timer fires.
    static void tmr_Elapsed (object sender, EventArgs e)
    {
        Console.WriteLine ("Tick");
    }
}
© 2007, O'Reilly Media, Inc. All rights reserved