Async Responses in Thin and a Simple Long Polling Server

I’ve been playing with Thin, the Ruby web server that packages the awesomeness of Rack, Mongrel and EventMachine (\m/). What blew me away completely, though, was James Tucker’s Async Rack, which has been integrated into Thin. The combination opens up a whole new world of realtime responses, something I’ve constantly been switching to NodeJS and SocketIO for.

Async Rack

A simple Rack application looks like this:


class MyRackApp
  def call(env)
    [200, {'Content-Type' => 'text/html'}, ["Hello World"]]
  end
end

run MyRackApp.new

# thin start -R <rackup file> -D -p3000

When you hit port 3000, this simple Rack app responds with a status code (200), the Content-Type header and a body. Nothing special. With Async Rack we can send the response head back while we are still building the response. Once the response is built, we send the body and close the connection.
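The status, headers and body triple can be poked at without starting a server at all. This is just a minimal sketch that calls the app by hand with an empty env, which is enough here because the app never reads it:

```ruby
# A Rack app is any object that responds to #call(env) and returns
# [status, headers, body], where body responds to #each.
class MyRackApp
  def call(env)
    [200, { 'Content-Type' => 'text/html' }, ['Hello World']]
  end
end

# Call the app directly; a real server would pass a populated env hash.
status, headers, body = MyRackApp.new.call({})

puts status
puts headers['Content-Type']
body.each { |chunk| puts chunk }
```

The server does essentially the same thing: it calls the app, writes the status and headers, then iterates over the body with each.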

A quick glance at Patrick’s Blog (apologies for not being able to find the author’s last name) should give you an excellent understanding of what I’m trying to explain.

A simple async app


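AsyncResponse = [-1, {}, []].freeze

class MyAsyncApp
  def call(env)
    Thread.new do
      sleep 5 # simulate slow work; the duration here is illustrative
      body = [200, {"Content-Type" => "text/html"}, ["<em>This is Async</em>"]]
      env['async.callback'].call(body) # deliver the real response later
    end
    AsyncResponse # returned right away: "the response is coming later"
  end
end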

AsyncResponse is a constant whose status is -1, which tells the server that the response will arrive later, asynchronously. The examples that ship with Thin refer to it as an “Async Template”.

On a request, the code immediately returns AsyncResponse while the thread waits on the sleep call. Once the thread wakes up, it builds the response and sends it over the connection, which has been kept alive in the meantime.
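To convince myself of the order of events, it helps to fake the server side. The sketch below is plain Ruby with no Thin involved: ToyServer, SlowApp and the gate queue are hypothetical names I made up for illustration, and the only things borrowed from the real setup are the -1 status and the env['async.callback'] key.

```ruby
ASYNC_RESPONSE = [-1, {}, []].freeze

# Hypothetical stand-in for the server side; this is not Thin's real code.
class ToyServer
  attr_reader :sent

  def initialize(app)
    @app = app
    @sent = []
  end

  def handle(env = {})
    # Hand the app a callable it can use to deliver the real response later.
    env['async.callback'] = method(:deliver)
    response = @app.call(env)
    # A status of -1 means "nothing to send yet, wait for async.callback".
    deliver(response) unless response.first == -1
  end

  def deliver(response)
    status, _headers, body = response
    body.each { |chunk| @sent << [status, chunk] }
  end
end

class SlowApp
  attr_reader :worker

  def initialize(gate)
    @gate = gate
  end

  def call(env)
    @worker = Thread.new do
      @gate.pop # stands in for the sleep: block until the "slow work" is done
      env['async.callback'].call(
        [200, { 'Content-Type' => 'text/html' }, ['<em>This is Async</em>']]
      )
    end
    ASYNC_RESPONSE
  end
end

gate = Queue.new
app = SlowApp.new(gate)
server = ToyServer.new(app)

server.handle
sent_immediately = server.sent.dup # nothing has gone out yet
gate << :done                      # let the worker finish
app.worker.join

puts "right after call: #{sent_immediately.inspect}"
puts "after the worker: #{server.sent.inspect}"
```

The queue is used instead of sleep only to make the ordering deterministic; the shape of the exchange is the same.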

An async app where we build the response and close the connection

From here on, it really helps to have the Thin source open in your text editor. We will be looking at the request.rb, response.rb and connection.rb files from the /thin folder.

require 'rubygems'

class DeferrableBody
  include EventMachine::Deferrable

  # The server calls each once and hands us its "send this chunk" block.
  def each(&blk)
    puts "Blocks: #{blk.inspect}"
    @callback_blk = blk
  end

  # Push every chunk through the block saved by each.
  def append(data)
    puts " -- appending data --"
    data.each do |data_chunk|
      puts " -- triggering callback --"
      @callback_blk.call(data_chunk)
    end
  end
end

class AsyncLife
  AsyncResponse = [-1, {}, []].freeze

  def call(env)
    body = DeferrableBody.new

    # Send the head on the next reactor tick; the body is still empty.
    EM.next_tick {
      puts "Next tick before async"
      puts "#{env['async.callback']}"
      env['async.callback'].call([200, {'Content-Type' => 'text/plain'}, body])
    }

    puts "-- activated 5 second timer --"
    EM.add_timer(5) {
      puts "--5 second timer done --"
      body.append(['Mary had a little lamb'])

      puts "-- activated 2 second timer --"
      EM.add_timer(2) {
        puts "--7 second timer done --"
        body.append(['This is the house that jack built'])
        puts "-- succeed called -- "
        body.succeed # done building the body
      }
    }

    AsyncResponse
  end
end

run AsyncLife.new

This example uses EventMachine’s Deferrable, which lets us signal when we’re done building our response. The tricky part I struggled to get my head around was the strange-looking each method and how the append method uses it to build our response.

Walking through the code

When the request comes in from the browser, the application returns the AsyncResponse constant, which tells the server that the response body is going to be built over time. On a new connection, EventMachine’s post_init method (check out thin/connection.rb) creates a @request and a @response object. Processing the request then triggers the post_process method, which on this first call returns without setting the header, status or body and prepares for the asynchronous response.

On next_tick we begin to create our header. We initialize a DeferrableBody object (which includes EM::Deferrable) and assign it as the response body. This ensures the header is sent ahead of the body, because there is nothing yet to iterate over in the each block where the response is sent. env['async.callback'] is a closure over the post_process method, created with method(:post_process); check out the pre_process method in thin/connection.rb.

The each method in our Deferrable class replaces the default each implementation defined by the response object. Instead of yielding chunks right away, it saves the block it is given in the instance variable @callback_blk, which is then called whenever we call the append method. So, essentially, each call to append ends up calling send_data on every chunk of data we pass in.

Once that’s done we call succeed, which tells EventMachine to fire the callback that signals we’re done building the body. That ensures the request/response connection is closed.
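The each/append/succeed dance is easier to see with EventMachine taken out of the picture. The following is only a stdlib sketch of the idea: ToyBody and on_done are hypothetical names, and the real class gets its callback and succeed behaviour from EM::Deferrable rather than from hand-written methods like these.

```ruby
# Stdlib-only stand-in for the deferrable body idea (hypothetical names).
class ToyBody
  def initialize
    @done_callbacks = []
  end

  # The "server" calls each once and hands us its "send this chunk" block.
  # Nothing is yielded yet; we only remember the block.
  def each(&blk)
    @callback_blk = blk
  end

  # Later on, every appended chunk is pushed through the saved block.
  def append(chunks)
    chunks.each { |chunk| @callback_blk.call(chunk) }
  end

  # Register something to run when the body is complete.
  def on_done(&blk)
    @done_callbacks << blk
  end

  # Signal completion, firing the registered callbacks.
  def succeed
    @done_callbacks.each(&:call)
  end
end

wire = [] # what would have gone out over the socket
body = ToyBody.new

body.each { |chunk| wire << chunk }         # plays the role of send_data
body.on_done { wire << :connection_closed } # plays the role of closing the connection

body.append(['Mary had a little lamb'])
body.append(['This is the house that jack built'])
body.succeed

puts wire.inspect
```

The point to notice is that each runs before any data exists, and the chunks only flow later, when append invokes the block that each stored.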

The default each implementation

# Send the response
@response.each do |chunk|
  trace { chunk }
  send_data chunk
end

That’s pretty much what I gathered from going through the code for async responses with Thin and Rack. Another useful module is thin_async, written by the creator of Thin, @macournoyer, and available here. It pretty much abstracts away the trouble of overriding the each method.

Here’s an example of a simple long polling server I built using Thin, available here.

Hope this is helpful

EventMachine: Deferring Time consuming tasks to a separate thread

A small update on my last post, which focused on building a sample Twitter feed application using EventMachine and the twitter gem. If you’ve already gone through the documentation and references, especially the introduction to EventMachine PDF, you will have noticed that the reactor is single threaded and that the EM methods aren’t thread safe. The PDF also walks through a simple example of using EM.defer to run time-consuming tasks in the background on a thread taken from the thread pool.

There isn’t any significant benefit in running our Twitter feed application’s ‘update fetch’ operation as a background process, but for purely illustrative purposes let’s make our server perform updates on a separate thread. EM.defer accepts a callback that receives the value returned by the background thread, so we can send it back to the user from the main thread.
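Before the real thing, here is the operation/callback hand-off in miniature. This is deliberately not EventMachine: toy_defer and the mailbox queue are hypothetical, stdlib-only stand-ins that mimic the contract described above, where the operation runs on another thread and its return value is handed to a callback that runs back on the main thread.

```ruby
# toy_defer is a hypothetical helper, not part of EventMachine.
# It runs operation on a worker thread and posts [callback, result]
# to a mailbox that the main thread drains.
def toy_defer(operation, callback, mailbox)
  Thread.new { mailbox << [callback, operation.call] }
end

mailbox = Queue.new
op_thread = nil
cb_thread = nil

operation = proc do
  op_thread = Thread.current
  21 * 2 # stands in for a slow fetch
end

callback = proc do |value|
  cb_thread = Thread.current
  puts "callback received #{value}"
end

worker = toy_defer(operation, callback, mailbox)

# The main thread plays the reactor: it picks up the finished job
# and runs the callback itself.
finished_callback, result = mailbox.pop
finished_callback.call(result)
worker.join

puts "operation ran off the main thread: #{op_thread != Thread.current}"
puts "callback ran on the main thread:   #{cb_thread == Thread.current}"
```

Keeping the callback on the main thread is what matters for us, since that is where it is safe to call EM methods such as send_data.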

Update your event_machine_server.rb file with the following changes and run both the client and server.

The code is also available here

require 'rubygems'
require 'eventmachine'
require 'socket'
require 'json'
require 'hashie'
require File.expand_path("twitter_lib.rb", __FILE__ + "/..")
require File.expand_path("tweet_Fetcher.rb", __FILE__ + "/..")

class Echo < EM::Connection
  attr_reader :data, :response, :status, :fetcher

  def post_init
    @status = :inactive
    ip, port = Socket.unpack_sockaddr_in(get_peername)
    puts "Connection Established from #{ip} #{port}"
  end

  def receive_data(data)
    puts "[LOGGING: RECEIVED] #{data}"
    @data = JSON.parse(data)
    puts "[LOGGING: PARSED DATA ] #{@data} #{@data.class.to_s}"
    # Assumed wiring, not shown in full here: answer the first request
    # directly, then switch to polling for updates in the background.
    if @status == :inactive
      initialize_fetcher
      send_data(execute_request.to_json)
      update_operation
      Echo.activate_periodic_timer(self)
    end
  end

  def unbind
    puts "Connection Lost" + self.inspect
  end

  def respond
  end

  # Returns the fetched data so the caller decides how to send it.
  def execute_request
    response = nil
    if @data["op"] == "fetch"
      puts "Please wait while we fetch the data ..."
      @status = :active
      response = @fetcher.fetch
    elsif @data["op"] == "update"
      puts "Fetching update . . ."
      response = @fetcher.fetch_update
    end
    #  send_data(response.to_json)
    response
  end

  def self.activate_event_machine(this = nil)
    EM.run do
      puts "Starting echo server . . . ."
      EM.start_server('', 6789, Echo)
      puts "STARTED "
    end
  end

  def self.activate_periodic_timer(this = nil)
    # Runs on a thread from EM's pool; its return value goes to callback.
    operations = proc { this.execute_request }
    # Runs back on the reactor thread, where send_data is safe to call.
    callback = proc { |response| this.send_data(response.to_json) }
    EM.add_periodic_timer(2) do
      EM.defer(operations, callback)
    end
  end

  def update_operation
    @data["op"] = "update"
  end

  def initialize_fetcher
    # Assumption: tweet_Fetcher.rb defines a TweetFetcher class that
    # takes these OAuth credentials.
    @fetcher = TweetFetcher.new({ :consumer_key => "xxxxxxxxxxxxxxxx",
                     :consumer_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                     :oauth_token => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                     :oauth_token_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" })
    # puts "[LOGGING FETCHER INITIALIZED] #{@fetcher.inspect}"
  end
end

Echo.activate_event_machine

=begin
EM.run do
      puts "Starting echo server . . . ."
      EM.start_server('', 6789, Echo)
      puts "STARTED "
      EM.add_periodic_timer(5) do
        puts "Timer activated"