Code Listings

Chapter 23: Asynchronous Methods

Download Asynchronator (Visual Studio Test Project)

Blocking sockets server

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

/// <summary>
/// Reverse-echo TCP server that services each client on a pooled thread
/// using blocking socket reads and writes.
/// </summary>
public class Server
{
  /// <summary>
  /// Starts listening on <paramref name="address"/>:<paramref name="port"/>
  /// and queues every accepted client to the thread pool. The accept loop
  /// never exits, so this method does not return normally.
  /// </summary>
  /// <param name="address">Local address to listen on.</param>
  /// <param name="port">Local port to listen on.</param>
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 21
    ThreadPool.SetMaxThreads (50, 50);    // Refer to Chapter 21
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  /// <summary>
  /// Thread-pool work item: fills a 5000-byte buffer from the client (or
  /// stops early when a read returns 0), reverses the buffer and writes
  /// all of it back. The client and its stream are disposed on exit.
  /// </summary>
  /// <param name="clientObject">The accepted <see cref="TcpClient"/>.</param>
  void Accept (object clientObject)
  {
    // An exception that escapes a thread-pool work item is unhandled and
    // terminates the process, so a failure on one connection must be
    // caught here rather than allowed to take the whole server down.
    try
    {
      using (TcpClient client = (TcpClient) clientObject)
      using (NetworkStream n = client.GetStream())
      {
        byte[] data = new byte [5000];

        int bytesRead = 0; int chunkSize = 1;
        while (bytesRead < data.Length && chunkSize > 0)
        {
          chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS
          bytesRead += chunkSize;
        }

        Array.Reverse (data);
        n.Write (data, 0, data.Length);                    // BLOCKS
      }
    }
    catch (Exception ex)
    {
      Console.WriteLine ("Error: " + ex.Message);
    }
  }
}

Non-blocking sockets server

/// <summary>
/// Accepts TCP clients and hands each one to a <c>ReverseEcho</c> instance,
/// which services it with asynchronous (Begin/End) stream calls.
/// </summary>
public class Server
{
  /// <summary>
  /// Listens on the given address and port and queues every accepted
  /// client to the thread pool. The loop has no exit condition.
  /// </summary>
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);

    var tcpListener = new TcpListener (address, port);
    tcpListener.Start();

    for (;;)
    {
      TcpClient accepted = tcpListener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, accepted);
    }
  }

  /// <summary>
  /// Thread-pool entry point: creates a new <c>ReverseEcho</c> and starts
  /// it on the supplied client (passed as <c>object</c> by the pool).
  /// </summary>
  void ReverseEcho (object client)
  {
    var session = new ReverseEcho();
    session.Begin ((TcpClient) client);
  }
}

// Services a single client with the Begin/End (APM) stream methods:
// reads into a 5000-byte buffer until it is full or a read returns 0,
// reverses the buffer, writes it back, then closes the connection.
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];   // Message buffer
  int _bytesRead = 0;               // Number of bytes received so far

  // Entry point: stores the client and its stream, then starts reading.
  // Any exception thrown while starting is passed to ProcessException.
  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void Read()            // Read in a nonblocking fashion.
  {
    // Keep issuing reads in a loop for as long as they complete
    // synchronously; this avoids recursing through the callback.
    while (true)
    {
      IAsyncResult r = _stream.BeginRead
       (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);

      // This will nearly always return in the next line:
      if (!r.CompletedSynchronously) return;   // Handled by callback
      if (!EndRead (r)) break;                 // Nothing more to read
    }
    Write();
  }

  // Completion callback for BeginRead. Does nothing when the read
  // completed synchronously, because the loop in Read() handles that case.
  void ReadCallback (IAsyncResult r)
  {
    try
    {
      if (r.CompletedSynchronously) return;
      if (EndRead (r))
      {
        Read();       // More data to read!
        return;
      }
      Write();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  // Completes a read and adds the chunk size to the running total.
  bool EndRead (IAsyncResult r)   // Returns false if there's no more data
  {
    int chunkSize = _stream.EndRead (r);
    _bytesRead += chunkSize;
    return chunkSize > 0 && _bytesRead < _data.Length;   // More to read
  }

  // Reverses the whole buffer and starts an asynchronous write of all of
  // it (the full _data.Length, regardless of _bytesRead).
  void Write()
  {
    Array.Reverse (_data);
    _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
  }

  // Completion callback for BeginWrite. Cleanup() runs whether or not
  // EndWrite throws; on failure ProcessException has already called
  // Cleanup() once, so it runs a second time here.
  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); }
    Cleanup();
  }

  // Central error handler: releases the connection, then reports the
  // exception message on the console.
  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }

  // Closes the stream and the client if they have been assigned.
  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}

Non-blocking server: with Tasks

/// <summary>
/// Accepts TCP clients and starts a task-based <c>ReverseEcho</c> for each
/// one directly from the accept loop.
/// </summary>
public class Server
{
  /// <summary>
  /// Listens on the given address and port; for every accepted client a
  /// new <c>ReverseEcho</c> is created and <c>BeginAsync</c> is called on
  /// it. The loop has no exit condition.
  /// </summary>
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);

    var tcpListener = new TcpListener (address, port);
    tcpListener.Start();

    for (;;)
    {
      TcpClient accepted = tcpListener.AcceptTcpClient();
      var session = new ReverseEcho();
      session.BeginAsync (accepted);
    }
  }
}

/// <summary>
/// Services a single client using tasks: reads into a 5000-byte buffer
/// until it is full or a read returns 0, reverses the buffer and writes it
/// back. Every read, continuation and write is attached to the root task
/// so that the root completes (or faults) only when the whole chain has.
/// </summary>
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];   // Message buffer
  int _bytesRead = 0;               // Number of bytes received so far

  /// <summary>
  /// Stores the client and its stream, starts the root task and wires up
  /// the error-reporting and cleanup continuations.
  /// </summary>
  /// <param name="c">The accepted client to service.</param>
  internal void BeginAsync (TcpClient c)
  {
    _client = c;
    _stream = c.GetStream();

    var task = Task.Factory.StartNew (Read);

    // Set up centralized error handling and cleanup:

    task.ContinueWith (ant =>
      Console.WriteLine ("Error: " + ant.Exception.Message),
      TaskContinuationOptions.OnlyOnFaulted);

    // No options given, so this continuation runs however the root ended.
    task.ContinueWith (ant =>
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    });
  }

  /// <summary>
  /// Starts one asynchronous read as a child task and schedules
  /// <see cref="Write"/> to run when it completes without faulting.
  /// </summary>
  void Read()    // This will create a child task.
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);

    // ContinueWith takes TaskContinuationOptions; flags from
    // TaskCreationOptions cannot be OR-ed into it, so the attach flag
    // must be the TaskContinuationOptions member.
    readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted
                                 | TaskContinuationOptions.AttachedToParent);
  }

  /// <summary>
  /// Continuation of a read: accumulates the chunk, then either issues
  /// another read or reverses the buffer and writes all of it back.
  /// </summary>
  /// <param name="readChunk">The completed read task.</param>
  void Write (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}

Async methods and iterators

/// <summary>
/// Reverse-echo server whose per-client logic is written as an iterator
/// that yields one task per asynchronous step.
/// </summary>
public class Server
{
  /// <summary>
  /// Listens on the given address and port and starts a
  /// <see cref="ReverseEcho"/> iteration for every accepted client,
  /// reporting a faulted iteration on the console. The loop has no exit
  /// condition.
  /// </summary>
  /// <param name="address">Local address to listen on.</param>
  /// <param name="port">Local port to listen on.</param>
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      // NOTE(review): Iterate is not defined in this listing; presumably
      // it is an extension method from the accompanying samples - confirm.
      Task.Factory.Iterate (ReverseEcho(c)).ContinueWith (t =>
        Console.WriteLine ("Error: " + t.Exception.Message),
        TaskContinuationOptions.OnlyOnFaulted);
    }
  }

  /// <summary>
  /// Yields a read task per chunk until the buffer is full or a read
  /// returns 0, then yields a write task that sends the received bytes
  /// back in reverse order. The client and stream are disposed when the
  /// iterator finishes or is disposed.
  /// </summary>
  /// <param name="client">The accepted client to service.</param>
  IEnumerable<Task> ReverseEcho (TcpClient client)
  {
    using (client)
    using (var stream = client.GetStream())
    {
      byte[] data = new byte[Program.MessageLength];
      int bytesRead = 0;
      while (true)
      {
        // ReadAsync is an extension method in the samples.
        Task<int> readChunk = stream.ReadAsync
          (data, bytesRead, data.Length - bytesRead);
        yield return readChunk;
        bytesRead += readChunk.Result;
        if (readChunk.Result <= 0 || bytesRead >= data.Length)
          break;
      }
      // Only the first bytesRead bytes are written back, so reverse just
      // that range; reversing the whole buffer would move the received
      // bytes to the tail and echo unreceived zero padding on a short read.
      Array.Reverse (data, 0, bytesRead);
      yield return stream.WriteAsync (data, 0, bytesRead);
    }
  }
}

Writing async methods

/// <summary>
/// Exposes the reverse-echo operation as a Begin/End method pair.
/// </summary>
public class MessagingServices
{
  /// <summary>
  /// Creates a <c>ReverseEcho</c>, starts it on <paramref name="client"/>
  /// and returns it as the operation's <see cref="IAsyncResult"/>.
  /// </summary>
  /// <param name="client">The client to service.</param>
  /// <param name="callback">Forwarded to the operation's <c>Begin</c>.</param>
  /// <param name="userState">Forwarded to the operation's <c>Begin</c>.</param>
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    ReverseEcho operation = new ReverseEcho();
    operation.Begin (client, callback, userState);
    return operation;
  }

  /// <summary>
  /// Casts <paramref name="r"/> back to the <c>ReverseEcho</c> returned by
  /// <see cref="BeginReverseEcho"/> and returns the result of its
  /// <c>End</c> method.
  /// </summary>
  public static byte[] EndReverseEcho (IAsyncResult r)
  {
    ReverseEcho operation = (ReverseEcho) r;
    return operation.End();
  }
}

/// <summary>
/// A reverse-echo operation that is its own <see cref="IAsyncResult"/>:
/// it reads into a 5000-byte buffer, reverses it, writes it back, then
/// signals completion through the wait handle and the optional callback.
/// </summary>
class ReverseEcho : IAsyncResult
{
  TcpClient _client;
  NetworkStream _stream;
  object _userState;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  int _bytesRead = 0;               // Number of bytes received so far
  byte[] _data = new byte [5000];   // Message buffer, returned by End()
  Exception _exception;             // First failure, rethrown by End()

  internal ReverseEcho() { }

  // IAsyncResult members:

  public object AsyncState { get { return _userState; } }
  public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted
  {
    // A zero-timeout wait polls the handle without blocking.
    get { return _waitHandle.WaitOne (0, false); }
  }

  /// <summary>
  /// Starts the operation. When the root task and every task attached to
  /// it have finished, the stream is closed, the wait handle is set and
  /// <paramref name="callback"/> (if any) is invoked with this instance.
  /// </summary>
  /// <param name="c">The client to service.</param>
  /// <param name="callback">Optional completion callback.</param>
  /// <param name="state">Value exposed through <see cref="AsyncState"/>.</param>
  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
    _stream = _client.GetStream();

    Task.Factory.StartNew (Read).ContinueWith (ant =>
    {
      _exception = ant.Exception;   // In case an exception occurred.

      if (_stream != null)
        try { _stream.Close(); }
        catch (Exception ex)
        {
          // Keep the operation's own failure if there is one; a Close
          // error is only reported when nothing failed earlier.
          if (_exception == null) _exception = ex;
        }

      _waitHandle.Set();

      if (callback != null) callback (this);
    });
  }

  /// <summary>
  /// Blocks until the operation has completed, rethrows any recorded
  /// exception, and otherwise returns the (reversed) buffer.
  /// </summary>
  internal byte[] End()     // Wait for completion + rethrow any error.
  {
    AsyncWaitHandle.WaitOne();
    if (_exception != null) throw _exception;
    return _data;
  }

  /// <summary>
  /// Starts one asynchronous read and schedules
  /// <see cref="ContinueRead"/> to run when it completes without faulting.
  /// </summary>
  void Read()
  {
    // The read is attached to the running task so that a faulted read
    // propagates to the root task and ends up in _exception.
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);

    readChunk.ContinueWith (ContinueRead,
                            TaskContinuationOptions.NotOnFaulted
                          | TaskContinuationOptions.AttachedToParent);
  }

  /// <summary>
  /// Continuation of a read: accumulates the chunk, then either issues
  /// another read or reverses the buffer and writes all of it back.
  /// </summary>
  /// <param name="readChunk">The completed read task.</param>
  void ContinueRead (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);

    // Attached so the root task - and therefore the stream close and the
    // completion signal in Begin - waits for the write to finish.
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}

 

© 2007-2012, Joe Albahari, Ben Albahari and O'Reilly Media, Inc. All rights reserved

C# 5.0 in a Nutshell
Buy from amazon.com Available now