Improving on the original AMQP chat application

Continuing from where my earlier post on AMQP left off, this post describes the changes introduced to make the application more object oriented, shifting from the fanout exchange to direct exchanges and new problems on long polling.

My initial example of the amqp_server would make most Ruby developers cringe. So a quick glance at the AMQP documentation (which is awesome) revealed a simple approach on how to structure the code. The complete source is available here

Here is the revised amqp_server

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'
require 'evma_httpserver'
require File.expand_path('../message_parser.rb', __FILE__)
require File.expand_path('../producer.rb', __FILE__)
require File.expand_path('../worker.rb', __FILE__)
require File.expand_path('../consumer.rb', __FILE__)
require File.expand_path('../http_server.rb', __FILE__)
require File.expand_path('../setup.rb', __FILE__)
require File.expand_path('../socket_manager', __FILE__)

# start the run loop do
  connection = AMQP.connect(:host => '', :port => 5672)
  channel =

  socket_manager =
  EventMachine.start_server('', 8082, Setup, [socket_manager, channel])
# EventMachine.start_server('', 8081, HttpServer, channel)

  puts "---- Server started on 8081 -----"

   EventMachine::WebSocket.start(:host => '', :port => 9000) do |ws|

    ws.onopen do
      puts "EStaiblished......"
      ws.send('Connection open')

     puts ">>>>>>>>#{ws.request["query"]} <<<<<<<<< message[:roomname])

    ws.onclose do
      puts " socket connection closed."
      roomname = ws.request["query"]["roomname"]
      username = ws.request["query"]["username"], ws)

Switching from Fanout to Direct Exchanges

The most significant change is switching from fanout exchanges to direct exchanges. This identifies the room to have a unique queue and is bound to an exchange with a routing_key = room_name. All new messages that arrive are published via the exchange with the same routing key. This works like a charm when working with websockets but won’t be a wise approach when using long polling or simple polling.

Why this works with sockets but is not the best design for polling

In this post we consider each room to have a queue and all messages directed to this room and available from this queue. With websockets its easy to keep track of users connected to this room and on arrival of a message its easy to broadcast this to all the websockets connected on that room.

Keeping track of websockets for a room. socket_manager.rb

class SocketAPI
  def self.api
    @sockets ||= {}

class SocketManager
  attr_accessor :sockets

  def initialize
    @sockets = SocketAPI.api

  def add_socket(roomname, sock)
    puts "=#{roomname}" * 50
    puts  "IN ADD SOCKET"
    puts "=" * 50
    puts "SOCKETS #{SocketAPI.api.inspect}"
    @sockets = SocketAPI.api
    if @sockets["#{roomname}"]
      puts "=" * 50
      puts "SOCKET HASH Exists"
      puts "=" * 50
      socket_array = @sockets["#{roomname}"]
      #puts "=#{roomname.blank?}" * 50
      puts "SOCKET HASH DOES NOT Exists"
      puts "=" * 50
      @sockets[roomname.to_s] = []
      socket_array = @sockets["#{roomname}"]

  def remove_socket(roomname, sock)
    sockets = SocketAPI.api
    sockets["#{roomname}"].delete sock

So all that is needed is identify the associated sockets for a room and push messages from the consumer to the browser. With polling however we would need to use the ‘pull api’ for queues. Heres an example

require "rubygems"
require "amqp" do
  connection = AMQP.connect(:host => '')
  puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

  channel  =
  queue    = channel.queue("c", :auto_delete => true)
  exchange ="cexchange")

#  queue.subscribe do |payload|
#    puts "Received a message: #{payload}. Disconnecting..."
#    connection.close { EventMachine.stop }
#  end

  queue.bind(exchange, :routing_key => "cratos")

  exchange.publish "Hello, world!", :routing_key => "cratos"
  exchange.publish "Goodbye world", :routing_key => "cratos"

  exchange.publish "Goodbye world", :routing_key =>

  q = channel.queue("c", :auto_delete => true)

  q.status do |message_count, consumer_count|
    messages = message_count
    consumers = consumer_count

    if messages > 0
      0.upto(messages - 1) do
        q.pop { |m, p| puts "#{m} Payload #{p}" }

So we pop off messages from the queue one by one, but in this case we have no information about all the logged in users. So the messages are wiped out from the queue by the first poll that arrives from the members of the room. What this means is that the queue is wiped clean with the first poll updating one user’s browser window with the latest messages while the others see the old messages.

What are the alternatives

Right now it seems like “topic exchanges” would work. Every user could have a queue for each room. So the routing key could be something like “sid.harry_potter” where the period separates the username and the roomname. Maybe this is not the best alternative but the first one I could think of.

Ugly hack

One part of the code which seems like a horribly ugly hack to me is the Setup class which creates our queues. Setup.rb simply defines a basic EventMachine Server which listens on port 8082 for incoming requests. This request arrives when the user creates a new room (in our rails app).

The incoming requests triggers a call to the worker.rb which creates the queues. The other change aren’t that significant other than moving chunks to respective classes.

I still continue to use Websockets but using websockets for production wouldn’t be wise considering its limited adoption. Hence I dabbled with SocketIO and simple XHR-polling.

Though my current link to the source does not contain my experiments with Long Polling I would like to briefly go into the problems I faced.

Some of the basic issues would be violating the same origin policy and the easiest solution that I could come up with was to use Apache proxying.

   ServerName localhost
   DocumentRoot /etc/apache2/www/trackertalk/public/    
   ErrorLog "/private/var/log/apache2/"
   CustomLog "/private/var/log/apache2/" common
   ProxyPass /poll http://a.localhost:8081
   ProxyPassReverse	/poll http://a.localhost:8081
   RailsEnv development	

   ServerName a.localhost
   ProxyRequests Off
   ProxyPass / http://localhost:8081
   ProxyPassReverse / http://localhost:8081

All that is needed is to forward all requests that arrive to localhost/poll to a.localhost:8081 where our http_server would live. The http_server is another http_server that uses EM::Http_Server on port 8082 to handle incoming requests. It would need to be able to handle incoming requests (like a poll to check for messages in the relevant queue and forward it back to the client or a new message and add it to the correct queue.) and provide
the required reponse.

If you know or have built applications using AMQP and Ruby, I would love to know more about how production ready applications are structured and built. Any suggestions, corrections or feedback on my code would be awesome