Code Listings

Chapter 20: Asynchronous Methods

Download Asynchronator (Visual Studio Test Project)

Blocking server:

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);    // Refer Chapter 19
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream())
    {
      byte[] data = new byte [5000];

      int bytesRead = 0; int chunkSize = 1;
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead +=
          chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS

      Array.Reverse (data);
      n.Write (data, 0, data.Length);                      // BLOCKS
    }
  }
}

Non-blocking server:

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

namespace AsynchronousMethods.Nonblocking
{
  public class Server
  {
    public void Serve (IPAddress address, int port)
    {
      ThreadPool.SetMinThreads (50, 50);
      TcpListener listener = new TcpListener (address, port);
      listener.Start();
      while (true)
      {
        TcpClient c = listener.AcceptTcpClient();
        ThreadPool.QueueUserWorkItem (ReverseEcho, c);
      }
    }

    void ReverseEcho (object client)
    {
      new ReverseEcho().Begin ((TcpClient)client);
    }
  }

  class ReverseEcho
  {
    volatile TcpClient _client;
    volatile NetworkStream _stream;
    byte [] _data = new byte [5000];
    volatile int _bytesRead = 0;

    internal void Begin (TcpClient c)
    {
      try
      {
        _client = c;
        _stream = c.GetStream();
        Read();
      }
      catch (Exception ex) { ProcessException (ex); }
    }

    void Read()            // Read in a non-blocking fashion.
    {
      _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                        ReadCallback, null);
    }

    void ReadCallback (IAsyncResult r)
    {
      try
      {
        int chunkSize = _stream.EndRead (r);
        _bytesRead += chunkSize;
        if (chunkSize > 0 && _bytesRead < _data.Length)
        {
          Read();       // More data to read!
          return;
        }
        Array.Reverse (_data);
        _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
      }
      catch (Exception ex) { ProcessException (ex); }
    }

    void WriteCallback (IAsyncResult r)
    {
      try { _stream.EndWrite (r); }
      catch (Exception ex) { ProcessException (ex); }
      Cleanup();
    }

    void ProcessException (Exception ex)
    {
      Cleanup();
      Console.WriteLine ("Error: " + ex.Message);
    }

    void Cleanup()
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    }
  }
}

Writing asynchronous methods:

public class MessagingServices
{
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    var re = new ReverseEcho();
    re.Begin (client, callback, userState);
    return re;
  }

  public static byte [] EndReverseEcho (IAsyncResult r)
  {
    return ((ReverseEcho)r).End();
  }
}

class ReverseEcho : IAsyncResult
{
  volatile TcpClient _client;
  volatile NetworkStream _stream;
  volatile object _userState;
  volatile AsyncCallback _callback;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  volatile int _bytesRead = 0;
  byte [] _data = new byte [Program.MessageLength];
  volatile Exception _exception;

  internal ReverseEcho() { }

  // IAsyncResult members:

  public object AsyncState { get { return _userState; } }
  public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0, false); }
  }

  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _callback = callback;
    _userState = state;
    try
    {
      _stream = _client.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  internal byte [] End()     // Wait for completion + re-throw any error.
  {
    AsyncWaitHandle.WaitOne();
    AsyncWaitHandle.Close();
    if (_exception != null) throw _exception;
    return _data;
  }

  void Read()   // This is always called from an exception-handled method
  {
    _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                     ReadCallback, null);
  }

  void ReadCallback (IAsyncResult r)
  {
    try
    {
      int chunkSize = _stream.EndRead (r);
      _bytesRead += chunkSize;
      if (chunkSize > 0 && _bytesRead < _data.Length)
      {
        Read();       // More data to read!
        return;
      }
      Array.Reverse (_data);
      _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); return; }
    Cleanup();
  }

  void ProcessException (Exception ex)
  {
    _exception = ex;   // This will get re-thrown when the 
    Cleanup();        // consumer calls the End method.
  }

  void Cleanup()
  {
    try
    {
      if (_stream != null) _stream.Close();
    }
    catch (Exception ex)
    {
      if (_exception != null) _exception = ex;
    }
    // Signal that we're done and fire the callback.
    _waitHandle.Set();
    if (_callback != null) _callback (this);
  }
}

© 2007, O'Reilly Media, Inc. All rights reserved

C# 3.0 in a Nutshell
Buy from amazon.com Available now