RabbitMQ and Rails. Why my chat application failed.

Over the last few weeks I’ve been playing with RabbitMQ and Rails have been completely blown away by how awesome it is.

I started out to build a simple chat application which would allow users to create a room and log into the room and post messages to multiple users who were logged in. Though, this is probably not the best use case for using AMQP but it helped me understand how AMQP worked and maybe I’ll get to use somewhere more apt.

RabbitMQ

What is RabbitMQ
RabbitMQ is an open source, highly scalable messaging system which provides support for STOMP, SMTP and HTTP messaging. [www.rabbitmq.com]

It’s built using the Advanced Message Queuing protocol (AMQP) which provides APIs in Ruby and several other programming languages.

Installation

The easiest way to install RabbitMQ on the Mac is using homebrew. The Macports installation has many issues and I would recommend going with HomeBrew. It also takes care of all dependencies like the correct Erlang installation etc.

You can test your installation by moving into the installation ‘bin’ directory and running rabbitmq-server

If you end up seeing something like

 ...
 starting error log relay                                              ...done
 starting networking                                                   ...done
 starting direct_client                                                ...done
 starting notify cluster nodes 
 broker running

your good to go.

Creating our AMQP server

In Ruby we could run our AMQP server using the ‘amqp gem’ along with EventMachine.

The AMQP server code.

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

....


@sockets = []
EventMachine.run do
  connection  = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  puts "Connected to AMQP broker. #{AMQP::VERSION} "
  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do
      @sockets < true)
        #AMQP::Consumer.new(channel, queue)

      elsif status[:status] == 'MESSAGE'
        full_message = " #{status[:username]} :  #{status[:message]}"
        exchange.publish(full_message)
      end
    end

    ws.onclose do
      @sockets.delete ws
    end
  end

end

The code snippet contains most of the amqp server code. The entire code lives inside the EM.run loop. Ignore the Mongo bit as it doesn’t play a part in our area of interest.

@sockets = []
EventMachine.run do
  connection  = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
...
 

In this snippet I created a new AMQP connection to localhost. When not provided with any other parameter the default port used by AMQP is 5672 with the username: guest and password :guest. Since the RabbitMQ installation will never (in most cases) have to be accessed from outside the server its safe to run the installation with the defaults.

The second line creates a new channel.

Once the channel is established the EM::Websocket block is used to allow modern browsers to connect to our server on port 8080

EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do
      @sockets < true)
        #AMQP::Consumer.new(channel, queue)

      elsif status[:status] == 'MESSAGE'
        full_message = " #{status[:username]} :  #{status[:message]}"
        exchange.publish(full_message)
      end
    end

    ws.onclose do
      @sockets.delete ws
    end

This is where the core logic lies. On the socket connect block I store the current socket in an array to use it later.

The ‘onmessage’ block is called when a new message arrives from the browser. Since I needed to dynamically create new exchanges and queues I used a very amateurish system. The message sent from the browser arrives with the following parameters in the JSON format

The following snippet is not a part of AMQP but a feature borrowed from XMPP to get my application working

{status:'status', username:'someusername', roomname:'someroomname', message:'somemessage'}

Status could have values from [‘status’, ‘message’, ‘presence’]
Username contains the ‘current_user’ name
Roomname indicates the ‘room’ the user has been logged into.

Status messages indicate that some kind of chore needs to be performed like subscribing to queues, binding exchanges etc.

Messages indicate that the message needs to be pushed to an exchange. When a message is published to an exchange, subscribing queues push the message to the browser using Websockets.

Presence messages indicate the users presence.

Why the entire application was a failure

The most important aspect while using AMQP is identifying the type of exchanges needed. In this case my assumptions on identifying rooms to be exchanges and users to have individual queues proved to be the reason for failure.

The reason why this assumption would fail is if a user queue would bind to multiple exchanges with each user having a unique queue assigned to him, a simple fanout exchange would push messages from different rooms to the user’s logged in chat room which is not what we want. This way users would end up receiving messages that are not relevant.

A ideal approach would have multiple users subscribe to a room (assumed to be the queue in this case) via a topic exchange or direct exchange.

I also believe that it would be logical to dynamically create queues and exchanges using the EM.periodic timer to check a NoSQL database or key-value store for new requests rather than using Websockets.

The AMQP server code is available here

Advertisements