A small update to my last post, which focused on building a sample Twitter feed application using EventMachine and the twitter gem. If you have already gone through the documentation and references, especially the introduction to EventMachine PDF, you will have noticed that the reactor is single-threaded and that the EM methods aren't thread-safe. The PDF also gives a simple example of using EM.defer to run time-consuming tasks in the background on a thread taken from EventMachine's thread pool.
There is no significant benefit in running our Twitter feed application's 'update fetch' operation as a background process, but for purely illustrative purposes let's make our server perform updates on a separate thread. EM.defer accepts an optional callback that receives the value returned by the background operation. The callback runs on the main (reactor) thread, where we can safely send the result back to the user.
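To make the hand-off between the two threads concrete, here is a minimal sketch in plain Ruby with no EventMachine involved. `TinyReactor` and its `defer` and `run` methods are hypothetical names invented for this illustration. This is not how EventMachine is implemented internally; it only mimics the shape of EM.defer, where the operation runs on a worker thread and the callback runs back on the main thread with the operation's result.

```ruby
require 'thread'

# Hypothetical stand-in for EM.defer, for illustration only: runs `operation`
# on a worker thread and queues `callback` with its result for the main loop.
class TinyReactor
  def initialize
    @pending = Queue.new # thread-safe hand-off from workers to the main loop
    @workers = []
  end

  def defer(operation, callback)
    @workers << Thread.new do
      result = operation.call                  # slow work, off the main thread
      @pending << -> { callback.call(result) } # hand the result back
    end
  end

  # Runs one queued callback per deferred operation on the calling (main) thread.
  def run
    @workers.size.times { @pending.pop.call }
  end
end

main_thread = Thread.current
log = []

# The operation pretends to be a slow update fetch and records where it ran.
operation = proc do
  sleep 0.1
  { "op" => "update", "ran_on_main" => Thread.current == main_thread }
end

# The callback receives the operation's return value as its argument.
callback = proc do |result|
  log << result.merge("callback_on_main" => Thread.current == main_thread)
end

reactor = TinyReactor.new
reactor.defer(operation, callback)
reactor.run

p log
```

The point to take away is that the callback never touches anything from the worker thread directly. It only sees the value the operation returned, which is why the server can call send_data from the callback without worrying about thread safety.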
Update your event_machine_server.rb file with the following changes, then run both the client and the server.
The code is also available here
require 'rubygems'
require 'eventmachine'
require 'socket'
require 'json'
require 'hashie'
require File.expand_path("twitter_lib.rb", __FILE__ + "/..")
require File.expand_path("tweet_Fetcher.rb", __FILE__ + "/..")

class Echo < EM::Connection
  attr_reader :data, :response, :status, :fetcher

  def post_init
    @status = :inactive
    # Socket.unpack_sockaddr_in returns [port, ip], in that order.
    port, ip = Socket.unpack_sockaddr_in(get_peername)
    #peer.methods.inspect
    puts "Connection Established from #{ip} #{port}"
  end

  def receive_data(data)
    puts "[LOGGING: RECEIVED] #{data}"
    @data = JSON.parse(data)
    puts "[LOGGING: PARSED DATA ] #{@data} #{@data.class.to_s}"
    initialize_fetcher
    execute_request
  end

  def unbind
    puts "Connection Lost" + self.inspect
  end

  def respond
    send_data(@response)
  end

  def execute_request
    if @data["op"] == "fetch"
      puts "Please wait while we fetch the data ..."
      @status = :active
      response = @fetcher.fetch
      send_data(response.to_json)
      # Start polling for updates once the initial fetch has been sent.
      Echo.activate_periodic_timer(self)
    elsif @data["op"] == "update"
      puts "Fetching update . . ."
      # Returned to the caller; the EM.defer callback sends it to the client.
      response = @fetcher.fetch_update
      # send_data(response.to_json)
    end
  end

  def self.activate_event_machine(this = nil)
    EM.run do
      puts "Starting echo server . . . ."
      EM.start_server('0.0.0.0', 6789, Echo)
      puts "STARTED "
    end
  end

  def self.activate_periodic_timer(this = nil)
    # Runs on a thread from EventMachine's thread pool.
    operations = Proc.new do
      this.update_operation
      this.execute_request
    end
    # Runs on the reactor thread; EM.defer passes in the value returned by operations.
    callback = Proc.new { |response| this.send_data(response.to_json) }
    EM.add_periodic_timer(2) do
      EM.defer(operations, callback)
    end
  end

  def update_operation
    @data["op"] = "update"
  end

  private

  def initialize_fetcher
    @fetcher = Fetcher.new({
      :consumer_key => "xxxxxxxxxxxxxxxx",
      :consumer_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
      :oauth_token => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
      :oauth_token_secret => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    })
    # puts "[LOGGING FETCHER INITIALIZED] #{@fetcher.inspect}"
  end
end

Echo.activate_event_machine

=begin
EM.run do
  puts "Starting echo server . . . ."
  EM.start_server('0.0.0.0', 6789, Echo)
  puts "STARTED "
  EM.add_periodic_timer(5) do
    puts "Timer activated"
  end
end
=end
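If you want to poke at the request handling without Twitter credentials or a running reactor, the "op" dispatch can be tried on its own. The sketch below is only an approximation of the server's logic: `StubFetcher` and `handle_request` are hypothetical names made up for this example, the canned tweets are invented, and the assumption that `fetch` and `fetch_update` return arrays of hashes is mine, not something taken from tweet_Fetcher.rb.

```ruby
require 'json'

# Hypothetical stub standing in for the Fetcher class from tweet_Fetcher.rb.
# It returns canned data instead of calling Twitter.
class StubFetcher
  def fetch
    [{ "text" => "first tweet" }, { "text" => "second tweet" }]
  end

  def fetch_update
    [{ "text" => "new tweet" }]
  end
end

# Mirrors the "op" dispatch in Echo#execute_request, minus the EventMachine parts.
def handle_request(raw, fetcher)
  request = JSON.parse(raw)
  case request["op"]
  when "fetch"
    fetcher.fetch.to_json
  when "update"
    fetcher.fetch_update.to_json
  else
    # Assumption: the original server has no branch for unknown operations.
    { "error" => "unknown op" }.to_json
  end
end

fetch_reply  = handle_request('{"op":"fetch"}', StubFetcher.new)
update_reply = handle_request('{"op":"update"}', StubFetcher.new)

puts fetch_reply
puts update_reply
```

The client only ever needs to send a small JSON object with an "op" key, which is what receive_data parses on the server side.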