Over the last few weeks I’ve been playing with RabbitMQ and Rails have been completely blown away by how awesome it is.
I started out to build a simple chat application which would allow users to create a room and log into the room and post messages to multiple users who were logged in. Though, this is probably not the best use case for using AMQP but it helped me understand how AMQP worked and maybe I’ll get to use somewhere more apt.
RabbitMQ
What is RabbitMQ
RabbitMQ is an open source, highly scalable messaging system which provides support for STOMP, SMTP and HTTP messaging. [www.rabbitmq.com]
It’s built using the Advanced Message Queuing protocol (AMQP) which provides APIs in Ruby and several other programming languages.
Installation
The easiest way to install RabbitMQ on the Mac is using homebrew. The Macports installation has many issues and I would recommend going with HomeBrew. It also takes care of all dependencies like the correct Erlang installation etc.
You can test your installation by moving into the installation ‘bin’ directory and running rabbitmq-server
If you end up seeing something like
... starting error log relay ...done starting networking ...done starting direct_client ...done starting notify cluster nodes broker running
your good to go.
Creating our AMQP server
In Ruby we could run our AMQP server using the ‘amqp gem’ along with EventMachine.
The AMQP server code.
require 'rubygems' require 'amqp' require 'mongo' require 'em-websocket' require 'json' .... @sockets = [] EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel = AMQP::Channel.new(connection) puts "Connected to AMQP broker. #{AMQP::VERSION} " mongo = MongoManager.establish_connection("trackertalk_development") EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| socket_detail = {:socket => ws} ws.onopen do @sockets < true) #AMQP::Consumer.new(channel, queue) elsif status[:status] == 'MESSAGE' full_message = " #{status[:username]} : #{status[:message]}" exchange.publish(full_message) end end ws.onclose do @sockets.delete ws end end end
The code snippet contains most of the amqp server code. The entire code lives inside the EM.run loop. Ignore the Mongo bit as it doesn’t play a part in our area of interest.
@sockets = [] EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel = AMQP::Channel.new(connection) ...
In this snippet I created a new AMQP connection to localhost. When not provided with any other parameter the default port used by AMQP is 5672 with the username: guest and password :guest. Since the RabbitMQ installation will never (in most cases) have to be accessed from outside the server its safe to run the installation with the defaults.
The second line creates a new channel.
Once the channel is established the EM::Websocket block is used to allow modern browsers to connect to our server on port 8080
EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| socket_detail = {:socket => ws} ws.onopen do @sockets < true) #AMQP::Consumer.new(channel, queue) elsif status[:status] == 'MESSAGE' full_message = " #{status[:username]} : #{status[:message]}" exchange.publish(full_message) end end ws.onclose do @sockets.delete ws end
This is where the core logic lies. On the socket connect block I store the current socket in an array to use it later.
The ‘onmessage’ block is called when a new message arrives from the browser. Since I needed to dynamically create new exchanges and queues I used a very amateurish system. The message sent from the browser arrives with the following parameters in the JSON format
The following snippet is not a part of AMQP but a feature borrowed from XMPP to get my application working
{status:'status', username:'someusername', roomname:'someroomname', message:'somemessage'}
Status could have values from [‘status’, ‘message’, ‘presence’]
Username contains the ‘current_user’ name
Roomname indicates the ‘room’ the user has been logged into.
Status messages indicate that some kind of chore needs to be performed like subscribing to queues, binding exchanges etc.
Messages indicate that the message needs to be pushed to an exchange. When a message is published to an exchange, subscribing queues push the message to the browser using Websockets.
Presence messages indicate the users presence.
Why the entire application was a failure
The most important aspect while using AMQP is identifying the type of exchanges needed. In this case my assumptions on identifying rooms to be exchanges and users to have individual queues proved to be the reason for failure.
The reason why this assumption would fail is if a user queue would bind to multiple exchanges with each user having a unique queue assigned to him, a simple fanout exchange would push messages from different rooms to the user’s logged in chat room which is not what we want. This way users would end up receiving messages that are not relevant.
A ideal approach would have multiple users subscribe to a room (assumed to be the queue in this case) via a topic exchange or direct exchange.
I also believe that it would be logical to dynamically create queues and exchanges using the EM.periodic timer to check a NoSQL database or key-value store for new requests rather than using Websockets.
The AMQP server code is available here