Improving on the original AMQP chat application

Continuing from where my earlier post on AMQP left off, this post describes the changes made to make the application more object-oriented, the switch from a fanout exchange to direct exchanges, and the new problems that surfaced with long polling.

My initial example of the amqp_server would make most Ruby developers cringe. A quick glance at the AMQP documentation (which is awesome) revealed a simple way to structure the code. The complete source is available here.

Here is the revised amqp_server

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'
require 'evma_httpserver'
require File.expand_path('../message_parser.rb', __FILE__)
require File.expand_path('../producer.rb', __FILE__)
require File.expand_path('../worker.rb', __FILE__)
require File.expand_path('../consumer.rb', __FILE__)
require File.expand_path('../http_server.rb', __FILE__)
require File.expand_path('../setup.rb', __FILE__)
require File.expand_path('../socket_manager', __FILE__)


# start the run loop
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1', :port => 5672)
  channel = AMQP::Channel.new(connection)

  socket_manager = SocketManager.new
  EventMachine.start_server('127.0.0.1', 8082, Setup, [socket_manager, channel])
# EventMachine.start_server('127.0.0.1', 8081, HttpServer, channel)

  puts "---- Setup server started on 8082 -----"


  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 9000) do |ws|

    ws.onopen do
      puts "Established......"
      ws.send('Connection open')

      puts ">>>>>>>>#{ws.request["query"]} <<<<<<<<<"
      # Assumption: the rest of this handler was lost from the original listing;
      # registering the socket here mirrors the remove_socket call in onclose.
      socket_manager.add_socket(ws.request["query"]["roomname"], ws)
    end

    ws.onclose do
      puts " socket connection closed."
      roomname = ws.request["query"]["roomname"]
      socket_manager.remove_socket(roomname, ws)
    end
  end
end

Switching from Fanout to Direct Exchanges

The most significant change is the switch from fanout exchanges to direct exchanges. Each room now has its own queue, bound to an exchange with routing_key = room_name. Every new message for a room is published through the exchange with that same routing key. This works like a charm with websockets, but it is not a wise approach for long polling or simple polling.
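
To make the routing rule concrete, here is a minimal in-memory sketch of how a direct exchange delivers messages by exact routing key. It does not use the amqp gem or a broker; DirectExchangeModel, its methods, and the "lotr" room are hypothetical names used only for illustration.

```ruby
# In-memory model of direct-exchange routing (illustration only, no broker).
# DirectExchangeModel is a hypothetical class, not part of the amqp gem.
class DirectExchangeModel
  def initialize
    # routing key => list of queues (plain arrays) bound with that key
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Bind a queue to this exchange under an exact routing key.
  def bind(queue, routing_key)
    @bindings[routing_key] << queue
    queue
  end

  # Deliver the payload to every queue whose binding key equals routing_key.
  # Returns the number of queues reached; unmatched messages are dropped.
  def publish(payload, routing_key)
    return 0 unless @bindings.key?(routing_key)
    @bindings[routing_key].each { |queue| queue << payload }
    @bindings[routing_key].size
  end
end

exchange = DirectExchangeModel.new
harry_potter_queue = exchange.bind([], "harry_potter")
lotr_queue         = exchange.bind([], "lotr")

exchange.publish("Expelliarmus!", "harry_potter")
exchange.publish("You shall not pass", "lotr")
exchange.publish("lost message", "unknown_room")
```

Each room queue receives only the messages published with its own room name as the routing key, which is the behaviour the application relies on.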

Why this works with sockets but is not the best design for polling

In this post each room has one queue, and all messages directed to the room are available from that queue. With websockets it’s easy to keep track of the users connected to a room, and when a message arrives it’s easy to broadcast it to every websocket connected to that room.

Keeping track of the websockets for a room (socket_manager.rb):

class SocketAPI
  def self.api
    @sockets ||= {}
  end
end

class SocketManager
  attr_accessor :sockets

  def initialize
    @sockets = SocketAPI.api
  end

  def add_socket(roomname, sock)
    puts "Adding socket to room #{roomname}"
    @sockets = SocketAPI.api
    # Create the room's socket list on first use, then register the socket.
    (@sockets[roomname.to_s] ||= []) << sock
  end

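  def remove_socket(roomname, sock)
    sockets = SocketAPI.api
    # Guard against rooms that were never registered (nil entry).
    room_sockets = sockets[roomname.to_s]
    room_sockets.delete(sock) if room_sockets
  end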
end
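
A consumer could use a registry of this shape to fan a message out to a room. The following is a rough sketch under stated assumptions: broadcast_to_room and FakeSocket are hypothetical names, and FakeSocket only stands in for a websocket connection so that the example runs without a browser.

```ruby
# Sketch: push one message to every socket registered for a room.
# `registry` is a Hash of room name => Array of sockets, the same shape
# that SocketAPI.api holds. broadcast_to_room is a hypothetical helper.
def broadcast_to_room(registry, roomname, message)
  sockets = registry.fetch(roomname.to_s, [])
  sockets.each { |socket| socket.send(message) }
  sockets.size
end

# Hypothetical stand-in for a websocket connection.
class FakeSocket
  attr_reader :received

  def initialize
    @received = []
  end

  # Mirrors the ws.send(message) call used in the server code.
  def send(message)
    @received << message
  end
end

registry = { "harry_potter" => [FakeSocket.new, FakeSocket.new] }
delivered = broadcast_to_room(registry, "harry_potter", "sid: hello")
```

Because every connected browser holds its own socket, a single delivery from the queue can reach all of them.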

So all that is needed is to identify the sockets associated with a room and push messages from the consumer to the browser. With polling, however, we would need to use the ‘pull API’ for queues. Here’s an example:


require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("c", :auto_delete => true)
  exchange = channel.direct("cexchange")

#  queue.subscribe do |payload|
#    puts "Received a message: #{payload}. Disconnecting..."
#    connection.close { EventMachine.stop }
#  end

  queue.bind(exchange, :routing_key => "cratos")


  exchange.publish "Hello, world!", :routing_key => "cratos"
  exchange.publish "Goodbye world", :routing_key => "cratos"

  exchange.publish "Goodbye world", :routing_key => queue.name


  q = channel.queue("c", :auto_delete => true)


  # Ask the broker how many messages are waiting, then pull them one at a time.
  q.status do |message_count, _consumer_count|
    message_count.times do
      # pop yields the message metadata first, then the payload
      q.pop { |metadata, payload| puts "#{metadata} Payload #{payload}" }
    end
  end
end

So we pop messages off the queue one by one, but in this case we have no information about the other logged-in users. The first poll that arrives from any member of the room empties the queue: that one user’s browser is updated with the latest messages, while everyone else keeps seeing the old ones.
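
The effect can be reproduced without a broker. In this small sketch a plain array stands in for the room's queue, and poll_room is a hypothetical helper that behaves like a series of pops.

```ruby
# Sketch of the polling problem: one shared queue per room, several pollers.
# A plain array stands in for the AMQP queue; poll_room is hypothetical.
def poll_room(room_queue)
  messages = []
  # Each pop removes the message from the queue for everyone.
  messages << room_queue.shift until room_queue.empty?
  messages
end

room_queue = ["hello", "anyone here?"]

first_poll  = poll_room(room_queue)  # the first user's browser polls
second_poll = poll_room(room_queue)  # a second user polls a moment later
```

The first poll returns both messages, and the second returns an empty list even though that user has not seen anything yet.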

What are the alternatives

Right now it seems like “topic exchanges” would work. Every user could have a queue for each room, with a routing key such as “sid.harry_potter”, where the period separates the username from the room name. Maybe this is not the best alternative, but it is the first one I could think of.
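
Here is one way that idea might look, again as an in-memory sketch rather than amqp gem code. It assumes messages are published with a "username.roomname" key and that each user's queue is bound with the pattern "*.roomname"; that binding scheme, the helpers topic_match? and publish, and the user "ron" are my own assumptions for illustration. The matcher follows the usual topic rules: "*" matches exactly one word and "#" matches zero or more words.

```ruby
# Does a topic binding pattern match a routing key? Both are passed as
# arrays of dot-separated words. topic_match? is a hypothetical helper.
def topic_match?(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  if head == "#"
    # '#' may absorb zero or more words of the routing key
    (0..key_words.size).any? { |n| topic_match?(rest, key_words.drop(n)) }
  elsif key_words.empty?
    false
  else
    (head == "*" || head == key_words.first) &&
      topic_match?(rest, key_words.drop(1))
  end
end

# One queue per user per room, each bound with "*.<roomname>".
bindings = {
  "sid" => { :pattern => "*.harry_potter", :queue => [] },
  "ron" => { :pattern => "*.harry_potter", :queue => [] }
}

# Copy the payload into every queue whose pattern matches the routing key.
def publish(bindings, routing_key, payload)
  bindings.each_value do |binding|
    matched = topic_match?(binding[:pattern].split("."), routing_key.split("."))
    binding[:queue] << payload if matched
  end
end

publish(bindings, "sid.harry_potter", "hello from sid")
publish(bindings, "sid.lotr", "wrong room")
```

Since each user now drains only their own queue, one user's poll no longer steals messages from the others, at the cost of one queue per user per room.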

Ugly hack

One part of the code that seems like a horribly ugly hack to me is the Setup class, which creates our queues. setup.rb simply defines a basic EventMachine server that listens on port 8082 for incoming requests. A request arrives whenever a user creates a new room (in our Rails app).

The incoming request triggers a call to worker.rb, which creates the queues. The other changes aren’t that significant beyond moving chunks of code into their respective classes.

I still use websockets, but using them in production wouldn’t be wise considering their limited adoption. Hence I dabbled with SocketIO and simple XHR polling.

Though the source I link to does not contain my experiments with long polling, I would like to briefly go into the problems I faced.

One of the basic issues is violating the same-origin policy, and the easiest solution I could come up with was Apache proxying.



<VirtualHost *:80>
   ServerName localhost
   DocumentRoot /etc/apache2/www/trackertalk/public/
   ErrorLog "/private/var/log/apache2/dummy-host.example.com-error_log"
   CustomLog "/private/var/log/apache2/dummy-host.example.com-access_log" common
   ProxyPass /poll http://a.localhost:8081
   ProxyPassReverse /poll http://a.localhost:8081
   RailsEnv development
</VirtualHost>

<VirtualHost *:80>
   ServerName a.localhost
   ProxyRequests Off
   ProxyPass / http://localhost:8081
   ProxyPassReverse / http://localhost:8081
</VirtualHost>


All that is needed is to forward every request that arrives at localhost/poll to a.localhost:8081, where our http_server lives. The http_server is another EventMachine server, built on EM::HttpServer and listening on port 8081. It needs to handle incoming requests (a poll that checks the relevant queue for messages and forwards them back to the client, or a new message that must be added to the correct queue) and provide the required response.

If you know or have built applications using AMQP and Ruby, I would love to know more about how production-ready applications are structured and built. Any suggestions, corrections or feedback on my code would be awesome.

One thought on “Improving on the original AMQP chat application”

  1. seems like your setup.rb never get executed ?
    it never prints the message in the puts statements

    Can you please give some pointers ?
    I am also facing the problem that messages are never pushed to browser .
