Chapter 8. Threading and Concurrency Support

8.1. Introduction

The purpose of the Spring.Threading namespace is to provide a place to keep useful concurrency abstractions that augment those in the BCL. Since Doug Lea has provided a wealth of mature public domain concurrency abstractions in his Java based 'EDU.oswego.cs.dl.util.concurrent' libraries we decided to port a few of his abstractions to .NET. So far, we've only ported three classes, the minimum necessary to provide basic object pooling functionality to support an AOP based pooling aspect and to provide a Semaphore class that was mistakenly not included in .NET 1.0/1.1.

There is also an important abstraction, IThreadStorage, for performing thread local storage.

8.2. Thread Local Storage

Depending on your runtime environment there are different strategies to use for storing objects in thread local storage. If you are in web applications a single Request may be executed on different threads. As such, the location to store thread local objects is in HttpContext.Current. For other environments System.Runtime.Remoting.Messaging.CallContext is used. For more background information on the motivation behind these choices, say as compared the the attribute [ThreadStatic] refer to "Piers7"'s blog and this forum post. The interface IThreadStorage serves as the basis for the thread local storage abstraction and various implementations can be selected from depending on your runtime requirements. Configuring the implementation of IThreadStorage makes it easier to have more portability across runtime environments.

The API is quite simple and shown below

public interface IThreadStorage
{
  object GetData(string name)

  void SetData(string name, object value)

  void FreeNamedDataSlot(string name)

}

The methods GetData and SetData are responsible for retrieving and setting the object that is to be bound to thread local storage and associating it with a name. Clearing the thread local storage is done via the method FreeNamedDataSlot.

In Spring.Core is the implementation, CallContextStorage, that directly uses CallContext and also the implementation LogicalThreadContext whic by default uses CallContextStorage but can be configured via the static method SetStorage(IThreadStorage). The methods on CallContextStorage and LogicalThreadContext are static.

In Spring.Web is the implementation HttpContextStorage which uses the HttpContext to store thread local data and HybridContextStorage that uses HttpContext if within a web environment, i.e. HttpContext.Current != null, and CallContext otherwise.

8.3. Synchronization Primitives

When you take a look at these synchronization classes, you'll wonder why it's even necessary when System.Threading provides plenty of synchronization options. Although System.Threading provides great synchronization classes, it doesn't provide well-factored abstractions and interfaces for us. Without these abstractions, we will tend to code at a low-level. With enough experience, you'll eventually come up with some abstractions that work well. Doug Lea has already done a lot of that research and has a class library that we can take advantage of.

8.3.1. ISync

ISync is the central interface for all classes that control access to resources from multiple threads. It's a simple interface which has two basic use cases. The first case is to block indefinitely until a condition is met:

void ConcurrentRun(ISync lock) {
  lock.Acquire(); // block until condition met
  try {
    // ... access shared resources
  }
  finally {
    lock.Release();
  }
}

The other case is to specify a maximum amount of time to block before the condition is met:

void ImpatientConcurrentRun(ISync lock) {
  // block for at most 10 milliseconds for condition
  if ( lock.Attempt(10) ) {
    try {
      // ... access shared resources
    }
    finally {
      lock.Release();
    }
  } else {
    // complain of time out
  }
}

8.3.2. SyncHolder

The SyncHolder class implements the System.IDisposable interface and so provides a way to use an ISync with the using C# keyword: the ISync will be automatically Acquired and then Released on exiting from the block.

This should simplify the programming model for code using (!) an ISync:

ISync sync = ...
...
using (new SyncHolder(sync))
  {
    // ... code to be executed
    // holding the ISync lock
  }
      

There is surely the timed version, a little more cumbersome as you must deal with timeouts:

ISync sync = ...
long msecs = 100;
...
// try to acquire the ISync for msecs milliseconds
try 
{
  using (new SyncHolder(sync, msecs))
  {
    // ... code to be executed
    // holding the ISync lock
  }
}    
catch (TimeoutException)
{
  // deal with failed lock acquisition
}

      

8.3.3. Latch

The Latch class implements the ISync interface and provides an implementation of a latch. A latch is a boolean condition that is set at most once, ever. Once a single release is issued, all acquires will pass. It is similar to a ManualResetEvent initialized unsignalled (Reset) and can only be Set(). A typical use is to act as a start signal for a group of worker threads.

class Boss {
  Latch _startPermit;

  void Worker() {
    // very slow worker initialization ...
    // ... attach to messaging system
    // ... connect to database
    _startPermit.Acquire();
    // ... use resources initialized in Mush
    // ... do real work
  }

  void Mush() {
    _startPermit = new Latch();
    for (int i=0; i<10; ++i) {
      new Thread(new ThreadStart(Worker)).Start();
    }
    // very slow main initialization ...
    // ... parse configuration
    // ... initialize other resources used by workers
    _startPermit.Release();
  }

}

8.3.4. Semaphore

The Semaphore class implements the ISync interface and provides an implementation of a semaphore. Conceptually, a semaphore maintains a set of permits. Each Acquire() blocks if necessary until a permit is available, and then takes it. Each Release() adds a permit. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly. A typical use is to control access to a pool of shared objects.

class LimitedConcurrentUploader {
  // ensure we don't exceed maxUpload simultaneous uploads
  Semaphore _available;
  public LimitedConcurrentUploader(maxUploads) {
    _available = new Semaphore(maxUploads);
  }
  // no matter how many threads call this method no more
  // than maxUploads concurrent uploads will occur.
  public Upload(IDataTransfer upload) {
    _available.Acquire();
    try {
      upload.TransferData();
    }
    finally {
      _available.Release();
    }
  }
}