EventMachine: Deferring Time consuming tasks to a separate thread

A small update on my last post which focuses on building a sample twitter feed application using eventmachine and the twitter gem. If you’ve already gone through the documentation and references, especially the introduction to eventmachine PDF, you would have noticed that the reactor is single threaded and the EM methods aren’t thread safe. It also describes a simple example on how to use EM.defer to run time consuming tasks on a separate thread as a background process by using a thread from the thread pool.

Though there isn’t significant benefit in running our twitter feed application’s ‘update fetch’ operation as a background process but for purely illustrative purposes lets make our server perform updates on a separate thread. EM.defer provides a callback option that allows us to fetch values returned by the background thread and return it to the user in the main thread.

Update your event_machine_server.rb file with the following changes and run both the client and server.

The code is also available here

 
require 'rubygems'
require 'eventmachine'
require 'socket'
require 'json'
require 'hashie'
require File.expand_path("twitter_lib.rb", __FILE__ + "/..")
require File.expand_path("tweet_Fetcher.rb", __FILE__ + "/..")

class Echo < EM::Connection

 attr_reader :data, :response, :status, :fetcher   
 
 def post_init   
   @status = :inactive 
    ip, port = Socket.unpack_sockaddr_in( get_peername) #peer.methods.inspect
    puts  "Connection Established from #{ip} #{port}"
  end

  def receive_data(data)
    puts "[LOGGING: RECEIVED] #{data}"
    @data = JSON.parse(data)
    puts "[LOGGING: PARSED DATA ] #{@data} #{@data.class.to_s}"
    initialize_fetcher
    execute_request
    
  end

  def unbind
    puts "Connection Lost" + self.inspect
  end
  
  def respond
    send_data(@response)
  end
  
  def execute_request
    if @data["op"] == "fetch"
      puts "Please wait while we fetch the data ..."
      @status = :active
      response = @fetcher.fetch      
      send_data(response.to_json)
      Echo.activate_periodic_timer(self)    
    elsif @data["op"] == "update"
      puts "Fetching update . . ."
      response = @fetcher.fetch_update
    #  send_data(response.to_json)      
    end
  end
  
  def self.activate_event_machine(this = nil)
    EM.run do 
        puts "Starting echo server . . . ."
        EM.start_server('0.0.0.0', 6789, Echo)
        puts "STARTED "
    end    
  end
  
  def self.activate_periodic_timer(this = nil)
    response = nil
    operations = Proc.new do     
      this.update_operation
      response = this.execute_request
    end
    
    callback = Proc.new{ this.send_data(response.to_json)}
    
    EM.add_periodic_timer(2) do
      EM.defer(operations, callback)      
    end    
  end
  
  def update_operation
    @data["op"] = "update"
  end
  
  private  

  def initialize_fetcher
    @fetcher =  Fetcher.new({ :consumer_key => "xxxxxxxxxxxxxxxx",
                     :consumer_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                     :oauth_token => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                     :oauth_token_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" 
                    })
                    
   # puts "[LOGGING FETCHER INITIALIZED] #{@fetcher.inspect}"                
  end
  
end

Echo.activate_event_machine

=begin
  EM.run do 
      puts "Starting echo server . . . ."
      EM.start_server('0.0.0.0', 6789, Echo)
      puts "STARTED "
    
      EM.add_periodic_timer(5) do
        puts "Timer activated"
      end    
  end
=end




Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s