Sep 09 2008

EventMachine MapReduce

MapReduce is a parallel computation strategy for scaling the processing of large data sets by distributing the workload over multiple worker nodes. Its distributed nature implies network communication, and with that in mind I thought I'd put together a demonstration using EventMachine, a library that makes efficient network programming relatively simple in Ruby.

Before going any further, I should mention that the code examples have not been optimized for production use; they only illustrate what's possible. It's also worth bringing up two established Ruby libraries that tackle similar problems, Starfish and Skynet. It's advisable to investigate these existing options before delving into custom alternatives.

MapReduce essentially consists of two steps, map and reduce (although real-world implementations usually add intermediate phases). map refers to the higher-order function also known as transform or collect; it is the operation that is typically distributed, with a number of nodes each transforming part of a data set into another set of data. reduce refers to the higher-order function sometimes called fold or inject, which in this case collects the results of map to build a return value.
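
In plain Ruby, the two building blocks look like this (a trivial, single-process illustration):

numbers = [1, 2, 3, 4]

# map: transform every element of a collection
squares = numbers.map { |n| n * n }          # => [1, 4, 9, 16]

# reduce (inject): fold a collection into a single value
sum = squares.inject(0) { |total, n| total + n } # => 30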

Counting the number of word occurrences in a large collection of documents is one of the examples most commonly used to describe MapReduce. A number of distributed jobs are spawned, each splitting document contents into words. The results of these operations are passed to a reduce process whose job is to sum its input.
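
Before anything is distributed, the whole computation can be sketched in a single process. The distributed version below follows the same shape, with the map phase emitting [word, 1] pairs and the reduce phase grouping and counting them:

documents = ['docs/america.txt', 'docs/da-vinci.txt']

# map phase: emit a [word, 1] pair for every word occurrence
pairs = []
documents.each do |path|
  pairs += File.read(path).split(' ').map { |word| [word, 1] }
end

# reduce phase: group the pairs by word and sum the counts
pairs.group_by { |word, _| word }.each do |word, group|
  puts "#{word} : #{group.size}"
end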

Map processes can be EventMachine servers. We can run an arbitrary number of them across any number of physical nodes.

require 'eventmachine'

# A map worker: receives a document path, returns serialized [word, 1] pairs.
module Map
  def receive_data(path)
    document = File.read(path)
    # The map step proper: transform the document into [word, 1] pairs.
    word_counts = document.split(' ').map { |word| [word, 1] }
    # Serialize the pairs and hand them back to the reduce process.
    send_data(Marshal.dump(word_counts))
    close_connection_after_writing
  end
end

EM.run { EM.start_server("localhost", 5555, Map) }
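
The driver further down connects to two ports, so to test everything on one machine a second server needs to listen on 6666 as well (in a real deployment each server would run on a different node):

EM.run do
  EM.start_server("localhost", 5555, Map)
  EM.start_server("localhost", 6666, Map)
end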

A reduce process can send job requests to those servers, then receive and process the results.

require 'eventmachine'

class Reduce < EM::Connection
  # Accumulates the [word, 1] pairs returned by every map job.
  @@all = []

  def initialize(*args)
    super
    @doc, @data = args[0], ''
  end

  def post_init
    # Ask the map server to process this document.
    send_data(@doc)
  end

  def receive_data(data)
    # Results may arrive in several chunks; buffer until the server disconnects.
    @data << data
  end

  def unbind
    # The map server closed the connection, so its job is done.
    Reduce.job_completed
    @@all += Marshal.load(@data)
    unless Reduce.pending_jobs?
      # The reduce step proper: group the pairs by word and count them.
      groups = @@all.group_by { |word, _| word }
      groups.each { |word, pairs| puts "#{word} : #{pairs.size}" }
      EM.stop
    end
  end

  def self.send_map_job(port, doc)
    @job_count ||= 0
    increment_job_count
    EM.connect("localhost", port, Reduce, doc)
  end

  def self.increment_job_count
    @job_count += 1
  end

  def self.pending_jobs?
    @job_count != 0
  end

  def self.job_completed
    @job_count -= 1
  end
end

EM.run do
  # Dispatch one map job per (port, document) pair, then wait for the results.
  {
    5555 => 'docs/america.txt',
    6666 => 'docs/da-vinci.txt'
  }.each { |port, doc| Reduce.send_map_job(port, doc) }
end
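
Each connection's unbind callback fires once its map server closes the connection, decrementing the job counter; when the counter reaches zero, all accumulated pairs are grouped by word, the counts are printed and the reactor stops.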

The example lacks the plumbing that would make it truly flexible and, as you might have noticed, runs on a single node (localhost), but it hopefully illustrates a mechanism for distributing workload over a networked farm.