May 04 2008

Distributed programming with Jabber and EventMachine

Jabber and its underlying protocol, XMPP, are typically associated with instant messaging, but the breadth and flexibility of the technology allow for applications that reach well beyond traditional online chat.

ejabberd is a fault-tolerant, clusterable Jabber/XMPP server written in Erlang. It is an interesting option as a simple, lightweight and scalable message transport for distributed applications.

EventMachine is a simple and fast library for lightweight concurrency in Ruby. Its uses include, but are not limited to, spawning lightweight processes whose execution can be programmatically scheduled, easy and fast socket abstractions, and an implementation of the Deferrable pattern as introduced by Twisted, the event-driven Python networking engine.

When a Ruby class includes the EventMachine::Deferrable module, it is provided with the ability to accept arbitrary callbacks and errbacks that will get executed when its deferred status changes, in particular when it is set to either :succeeded or :failed. Let's look at a deferrable Worker class which performs a potentially long running operation.

require 'eventmachine'

class Worker
  include EM::Deferrable

  def heavy_lifting
    30.times do |i|
      puts "Lifted #{i}"
      sleep 0.1
    end
    set_deferred_status :succeeded
  end
end
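
To make the callback and errback mechanics a little more concrete, here is a minimal sketch of the pattern in plain Ruby. It is a stripped-down stand-in and not EventMachine's implementation: the names TinyDeferrable and FlakyJob are made up for this illustration, and unlike the real module this sketch does not fire handlers that are attached after the status has already been set.

```ruby
# Illustrative only: a stripped-down stand-in for the deferrable pattern,
# NOT EventMachine's actual implementation.
module TinyDeferrable
  # Register a block to run when the status becomes :succeeded.
  def callback(&block)
    (@callbacks ||= []) << block
    self
  end

  # Register a block to run when the status becomes :failed.
  def errback(&block)
    (@errbacks ||= []) << block
    self
  end

  # Record the new status and run the matching handlers with any arguments.
  def set_deferred_status(status, *args)
    @deferred_status = status
    handlers = case status
               when :succeeded then @callbacks
               when :failed    then @errbacks
               end
    (handlers || []).each { |handler| handler.call(*args) }
  end
end

# Hypothetical job class, used only for this illustration.
class FlakyJob
  include TinyDeferrable

  def run(should_fail)
    if should_fail
      set_deferred_status :failed, "too heavy"
    else
      set_deferred_status :succeeded, "lifted"
    end
  end
end

log = []
job = FlakyJob.new
job.callback { |result| log << "ok: #{result}" }
job.errback  { |reason| log << "error: #{reason}" }
job.run(true) # only the errback should fire
puts log.inspect
```

The point of the sketch is that the object doing the work knows nothing about who is listening. It only announces a status change, and whatever was attached beforehand gets run.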

Inside an EventMachine loop, we can add callbacks to a Worker instance and dispatch the expensive operation to a separate thread, or an evented process. The program's execution will continue, with any callbacks attached to Worker executed once its deferred status is set.

EM.run do
  worker = Worker.new
  worker.callback {p "done!"}
  Thread.new {worker.heavy_lifting; EM.stop}
  puts "resuming remaining program operations"
end
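
The ordering is easier to see without the reactor in the way. The following is a rough sketch using only Ruby's standard library, not EventMachine. The dispatch and demo_ordering helpers are hypothetical, and the job waits on a gate purely so that the order of events is deterministic for the illustration.

```ruby
require 'thread' # Queue; already loaded on modern Rubies

# Hypothetical helper: runs `job` on a new thread, then hands its result
# to `on_done`. Returns the thread right away without waiting for the job.
def dispatch(job, on_done)
  Thread.new { on_done.call(job.call) }
end

# Records what happens in which order and returns it as an array.
def demo_ordering
  events = Queue.new
  gate   = Queue.new

  job = lambda do
    gate.pop # stand-in for the heavy lifting: block until released
    :lifted
  end

  thread = dispatch(job, ->(result) { events << "callback: #{result}" })
  events << "caller resumed" # the caller carries on while the job is pending
  gate << :go                # now let the job finish
  thread.join

  Array.new(events.size) { events.pop }
end

puts demo_ordering.inspect
```

Note that in this sketch, as in the Thread.new example above, the callback runs on the worker thread rather than on the thread that attached it.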

Now, let's look at combining Worker with Jabber to trigger long-running jobs. For Jabber server duties, I am using ejabberd on an old laptop running Debian, but there's no reason why a public Jabber service such as Google Talk could not be used for playing around with the example. I'm also using the xmpp4r-simple Ruby library, which is a wrapper around xmpp4r.

require 'eventmachine'
require 'xmpp4r-simple'

jabber = Jabber::Simple.new("bot@thrash", "password")
at_exit{jabber.status(:away, "jabot down")}

EM.run do
  EM::PeriodicTimer.new(1) do
    jabber.received_messages do |message|
      case message.body
      when "exit" then EM.stop
      when "lift"
        EM.spawn do
          worker = Worker.new
          worker.callback {jabber.deliver(message.from, "Done lifting")}
          worker.heavy_lifting
        end.notify
        jabber.deliver(message.from, "Scheduled heavy job...")
      else jabber.deliver(message.from, "Dunno how to #{message.body}")
      end
    end
  end
end

Inside an EventMachine loop, we check for new messages every second. The program understands two commands, exit and lift. The first quits the EventMachine loop and ultimately terminates the program's execution. When lift is received, we instantiate a new Worker inside a spawned process and add a callback so that the Worker will notify the command issuer when the job has completed. Worth noting is the use of notify to schedule the spawned process. notify returns immediately, which makes work dispatch non-blocking: upon issuing a lift command twice, a "Scheduled heavy job..." message will be sent to the job issuer twice before the first job completes.
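
The scheduling behaviour can be modelled in a few lines of plain Ruby. This is a toy model and not EventMachine code: TinyReactor and simulate are names invented for the sketch, the heavy job is reduced to a single reply, and it assumes both lift commands are picked up by the same poll.

```ruby
# Illustrative stand-in for a reactor's "run this later" queue.
# NOT EventMachine code.
class TinyReactor
  def initialize
    @pending = []
  end

  # Queue the block and return immediately, the way notify is described above.
  def schedule(&block)
    @pending << block
    nil
  end

  # Run everything queued so far, in the order it was scheduled.
  def tick
    @pending.shift.call until @pending.empty?
  end
end

# Feeds a list of message bodies through the same case statement as the bot
# and returns the replies in the order they would be delivered.
def simulate(commands)
  outbox  = []
  reactor = TinyReactor.new

  commands.each do |body|
    case body
    when "lift"
      reactor.schedule { outbox << "Done lifting" } # deferred work
      outbox << "Scheduled heavy job..."            # sent right away
    else
      outbox << "Dunno how to #{body}"
    end
  end

  reactor.tick # the deferred jobs only run once the poll has finished
  outbox
end

puts simulate(%w[lift lift]).inspect
```

Both acknowledgements land in the outbox before either job reports back, which is the ordering described above. The model also hints at a limitation: the scheduled jobs still run one after another on the same loop, so scheduling makes dispatch non-blocking without making the jobs themselves run in parallel.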

I use Adium to send commands to the program, which is an interesting way of remote controlling or interacting with applications. Of course, the real interest lies in using this setup for inter-app communication. With multicast options, presence discovery, node status updates and more, there is a lot to explore in distributed application development, especially if simple and lightweight rank high on your list of priorities.