Sep 09 2008

EventMachine MapReduce

MapReduce is a parallel computation strategy useful for scaling large data set processing by distributing workload over multiple worker nodes. The distributed nature of MapReduce suggests network communication and, with that in mind, I thought I'd put together a demonstration employing EventMachine, a library which makes efficient network programming relatively simple in Ruby.

Before going any further, I should mention that the code examples have not been optimized for production use; they only illustrate what's possible. It's also worth bringing up two established Ruby libraries for tackling similar problems, Starfish and Skynet. I'd advise investigating these existing options before delving into custom alternatives.

MapReduce essentially consists of two steps, map and reduce (although real-world implementations usually need intermediate phases as well). map refers to the higher-order function also known as transform or collect. It is the operation that is typically distributed: a number of nodes each transform part of a data set into another set of data. reduce refers to the higher-order function sometimes called fold or inject, which in this case collects the results of map to build a return value.
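
As a minimal single-process sketch of the two higher-order functions, here they are using Ruby's built-in Enumerable methods. The sample numbers are made up purely for illustration.

```ruby
# map transforms each element independently of the others,
# which is what makes the work easy to split across workers.
numbers = [1, 2, 3, 4]
squares = numbers.map { |n| n * n }

# inject (Ruby's fold) collapses a collection into a single value.
total = squares.inject(0) { |sum, n| sum + n }

puts squares.inspect
puts total
```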

Counting word occurrences across a large number of documents is one of the examples most commonly used to describe MapReduce. A number of distributed jobs are spawned, each splitting a document's contents into words. The results of these operations are passed to a reduce process whose job is to sum its input.
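
Before bringing the network into it, the word count can be sketched in a single process. The in-memory documents and their names below are hypothetical stand-ins for real files; the point is only to show the [word, 1] pairs produced by the map step and the summing done by the reduce step.

```ruby
# Hypothetical in-memory "documents" standing in for files on disk.
documents = {
  'a.txt' => 'the quick brown fox',
  'b.txt' => 'the lazy dog and the fox'
}

# Map step: each document is turned into [word, 1] pairs independently.
mapped = documents.values.map do |text|
  text.split(' ').map { |word| [word, 1] }
end

# Reduce step: merge the pairs from every document and sum them per word.
counts = mapped.flatten(1).inject(Hash.new(0)) do |totals, (word, n)|
  totals[word] += n
  totals
end

counts.each { |word, n| puts "#{word} : #{n}" }
```

In the distributed version, each inner map call would run on a different node and only the reduce step would see all the pairs.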

Map processes can be EventMachine servers. We can have an arbitrary number of those running on a number of physical nodes.

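require 'eventmachine'

module Map
  def receive_data(path)
    document = File.read(path) # assumes the whole path arrives in one chunk
    word_counts = document.split(' ').map { |word| [word, 1] }
    send_data(Marshal.dump(word_counts)) # the reduce process unmarshals this
    close_connection_after_writing       # closing tells the client the job is done
  end
end

# Run one instance per port (the reduce example expects 5555 and 6666).
EM.run { EM.start_server("localhost", 5555, Map) }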

A reduce process can send job requests to those servers, then receive and process the results.

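require 'eventmachine'

class Reduce < EM::Connection
  @@all = [] # [word, 1] pairs collected from every map job

  def initialize(*args)
    super
    @doc, @data = args[0], ''
    Reduce.increment_job_count
  end

  def post_init
    send_data(@doc) # ask the map server to process this document
  end

  def receive_data(data)
    @data << data # buffer until the server closes the connection
  end

  def unbind
    Reduce.job_completed
    @@all += Marshal.load(@data)
    unless Reduce.pending_jobs?
      groups = @@all.group_by { |word| word[0] }
      groups.each { |g| p "#{g[0]} : #{g[1].size}" }
      EM.stop # all jobs are done, so leave the event loop
    end
  end

  def self.send_map_job(port, doc)
    @job_count ||= 0
    EM.connect("localhost", port, Reduce, doc)
  end

  def self.increment_job_count
    @job_count += 1
  end

  def self.pending_jobs?
    @job_count != 0
  end

  def self.job_completed
    @job_count -= 1
  end
end

EM.run do
  {
    5555 => 'docs/america.txt',
    6666 => 'docs/da-vinci.txt'
  }.each { |port, doc| Reduce.send_map_job(port, doc) }
end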
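
To make the data flow easier to follow, here is a sketch of what the reduce side does with the bytes it receives, with the network taken out of the picture. The two payloads and the four-byte chunk size are made up for the illustration; the buffering, Marshal.load and group_by steps mirror receive_data and unbind.

```ruby
# Simulated payloads, as two map jobs might send them (sample data is made up).
payload_a = Marshal.dump([['the', 1], ['fox', 1]])
payload_b = Marshal.dump([['the', 1], ['dog', 1]])

# A payload can arrive in several chunks, so it is buffered and only
# unmarshalled once the connection has closed.
chunks = (0...payload_a.bytesize).step(4).map { |i| payload_a.byteslice(i, 4) }
buffer = ''.b
chunks.each { |chunk| buffer << chunk }

# Merge the pairs from both jobs, then count the pairs per word.
all = Marshal.load(buffer) + Marshal.load(payload_b)
groups = all.group_by { |pair| pair[0] }
totals = groups.map { |word, pairs| [word, pairs.size] }.to_h

totals.each { |word, count| puts "#{word} : #{count}" }
```

Marshal keeps the example short, but Marshal.load should only ever be fed data from peers you trust.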

The example lacks the plumbing code that would make it flexible and, as you might have noticed, it runs on a single node (localhost), but it hopefully illustrates a mechanism for distributing workload over a networked farm.