Monday, November 26, 2012

RabbitMQ On Windows With .NET, A Case Study

Any reader of this blog will know that my big project over the last year has been to create a simple .NET API for RabbitMQ called EasyNetQ. I’ve been working as a software architect at 15Below for the last year and a half. The prime motivation for writing EasyNetQ was so that our developers would have an easy API for working with RabbitMQ on .NET. I was very fortunate that our founder and technical director, John Clynes, supported my wish to build it as an open source library. I originally wrote this post for the VMware blog as a case study of running RabbitMQ in a Microsoft environment.
15Below is based in Brighton on the south coast of England, famous for its Regency pavilion and Victorian pier. We provide messaging and integration services for the travel industry. Our clients include Ryanair, Qantas, JetBlue, Thomas Cook and around 30 other airline and rail customers. We send hundreds of millions of transactional notifications each year to our customers’ passengers.
RabbitMQ has helped us to significantly simplify and stabilise our software. It’s one of those black boxes that you install, configure, and then really don’t have to worry about. In over a year of production we’ve found it to be extremely stable and reliable.
Prior to introducing RabbitMQ our applications would use SQL Server as a queuing mechanism. Each task would be represented by a row in a workflow table. Each process in the workflow would poll the table looking for rows that matched its status, process the rows in a batch, and then update the rows’ status field for the next process to pick up. Each step in the process would be hosted by an application service that implemented its own threading model, often using a different approach to all the other services. This created highly coupled software, with workflow steps and infrastructure concerns, such as threading and load balancing, mixed together with business logic. We also discovered that a relational database is not a natural fit for a queuing system. The contention on the workflow tables is high, with constant inserts, selects and updates causing locking issues. Deleting completed items is also problematic on highly indexed tables and we had considerable problems with continuously growing tables.
I wrote about the ‘Database As Queue Anti-Pattern’ in a previous post in more detail.
RabbitMQ provides a number of features that helped us overcome these problems. Firstly it is designed from the beginning as a high-performance messaging platform. It easily outperformed our SQL Server based solution with none of its locking or deletion problems. Rabbit’s event-oriented messaging model also takes away much of the need for complex multi-threaded batch processing code that was previously a cause of instability in our systems.
We first introduced RabbitMQ about 18 months ago as the core infrastructure behind our Flight Status product. We wanted a high performance messaging product with a proven track record that supported common messaging patterns, such as publish/subscribe and request/response. A further requirement was that it should provide automatic work distribution and load balancing.
The need to support messaging patterns ruled out simple store-and-forward queues such as MSMQ and ActiveMQ. We were very impressed by ZeroMQ, but decided that we really needed the centralised manageability of a broker based product. This left RabbitMQ. Although support for AMQP, an open messaging standard, wasn’t in our list of requirements, its implementation by RabbitMQ made us more confident that we were choosing a sustainable strategy.
We are very much a Microsoft shop, so we had some initial concerns about RabbitMQ’s performance and stability on Windows. We were reassured by reading some accounts of RabbitMQ’s and indeed Erlang’s use on Windows by organisations with some very impressive load requirements. Subsequent experience has borne these reports out, and we have found RabbitMQ on Server 2008 to be rock solid.
As a Microsoft shop, our development platform is .NET. Although VMware provide an AMQP C# client, it is a low-level API, not suitable for use by more junior developers. For this reason we created our own high-level .NET API for RabbitMQ that provides simple single-method access to common messaging patterns and does not require a deep knowledge of AMQP. This API is called EasyNetQ. We’ve open sourced it and, with over 3000 downloads, it is now the leading high-level API for RabbitMQ with .NET. You can find more information about it at EasyNetQ.com. We would recommend looking at it if you are a .NET shop using RabbitMQ.
15Below’s Flight-Status product provides real-time flight information to passengers and their family and friends. We interface with the airline’s real-time flight information stream, generated from their operational systems, and provide a platform that allows them to apply complex business logic to this stream. We render customer-tailored output, and communicate with the airline’s customers via a range of channels, including email, SMS, voice and iPhone/Android push. RabbitMQ allows us to build each piece (the client for the flight information stream, the message renderer, the sink channels and the business logic) as separate components that communicate using structured messages. Our architecture looks something like this:
[Figure: RabbitMQ-based architecture diagram]
The green boxes are our core product systems, the blue boxes represent custom code that we write for each customer. A ‘customer saga’ is code that models a long-running business process and includes all the workflow logic for a particular customer’s flight information requirements. A ‘core product service’ is an independent service that implements a feature of our product. An example would be the service that takes flight information and combines it with a customer defined template to create an email to be sent to a passenger. Constructing services as independently deployable and runnable applications gives us great flexibility and scalability. If we need to scale up a particular component, we simply install more copies. RabbitMQ’s automatic work sharing feature means that we can do this without any reconfiguration of existing components. This architecture also makes it easy to test each application service in isolation since it’s simply a question of firing messages at the service and watching its response.
In conclusion, RabbitMQ has provided a rock solid piece of infrastructure with the features to allow us to significantly reduce the architectural complexity of our systems. We can now build software for our clients faster and more reliably. It scales to higher loads than our previous relational-database based systems and is more flexible in the face of changing customer requirements.

Friday, November 23, 2012

Using BlockingCollection To Communicate Between Threads

Consider these (somewhat) common programming challenges:

  • I’m using a third party library that is not thread safe, but I want my application to share work between multiple threads. How do I marshal calls between my multi-threaded code to the single threaded library?
  • I have a single source of events on a single thread. How do I share the work between a pool of multiple threads?
  • I have multiple threads emitting events. How do I consume them on a single thread?

One way of doing this would be to have some shared state, a field or a property on a static class, and wrap locks around it so that multiple threads can access it safely. This is a pretty common way of trying to skin this particular cat, but it’s shot through with traps for the unwary. Also, it can hurt performance because access to the shared resource is serialized, even though the things accessing it are running in parallel.

A better way is to use a BlockingCollection and have your threads communicate via message classes.

BlockingCollection is a class in the System.Collections.Concurrent namespace that arrived with .NET 4.0. By default it wraps a ConcurrentQueue, although you can swap this for a ConcurrentStack or a ConcurrentBag if you want. You push objects in at one end and sit in a loop consuming them from the other. The (multiple) producer(s) and (multiple) consumer(s) can be running on different threads without any explicit locks in your own code. That’s OK because the collection classes in the Concurrent namespace are guaranteed to be thread safe. The ‘blocking’ part of the name is there because the consuming end blocks until an object is available. Justin Etheredge has an excellent post that looks at BlockingCollection in more detail here.
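
To make the point about swapping the underlying collection concrete, here is a minimal single-threaded sketch. The class and method names (UnderlyingCollectionDemo, FillAndDrain) are mine, made up for illustration. It adds the same three numbers to a default BlockingCollection and to one backed by a ConcurrentStack, then drains each to show the order the items come out in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class UnderlyingCollectionDemo
{
    // Adds 1, 2, 3 to the collection, marks it complete, then drains it.
    public static List<int> FillAndDrain(BlockingCollection<int> collection)
    {
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);
        collection.CompleteAdding(); // no more items, so the consuming loop can finish

        var drained = new List<int>();
        foreach (var item in collection.GetConsumingEnumerable())
        {
            drained.Add(item);
        }
        return drained;
    }

    public static void Main()
    {
        // Default constructor: backed by a ConcurrentQueue, so first in, first out.
        var fifo = FillAndDrain(new BlockingCollection<int>());
        Console.WriteLine("queue: " + string.Join(", ", fifo)); // queue: 1, 2, 3

        // Backed by a ConcurrentStack: last in, first out.
        var lifo = FillAndDrain(new BlockingCollection<int>(new ConcurrentStack<int>()));
        Console.WriteLine("stack: " + string.Join(", ", lifo)); // stack: 3, 2, 1
    }
}
```

A ConcurrentBag makes no ordering promise at all, so only pick it when the order of consumption really doesn’t matter.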

For an example, let’s implement a parallel pipeline. A ventilator produces tasks to be processed in parallel, a set of workers process the tasks on separate threads, and a sink collects the results back together again. It shows both one-to-many and many-to-one thread communication. I’ve stolen the idea and the diagram from the excellent ZeroMQ Guide:

[Figure: parallel workers diagram from the ZeroMQ Guide, showing a ventilator, a set of workers and a sink]

First we’ll need a class that represents a piece of work. We’ll keep it super simple for this example:

public class WorkItem
{
    public string Text { get; set; }
}

We’ll need two BlockingCollections, one to take the tasks from the ventilator to the workers, and another to take the finished work from the workers to the sink:

var ventilatorQueue = new BlockingCollection<WorkItem>();
var sinkQueue = new BlockingCollection<WorkItem>();

Now let’s write our ventilator:

public static void StartVentilator(BlockingCollection<WorkItem> ventilatorQueue)
{
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            ventilatorQueue.Add(new WorkItem { Text = string.Format("Item {0}", i) });
        }
    }, TaskCreationOptions.LongRunning);
}

It just iterates 100 times creating work items and pushing them on the ventilatorQueue.

Here is a worker:

public static void StartWorker(int workerNumber,
    BlockingCollection<WorkItem> ventilatorQueue,
    BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in ventilatorQueue.GetConsumingEnumerable())
        {
            // pretend to take some time to process
            Thread.Sleep(30);
            workItem.Text = workItem.Text + " processed by worker " + workerNumber;
            sinkQueue.Add(workItem);
        }
    }, TaskCreationOptions.LongRunning);
}

BlockingCollection provides a GetConsumingEnumerable method that yields each item in turn. It blocks if there are no items on the queue. Note that I’m not worrying about shutdown patterns in this code. In production code you’d need to worry about how to close down your worker threads.
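
One common way to handle shutdown, sketched here under my own assumptions rather than taken from the example project, is to call CompleteAdding on the queue once the producer has finished. GetConsumingEnumerable then ends each worker’s foreach loop as soon as the queue is empty, and the caller can wait on the worker tasks. The names ShutdownDemo and Run are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ShutdownDemo
{
    // Runs a producer and several workers, and returns how many items
    // were processed once every worker has left its loop.
    public static int Run(int itemCount, int workerCount)
    {
        var queue = new BlockingCollection<int>();
        var results = new ConcurrentBag<int>();

        // Each worker's foreach ends once the queue is marked complete and is empty.
        var workers = Enumerable.Range(0, workerCount)
            .Select(n => Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    results.Add(item * 2);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }

        // Signal that no more items will arrive.
        queue.CompleteAdding();

        // Wait for the workers to drain the queue and exit.
        Task.WaitAll(workers);
        return results.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} items", Run(100, 3)); // Processed 100 items
    }
}
```

In a pipeline like the one in this post, the same idea would chain: once all the workers have finished, something would call CompleteAdding on the sink queue so the sink can stop too.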

Next let’s write our sink:

public static void StartSink(BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in sinkQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Processed Messsage: {0}", workItem.Text);
        }
    }, TaskCreationOptions.LongRunning);
}

Once again, this sits in an infinite foreach loop consuming items from the sinkQueue.

Finally we need to wire up the pieces and kick it off:

StartSink(sinkQueue);

StartWorker(0, ventilatorQueue, sinkQueue);
StartWorker(1, ventilatorQueue, sinkQueue);
StartWorker(2, ventilatorQueue, sinkQueue);

StartVentilator(ventilatorQueue);

I’ve started the sink first, then the workers and finally the producer. It doesn’t overly matter what order they start in since the queues will store any tasks the ventilator creates before the workers and the sink start.

Running the code I get output something like this:

Processed Messsage: Item 1 processed by worker 1
Processed Messsage: Item 2 processed by worker 0
Processed Messsage: Item 0 processed by worker 2
Processed Messsage: Item 5 processed by worker 2
Processed Messsage: Item 3 processed by worker 1

....

Processed Messsage: Item 95 processed by worker 0
Processed Messsage: Item 98 processed by worker 0
Processed Messsage: Item 97 processed by worker 2
Processed Messsage: Item 96 processed by worker 1
Processed Messsage: Item 99 processed by worker 0

This pattern is a great way of decoupling the communication between a source and a sink, or a producer and a consumer. It also allows you to have multiple sources and multiple sinks, but primarily it’s a safe way for multiple threads to interact.
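
The first challenge in the list at the top, marshalling calls from many threads to a library that isn’t thread safe, can be handled with the same building blocks. The following is only a sketch under assumptions: NotThreadSafeLibrary is a made-up stand-in for the third party code, and AddRequest and MarshalDemo are hypothetical names. Callers post a request message onto a BlockingCollection and a single owner thread makes all the library calls, handing each result back through a TaskCompletionSource.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a third party class that must only be used from one thread.
public class NotThreadSafeLibrary
{
    private int total;
    public int AddToTotal(int value) { total += value; return total; }
}

// A request message: the input plus a way to hand the result back to the caller.
public class AddRequest
{
    public int Value { get; set; }
    public TaskCompletionSource<int> Reply { get; set; }
}

public static class MarshalDemo
{
    public static int Run(int callerCount)
    {
        var requests = new BlockingCollection<AddRequest>();

        // The only thread that ever touches the library.
        var owner = Task.Factory.StartNew(() =>
        {
            var library = new NotThreadSafeLibrary();
            foreach (var request in requests.GetConsumingEnumerable())
            {
                request.Reply.SetResult(library.AddToTotal(request.Value));
            }
        }, TaskCreationOptions.LongRunning);

        // Many callers post requests from their own threads and wait for a reply.
        var callers = Enumerable.Range(0, callerCount)
            .Select(n => Task.Factory.StartNew(() =>
            {
                var request = new AddRequest { Value = 1, Reply = new TaskCompletionSource<int>() };
                requests.Add(request);
                return request.Reply.Task.Result; // blocks until the owner replies
            }))
            .ToArray();

        Task.WaitAll(callers);
        requests.CompleteAdding();
        owner.Wait();

        // Each caller added 1, so the largest running total seen equals callerCount.
        return callers.Max(c => c.Result);
    }

    public static void Main()
    {
        Console.WriteLine("Final total: {0}", Run(10)); // Final total: 10
    }
}
```

Because only the owner thread calls AddToTotal, the library never sees concurrent access, and no caller needs a lock.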

The complete example is here on GitHub.

Thursday, November 15, 2012

A C# .NET Client Proxy For The RabbitMQ Management API

RabbitMQ comes with a very nice Management UI and an HTTP JSON API that allow you to configure and monitor your RabbitMQ broker. From the website:

The rabbitmq-management plugin provides an HTTP-based API for management and monitoring of your RabbitMQ server, along with a browser-based UI and a command line tool, rabbitmqadmin. Features include:

  • Declare, list and delete exchanges, queues, bindings, users, virtual hosts and permissions.
  • Monitor queue length, message rates globally and per channel, data rates per connection, etc.
  • Send and receive messages.
  • Monitor Erlang processes, file descriptors, memory use.
  • Export / import object definitions to JSON.
  • Force close connections, purge queues.

Wouldn’t it be cool if you could do all these management tasks from your .NET code? Well now you can. I’ve just added a new project to EasyNetQ called EasyNetQ.Management.Client. This is a .NET client-side proxy for the HTTP-based API.

It’s on NuGet, so to install it, you simply run:

PM> Install-Package EasyNetQ.Management.Client

To give an overview of the sort of things you can do with EasyNetQ.Management.Client, have a look at this code. It first creates a new Virtual Host and a User, and gives the User permissions on the Virtual Host. Then it re-connects as the new user, creates an exchange and a queue, binds them, and publishes a message to the exchange. Finally it gets the first message from the queue and outputs it to the console.

var initial = new ManagementClient("http://localhost", "guest", "guest");

// first create a new virtual host
var vhost = initial.CreateVirtualHost("my_virtual_host");

// next create a user for that virtual host
var user = initial.CreateUser(new UserInfo("mike", "topSecret"));

// give the new user all permissions on the virtual host
initial.CreatePermission(new PermissionInfo(user, vhost));

// now log in again as the new user
var management = new ManagementClient("http://localhost", user.name, "topSecret");

// test that everything's OK
management.IsAlive(vhost);

// create an exchange
var exchange = management.CreateExchange(new ExchangeInfo("my_exchange", "direct"), vhost);

// create a queue
var queue = management.CreateQueue(new QueueInfo("my_queue"), vhost);

// bind the exchange to the queue
management.CreateBinding(exchange, queue, new BindingInfo("my_routing_key"));

// publish a test message
management.Publish(exchange, new PublishInfo("my_routing_key", "Hello World!"));

// get any messages on the queue
var messages = management.GetMessagesFromQueue(queue, new GetMessagesCriteria(1, false));

foreach (var message in messages)
{
    Console.Out.WriteLine("message.payload = {0}", message.payload);
}

This library is also ideal for monitoring queue levels, channels and connections on your RabbitMQ broker. For example, this code prints out details of all the current connections to the RabbitMQ broker:

var connections = managementClient.GetConnections();

foreach (var connection in connections)
{
    Console.Out.WriteLine("connection.name = {0}", connection.name);
    Console.WriteLine("user:\t{0}", connection.client_properties.user);
    Console.WriteLine("application:\t{0}", connection.client_properties.application);
    Console.WriteLine("client_api:\t{0}", connection.client_properties.client_api);
    Console.WriteLine("application_location:\t{0}", connection.client_properties.application_location);
    Console.WriteLine("connected:\t{0}", connection.client_properties.connected);
    Console.WriteLine("easynetq_version:\t{0}", connection.client_properties.easynetq_version);
    Console.WriteLine("machine_name:\t{0}", connection.client_properties.machine_name);
}

On my machine, with one consumer running, it outputs this:
 
connection.name = [::1]:64754 -> [::1]:5672
user: guest
application: EasyNetQ.Tests.Performance.Consumer.exe
client_api: EasyNetQ
application_location: D:\Source\EasyNetQ\Source\EasyNetQ.Tests.Performance.Consumer\bin\Debug
connected: 14/11/2012 15:06:19
easynetq_version: 0.9.0.0
machine_name: THOMAS

You can see the name of the application that’s making the connection, the machine it’s running on and even its location on disk. That’s rather nice. From this information it wouldn’t be too hard to auto-generate a complete system diagram of your distributed messaging application. Now there’s an idea :)

For more information, check out the documentation.

Monday, November 12, 2012

Nicer Client Properties For EasyNetQ

EasyNetQ is my lightweight easy-to-use .NET API for RabbitMQ.

Today I added a small but very nice feature: better client properties. Now when you look at connections created by EasyNetQ you can see the machine that connected, the application and the application’s location on disk. It also gives you the date and time that EasyNetQ first connected. Very useful for debugging.

Here’s an example. Check out the ‘Client Properties’ section.

[Screenshot: connection details in the RabbitMQ Management UI, with the new values in the ‘Client Properties’ section]

Tuesday, November 06, 2012

EasyNetQ Publisher Confirms

EasyNetQ is my easy-to-use .NET API for RabbitMQ.

The default AMQP publish is not transactional and doesn't guarantee that your message will actually reach the broker. AMQP does specify a transactional publish, but with RabbitMQ it is extremely slow, around 200 times slower than a non-transactional publish, and we haven't supported it via the EasyNetQ API. For high-performance guaranteed delivery it's recommended that you use 'Publisher Confirms'. Simply speaking, this is an extension to AMQP that provides a call-back when your message has been successfully received by the broker.

What does 'successfully received' mean? It depends ...

  • A transient message is confirmed the moment it is enqueued.
  • A persistent message is confirmed as soon as it is persisted to disk, or when it is consumed on every queue.
  • An unroutable transient message is confirmed as soon as it is published.

For more information on publisher confirms, please read the announcement on the RabbitMQ blog.

To use publisher confirms, you must first create the publish channel with publisher confirms on:

var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms());

Next you must specify success and failure callbacks when you publish your message:

channel.Publish(message, x =>
    x.OnSuccess(() =>
    {
        // do success processing here
    })
    .OnFailure(() =>
    {
        // do failure processing here
    }));

Be careful not to dispose the publish channel before your call-backs have had a chance to execute.

Here's an example of a simple test. We're publishing 10,000 messages and then waiting for them all to be acknowledged before disposing the channel. There's a timeout, so if the batch takes longer than 10 seconds we abort with an exception.

const int batchSize = 10000;
var callbackCount = 0;
var stopwatch = new Stopwatch();
stopwatch.Start();

using (var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
{
    for (int i = 0; i < batchSize; i++)
    {
        var message = new MyMessage {Text = string.Format("Hello Message {0}", i)};
        channel.Publish(message, x =>
            x.OnSuccess(() =>
            {
                // the callbacks may run on a different thread, so increment atomically
                Interlocked.Increment(ref callbackCount);
            })
            .OnFailure(() =>
            {
                Interlocked.Increment(ref callbackCount);
            }));
    }

    // wait until all the publications have been acknowledged.
    while (callbackCount < batchSize)
    {
        // TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
        if (stopwatch.Elapsed.TotalSeconds > 10)
        {
            throw new ApplicationException("Aborted batch with timeout");
        }
        Thread.Sleep(10);
    }
}
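
If you'd rather not poll in a sleep loop, one alternative is to count the call-backs down with a CountdownEvent and block on it with a timeout. This is only a sketch: FakePublish is a made-up stand-in that fires its callback on another thread, roughly the way a broker acknowledgement would arrive. It is not part of EasyNetQ, and the ConfirmWaitDemo and PublishBatch names are hypothetical too.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConfirmWaitDemo
{
    // Hypothetical stand-in for a confirmed publish: invokes the callback
    // later on a thread pool thread, like an acknowledgement arriving.
    private static void FakePublish(string message, Action onConfirmed)
    {
        Task.Factory.StartNew(onConfirmed);
    }

    // Returns true if every callback arrived before the timeout expired.
    public static bool PublishBatch(int batchSize, TimeSpan timeout)
    {
        // Deliberately not disposed, so that a late callback arriving
        // after a timeout cannot hit a disposed object.
        var pending = new CountdownEvent(batchSize);

        for (int i = 0; i < batchSize; i++)
        {
            // Signal is thread safe and decrements the outstanding count by one.
            FakePublish(string.Format("Hello Message {0}", i), () => pending.Signal());
        }

        // Blocks until the count reaches zero or the timeout passes.
        return pending.Wait(timeout);
    }

    public static void Main()
    {
        var confirmed = PublishBatch(10000, TimeSpan.FromSeconds(10));
        Console.WriteLine(confirmed ? "All publications confirmed" : "Aborted batch with timeout");
    }
}
```

With a real channel you would signal the event from both the success and the failure call-backs, and only dispose the channel once Wait has returned.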