Improving on the original AMQP chat application

Continuing from where my earlier post on AMQP left off, this post describes the changes introduced to make the application more object oriented, shifting from the fanout exchange to direct exchanges and new problems on long polling.

My initial example of the amqp_server would make most Ruby developers cringe. So a quick glance at the AMQP documentation (which is awesome) revealed a simple approach on how to structure the code. The complete source is available here

Here is the revised amqp_server

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'
require 'evma_httpserver'
require File.expand_path('../message_parser.rb', __FILE__)
require File.expand_path('../producer.rb', __FILE__)
require File.expand_path('../worker.rb', __FILE__)
require File.expand_path('../consumer.rb', __FILE__)
require File.expand_path('../http_server.rb', __FILE__)
require File.expand_path('../setup.rb', __FILE__)
require File.expand_path('../socket_manager', __FILE__)


# start the run loop
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1', :port => 5672)
  channel = AMQP::Channel.new(connection)

  socket_manager = SocketManager.new
  EventMachine.start_server('127.0.0.1', 8082, Setup, [socket_manager, channel])
# EventMachine.start_server('127.0.0.1', 8081, HttpServer, channel)

  puts "---- Server started on 8081 -----"


   EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 9000) do |ws|

    ws.onopen do
      puts "EStaiblished......"
      ws.send('Connection open')

     puts ">>>>>>>>#{ws.request["query"]} <<<<<<<<< message[:roomname])
    end

    ws.onclose do
      puts " socket connection closed."
      roomname = ws.request["query"]["roomname"]
      username = ws.request["query"]["username"]
      SocketManager.new().remove_socket(roomname, ws)
    end
  end
end

Switching from Fanout to Direct Exchanges

The most significant change is switching from fanout exchanges to direct exchanges. This identifies the room to have a unique queue and is bound to an exchange with a routing_key = room_name. All new messages that arrive are published via the exchange with the same routing key. This works like a charm when working with websockets but won’t be a wise approach when using long polling or simple polling.

Why this works with sockets but is not the best design for polling

In this post we consider each room to have a queue and all messages directed to this room and available from this queue. With websockets its easy to keep track of users connected to this room and on arrival of a message its easy to broadcast this to all the websockets connected on that room.

Keeping track of websockets for a room. socket_manager.rb

class SocketAPI
  def self.api
    @sockets ||= {}
  end
end

class SocketManager
  attr_accessor :sockets

  def initialize
    @sockets = SocketAPI.api
  end

  def add_socket(roomname, sock)
    puts "=#{roomname}" * 50
    puts  "IN ADD SOCKET"
    puts "=" * 50
    puts "SOCKETS #{SocketAPI.api.inspect}"
    @sockets = SocketAPI.api
    if @sockets["#{roomname}"]
      puts "=" * 50
      puts "SOCKET HASH Exists"
      puts "=" * 50
      socket_array = @sockets["#{roomname}"]
      socket_array.push(sock)
    else
      #puts "=#{roomname.blank?}" * 50
      puts "SOCKET HASH DOES NOT Exists"
      puts "=" * 50
      @sockets[roomname.to_s] = []
      socket_array = @sockets["#{roomname}"]
      socket_array.push(sock)
    end
  end

  def remove_socket(roomname, sock)
    sockets = SocketAPI.api
    sockets["#{roomname}"].delete sock
  end
end

So all that is needed is identify the associated sockets for a room and push messages from the consumer to the browser. With polling however we would need to use the ‘pull api’ for queues. Heres an example


require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("c", :auto_delete => true)
  exchange = channel.direct("cexchange")

#  queue.subscribe do |payload|
#    puts "Received a message: #{payload}. Disconnecting..."
#    connection.close { EventMachine.stop }
#  end

  queue.bind(exchange, :routing_key => "cratos")


  exchange.publish "Hello, world!", :routing_key => "cratos"
  exchange.publish "Goodbye world", :routing_key => "cratos"

  exchange.publish "Goodbye world", :routing_key => queue.name


  q = channel.queue("c", :auto_delete => true)


  q.status do |message_count, consumer_count|
    messages = message_count
    consumers = consumer_count

    if messages > 0
      0.upto(messages - 1) do
        q.pop { |m, p| puts "#{m} Payload #{p}" }
      end
    end
  end
end

So we pop off messages from the queue one by one, but in this case we have no information about all the logged in users. So the messages are wiped out from the queue by the first poll that arrives from the members of the room. What this means is that the queue is wiped clean with the first poll updating one user’s browser window with the latest messages while the others see the old messages.

What are the alternatives

Right now it seems like “topic exchanges” would work. Every user could have a queue for each room. So the routing key could be something like “sid.harry_potter” where the period separates the username and the roomname. Maybe this is not the best alternative but the first one I could think of.

Ugly hack

One part of the code which seems like a horribly ugly hack to me is the Setup class which creates our queues. Setup.rb simply defines a basic EventMachine Server which listens on port 8082 for incoming requests. This request arrives when the user creates a new room (in our rails app).

The incoming requests triggers a call to the worker.rb which creates the queues. The other change aren’t that significant other than moving chunks to respective classes.

I still continue to use Websockets but using websockets for production wouldn’t be wise considering its limited adoption. Hence I dabbled with SocketIO and simple XHR-polling.

Though my current link to the source does not contain my experiments with Long Polling I would like to briefly go into the problems I faced.

Some of the basic issues would be violating the same origin policy and the easiest solution that I could come up with was to use Apache proxying.



   ServerName localhost
   DocumentRoot /etc/apache2/www/trackertalk/public/    
   ErrorLog "/private/var/log/apache2/dummy-host.example.com-error_log"
   CustomLog "/private/var/log/apache2/dummy-host.example.com-access_log" common
   ProxyPass /poll http://a.localhost:8081
   ProxyPassReverse	/poll http://a.localhost:8081
   RailsEnv development	



   ServerName a.localhost
   ProxyRequests Off
   ProxyPass / http://localhost:8081
   ProxyPassReverse / http://localhost:8081


All that is needed is to forward all requests that arrive to localhost/poll to a.localhost:8081 where our http_server would live. The http_server is another http_server that uses EM::Http_Server on port 8082 to handle incoming requests. It would need to be able to handle incoming requests (like a poll to check for messages in the relevant queue and forward it back to the client or a new message and add it to the correct queue.) and provide
the required reponse.

If you know or have built applications using AMQP and Ruby, I would love to know more about how production ready applications are structured and built. Any suggestions, corrections or feedback on my code would be awesome

Advertisements

RabbitMQ and Rails. Why my chat application failed.

Over the last few weeks I’ve been playing with RabbitMQ and Rails have been completely blown away by how awesome it is.

I started out to build a simple chat application which would allow users to create a room and log into the room and post messages to multiple users who were logged in. Though, this is probably not the best use case for using AMQP but it helped me understand how AMQP worked and maybe I’ll get to use somewhere more apt.

RabbitMQ

What is RabbitMQ
RabbitMQ is an open source, highly scalable messaging system which provides support for STOMP, SMTP and HTTP messaging. [www.rabbitmq.com]

It’s built using the Advanced Message Queuing protocol (AMQP) which provides APIs in Ruby and several other programming languages.

Installation

The easiest way to install RabbitMQ on the Mac is using homebrew. The Macports installation has many issues and I would recommend going with HomeBrew. It also takes care of all dependencies like the correct Erlang installation etc.

You can test your installation by moving into the installation ‘bin’ directory and running rabbitmq-server

If you end up seeing something like

 ...
 starting error log relay                                              ...done
 starting networking                                                   ...done
 starting direct_client                                                ...done
 starting notify cluster nodes 
 broker running

your good to go.

Creating our AMQP server

In Ruby we could run our AMQP server using the ‘amqp gem’ along with EventMachine.

The AMQP server code.

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

....


@sockets = []
EventMachine.run do
  connection  = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  puts "Connected to AMQP broker. #{AMQP::VERSION} "
  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do
      @sockets < true)
        #AMQP::Consumer.new(channel, queue)

      elsif status[:status] == 'MESSAGE'
        full_message = " #{status[:username]} :  #{status[:message]}"
        exchange.publish(full_message)
      end
    end

    ws.onclose do
      @sockets.delete ws
    end
  end

end

The code snippet contains most of the amqp server code. The entire code lives inside the EM.run loop. Ignore the Mongo bit as it doesn’t play a part in our area of interest.

@sockets = []
EventMachine.run do
  connection  = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
...
 

In this snippet I created a new AMQP connection to localhost. When not provided with any other parameter the default port used by AMQP is 5672 with the username: guest and password :guest. Since the RabbitMQ installation will never (in most cases) have to be accessed from outside the server its safe to run the installation with the defaults.

The second line creates a new channel.

Once the channel is established the EM::Websocket block is used to allow modern browsers to connect to our server on port 8080

EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do
      @sockets < true)
        #AMQP::Consumer.new(channel, queue)

      elsif status[:status] == 'MESSAGE'
        full_message = " #{status[:username]} :  #{status[:message]}"
        exchange.publish(full_message)
      end
    end

    ws.onclose do
      @sockets.delete ws
    end

This is where the core logic lies. On the socket connect block I store the current socket in an array to use it later.

The ‘onmessage’ block is called when a new message arrives from the browser. Since I needed to dynamically create new exchanges and queues I used a very amateurish system. The message sent from the browser arrives with the following parameters in the JSON format

The following snippet is not a part of AMQP but a feature borrowed from XMPP to get my application working

{status:'status', username:'someusername', roomname:'someroomname', message:'somemessage'}

Status could have values from [‘status’, ‘message’, ‘presence’]
Username contains the ‘current_user’ name
Roomname indicates the ‘room’ the user has been logged into.

Status messages indicate that some kind of chore needs to be performed like subscribing to queues, binding exchanges etc.

Messages indicate that the message needs to be pushed to an exchange. When a message is published to an exchange, subscribing queues push the message to the browser using Websockets.

Presence messages indicate the users presence.

Why the entire application was a failure

The most important aspect while using AMQP is identifying the type of exchanges needed. In this case my assumptions on identifying rooms to be exchanges and users to have individual queues proved to be the reason for failure.

The reason why this assumption would fail is if a user queue would bind to multiple exchanges with each user having a unique queue assigned to him, a simple fanout exchange would push messages from different rooms to the user’s logged in chat room which is not what we want. This way users would end up receiving messages that are not relevant.

A ideal approach would have multiple users subscribe to a room (assumed to be the queue in this case) via a topic exchange or direct exchange.

I also believe that it would be logical to dynamically create queues and exchanges using the EM.periodic timer to check a NoSQL database or key-value store for new requests rather than using Websockets.

The AMQP server code is available here