Code Listings
Chapter 20: Asynchronous Methods
Download Asynchronator (Visual Studio Test Project)
Blocking server:
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
// Blocking TCP server. Every accepted connection is handed to a thread-pool
// thread, which reads into a 5000-byte buffer using blocking calls, reverses
// the buffer and writes it back to the client.
public class Server
{
  // Starts listening on address:port and dispatches each accepted client to
  // the thread pool. The accept loop has no exit condition, so this method
  // only ends if an exception is thrown.
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);     // See Chapter 19
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  // Thread-pool work item: services one client. clientObject must be the
  // TcpClient passed to QueueUserWorkItem by Serve.
  void Accept (object clientObject)
  {
    // An exception that escapes a thread-pool work item is unhandled and
    // takes the whole process down, so a failure on one connection (for
    // example a reset while reading) is caught and reported here instead.
    try
    {
      using (TcpClient client = (TcpClient) clientObject)
      using (NetworkStream n = client.GetStream())
      {
        byte[] data = new byte [5000];
        int bytesRead = 0;
        int chunkSize = 1;

        // Keep reading until the buffer is full or Read returns 0.
        while (bytesRead < data.Length && chunkSize > 0)
        {
          chunkSize = n.Read (data, bytesRead, data.Length - bytesRead);   // BLOCKS
          bytesRead += chunkSize;
        }

        // The whole buffer is reversed and sent, regardless of bytesRead.
        Array.Reverse (data);
        n.Write (data, 0, data.Length);                                    // BLOCKS
      }
    }
    catch (Exception ex)
    {
      Console.WriteLine ("Error: " + ex.Message);
    }
  }
}
Non-blocking server:
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
namespace AsynchronousMethods.Nonblocking
{
// Front end of the non-blocking server: accepts connections and hands each
// one to a ReverseEcho object, which does its stream I/O through the
// Begin/End asynchronous calls.
public class Server
{
  // Listens on address:port forever, queuing one thread-pool work item per
  // accepted client.
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);

    TcpListener tcpListener = new TcpListener (address, port);
    tcpListener.Start();

    for (;;)
    {
      // AcceptTcpClient blocks until a client connects.
      TcpClient accepted = tcpListener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, accepted);
    }
  }

  // Work item body: creates a fresh ReverseEcho for the queued TcpClient
  // and starts it.
  void ReverseEcho (object client)
  {
    var session = new ReverseEcho();
    session.Begin ((TcpClient) client);
  }
}
// Handles one client of the non-blocking server: reads into a 5000-byte
// buffer with BeginRead/EndRead, reverses the buffer, writes it back with
// BeginWrite/EndWrite, then closes the stream and the client.
class ReverseEcho
{
  volatile TcpClient _client;
  volatile NetworkStream _stream;
  byte [] _data = new byte [5000];
  volatile int _bytesRead = 0;        // Number of bytes of _data filled so far.

  // Stores the client, obtains its stream and issues the first read.
  // Any exception is reported through ProcessException rather than thrown.
  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  // Issues an asynchronous read for the unfilled remainder of the buffer.
  // Callers are responsible for exception handling.
  void Read()
  {
    _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                       ReadCallback, null);
  }

  // Completion callback for BeginRead.
  void ReadCallback (IAsyncResult r)
  {
    try
    {
      int chunkSize = _stream.EndRead (r);
      _bytesRead += chunkSize;
      if (chunkSize > 0 && _bytesRead < _data.Length)
      {
        Read();                       // More data to read!
        return;
      }
      // Buffer is full, or the last read returned 0 bytes: reverse the
      // whole buffer and send it back.
      Array.Reverse (_data);
      _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  // Completion callback for BeginWrite.
  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    // ProcessException already performs the cleanup, so return here
    // instead of falling through and cleaning up a second time.
    catch (Exception ex) { ProcessException (ex); return; }
    Cleanup();
  }

  // Releases the connection and writes the error message to the console.
  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }

  // Closes the stream and the client if they have been assigned.
  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}
}
Writing asynchronous methods:
// Public Begin/End entry points for the reverse-echo operation. The
// IAsyncResult handed back to the caller is the ReverseEcho object itself.
public class MessagingServices
{
  // Creates a ReverseEcho, starts it on the given client and returns it as
  // the operation's IAsyncResult. callback and userState are passed through
  // to ReverseEcho.Begin unchanged.
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    ReverseEcho operation = new ReverseEcho();
    operation.Begin (client, callback, userState);
    return operation;
  }

  // Completes an operation started by BeginReverseEcho and returns the
  // result of ReverseEcho.End. r is cast directly to ReverseEcho, so any
  // other IAsyncResult implementation causes an InvalidCastException.
  public static byte [] EndReverseEcho (IAsyncResult r)
  {
    ReverseEcho operation = (ReverseEcho) r;
    return operation.End();
  }
}
// A reverse-echo operation that is its own IAsyncResult. It reads
// Program.MessageLength bytes from the client's stream asynchronously,
// reverses the buffer and writes it back. On completion (success or failure)
// it closes the stream, sets the wait handle and invokes the caller's callback.
class ReverseEcho : IAsyncResult
{
  volatile TcpClient _client;
  volatile NetworkStream _stream;
  volatile object _userState;
  volatile AsyncCallback _callback;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  volatile int _bytesRead = 0;        // Number of bytes of _data filled so far.
  byte [] _data = new byte [Program.MessageLength];
  volatile Exception _exception;      // First error seen; re-thrown by End().

  internal ReverseEcho() { }

  // IAsyncResult members:

  public object AsyncState           { get { return _userState;  } }
  public WaitHandle AsyncWaitHandle  { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false;       } }

  // True once the wait handle has been set. Polls with a zero timeout, so
  // it never blocks.
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0, false); }
  }

  // Records the caller's arguments, obtains the client's stream and issues
  // the first read. A failure here is captured through ProcessException
  // (which completes the operation) instead of being thrown to the caller.
  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _callback = callback;
    _userState = state;
    try
    {
      _stream = _client.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  // Blocks until the operation has completed, closes the wait handle, then
  // re-throws any captured error or returns the reversed buffer.
  // NOTE: "throw _exception" re-throws the stored object, so the stack trace
  // seen by the caller starts here rather than at the original failure.
  internal byte [] End()
  {
    AsyncWaitHandle.WaitOne();
    AsyncWaitHandle.Close();
    if (_exception != null) throw _exception;
    return _data;
  }

  // Issues an asynchronous read for the unfilled remainder of the buffer.
  // This is always called from an exception-handled method.
  void Read()
  {
    _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                       ReadCallback, null);
  }

  // Completion callback for BeginRead.
  void ReadCallback (IAsyncResult r)
  {
    try
    {
      int chunkSize = _stream.EndRead (r);
      _bytesRead += chunkSize;
      if (chunkSize > 0 && _bytesRead < _data.Length)
      {
        Read();                       // More data to read!
        return;
      }
      // Buffer is full, or the last read returned 0 bytes: reverse the
      // whole buffer and send it back.
      Array.Reverse (_data);
      _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  // Completion callback for BeginWrite.
  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    // ProcessException performs the cleanup itself, hence the return.
    catch (Exception ex) { ProcessException (ex); return; }
    Cleanup();
  }

  // Stores the error so that it gets re-thrown when the consumer calls the
  // End method, then completes the operation.
  void ProcessException (Exception ex)
  {
    _exception = ex;
    Cleanup();
  }

  // Closes the stream, then signals completion and fires the callback.
  // Only the stream is closed here; _client itself is not closed.
  void Cleanup()
  {
    try
    {
      if (_stream != null) _stream.Close();
    }
    catch (Exception ex)
    {
      // Report a close failure only when no earlier error was captured, so
      // the root cause is never overwritten and a close error on an
      // otherwise successful operation is not silently lost.
      if (_exception == null) _exception = ex;
    }
    // Signal that we're done and fire the callback.
    _waitHandle.Set();
    if (_callback != null) _callback (this);
  }
}
© 2007, O'Reilly Media, Inc. All rights reserved