EventMachine MapReduce
George Malamidis, September 9th, 2008

MapReduce is a parallel computation strategy for scaling the processing of large data sets by distributing the workload over multiple worker nodes. Because MapReduce is distributed, it implies network communication, and with that in mind I thought I’d put together a demonstration using EventMachine, a library that makes efficient network programming in Ruby relatively simple.
Before going any further, I should mention that the code examples have not been optimized for production use; they only illustrate what’s possible. It’s also worth mentioning two established Ruby libraries that tackle similar problems, Starfish and Skynet. These existing options are worth investigating before delving into custom alternatives.
MapReduce essentially consists of two steps, map and reduce (although real-world implementations usually need intermediate phases as well). map refers to the higher-order function also known as transform or collect. It is the operation that is typically distributed: a number of nodes each transform part of a data set into another set of data. reduce refers to the higher-order function sometimes called fold or inject, which here collects the results of map to build a return value.
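In plain Ruby, the two higher-order functions can be seen side by side in a single process. This is only a minimal illustration with made-up data; nothing here is distributed yet.

```ruby
# map (also available as collect): transform each element independently.
lengths = %w[map reduce fold].map { |word| word.length }

# reduce (also available as inject): fold the mapped values into one result.
total = lengths.inject(0) { |sum, n| sum + n }

p lengths # prints [3, 6, 4]
p total   # prints 13
```

Each call to the map block depends only on its own element, which is what makes the map step safe to spread across nodes; the inject step needs all the mapped values in one place.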
Counting word occurrences across a large number of documents is one of the most common examples used to describe MapReduce. A number of distributed jobs are spawned, each splitting document contents into words. The results of these operations are passed to a reduce process whose job is to sum its input.
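Before adding any networking, the word count can be sketched in one process. The in-memory `documents` array below is a hypothetical stand-in for files on disk; the map phase emits `[word, 1]` pairs per document, and the reduce phase groups and sums them.

```ruby
# Hypothetical in-memory documents standing in for files on disk.
documents = [
  "the quick brown fox",
  "the lazy dog and the fox"
]

# Map phase: each document independently becomes a list of [word, 1] pairs.
mapped = documents.map { |doc| doc.split(' ').map { |word| [word, 1] } }

# Reduce phase: concatenate the partial results, group by word, sum the counts.
pairs = mapped.flatten(1)
grouped = pairs.group_by { |word, _count| word }
counts = grouped.map { |word, ps| [word, ps.inject(0) { |sum, (_w, c)| sum + c }] }.to_h

p counts["the"] # prints 3
p counts["fox"] # prints 2
```

In the distributed version, each element of `mapped` would be produced by a different worker and shipped back over the network before the reduce phase runs.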
Map processes can be EventMachine servers. We can have an arbitrary number of those running on a number of physical nodes.
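require 'eventmachine'

# A map worker: receives a document path, splits the document into words
# and replies with a marshalled list of [word, 1] pairs.
module Map
  def receive_data(path)
    # Assumes the short path arrives in a single read; strip stray whitespace.
    document = File.read(path.strip)
    word_counts = document.split(' ').map { |word| [word, 1] }
    send_data(Marshal.dump(word_counts))
    close_connection_after_writing
  end
end

# Port defaults to 5555; start another worker with e.g. `ruby map.rb 6666`.
port = (ARGV[0] || 5555).to_i
EM.run { EM.start_server("localhost", port, Map) }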
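The map worker’s logic can be checked without a network. This sketch assumes the body of `receive_data` is pulled into a hypothetical helper, `map_words`; the `Marshal.dump`/`Marshal.load` round trip mirrors what travels over the socket between the two processes.

```ruby
# Hypothetical helper: the body of Map#receive_data without the networking.
def map_words(document)
  document.split(' ').map { |word| [word, 1] }
end

# What the server would send: a Marshal-serialized byte string.
payload = Marshal.dump(map_words("to be or not to be"))

# What the reduce side would reconstruct after the connection closes.
pairs = Marshal.load(payload)
p pairs.first(2) # prints [["to", 1], ["be", 1]]
```

Marshal keeps the example short, but it ties both ends to Ruby, and `Marshal.load` should never be called on data from untrusted peers; a format such as JSON would be a more portable choice.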
A reduce process can send job requests to those servers, then receive and process the results.
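require 'eventmachine'

# The reduce side: connects to each map server, sends it a document path,
# buffers the marshalled reply and aggregates once every job has finished.
class Reduce < EM::Connection
  @@all = []

  def initialize(*args)
    super
    # The document path is the extra argument passed to EM.connect.
    @doc, @data = args[0], String.new
  end

  def post_init
    send_data(@doc)
  end

  # A reply may arrive in several chunks; buffer until the server disconnects.
  def receive_data(data)
    @data << data
  end

  def unbind
    Reduce.job_completed
    # An empty buffer means nothing came back, e.g. no server on that port.
    @@all += Marshal.load(@data) unless @data.empty?
    unless Reduce.pending_jobs?
      groups = @@all.group_by { |word, _count| word }
      groups.each do |word, pairs|
        puts "#{word} : #{pairs.inject(0) { |sum, (_w, count)| sum + count }}"
      end
      EM.stop
    end
  end

  def self.send_map_job(port, doc)
    @job_count ||= 0
    increment_job_count
    EM.connect("localhost", port, Reduce, doc)
  end

  def self.increment_job_count
    @job_count += 1
  end

  def self.pending_jobs?
    @job_count != 0
  end

  def self.job_completed
    @job_count -= 1
  end
end

# One map server must be listening on each port.
EM.run do
  jobs = {
    5555 => 'docs/america.txt',
    6666 => 'docs/da-vinci.txt'
  }
  jobs.each { |port, doc| Reduce.send_map_job(port, doc) }
end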
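The bookkeeping in `Reduce` (count outstanding jobs, merge each partial result as its connection closes, reduce when the count reaches zero) can be sketched without EventMachine. `Aggregator` and its methods are hypothetical names used only for this illustration.

```ruby
# Hypothetical, network-free version of the bookkeeping in the Reduce class:
# count outstanding jobs, merge partial results, and sum once all are done.
class Aggregator
  attr_reader :totals

  def initialize
    @pending = 0
    @all = []
    @totals = nil
  end

  # Mirrors Reduce.send_map_job: one more job is outstanding.
  def job_started
    @pending += 1
  end

  # Mirrors Reduce#unbind: record a partial result; reduce when none remain.
  def job_completed(pairs)
    @pending -= 1
    @all.concat(pairs)
    finish if @pending.zero?
  end

  private

  # Sum the counts per word into a hash with a default of 0.
  def finish
    @totals = Hash.new(0)
    @all.each { |word, count| @totals[word] += count }
  end
end

agg = Aggregator.new
2.times { agg.job_started }
agg.job_completed([["fox", 1], ["dog", 1]])
p agg.totals        # prints nil (one job still pending)
agg.job_completed([["fox", 1]])
p agg.totals["fox"] # prints 2
```

A plain integer counter is enough here because EventMachine runs its callbacks on a single reactor thread, so `unbind` calls never race each other.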
The example lacks the plumbing needed to make it flexible and, as you may have noticed, runs on a single node (localhost), but it hopefully illustrates a mechanism for distributing the workload across a networked farm.
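One piece of that missing plumbing is deciding which node receives which document. A minimal sketch, assuming a hypothetical `WORKERS` list of hosts and a hypothetical `assign_jobs` helper, spreads documents round-robin into `[host, port, doc]` jobs:

```ruby
# Hypothetical worker list; in a real farm these would be other machines.
WORKERS = [["10.0.0.1", 5555], ["10.0.0.2", 5555], ["10.0.0.3", 5555]]

# Hypothetical helper: assign documents to workers round-robin,
# producing [host, port, doc] jobs.
def assign_jobs(workers, docs)
  docs.each_with_index.map do |doc, i|
    host, port = workers[i % workers.size]
    [host, port, doc]
  end
end

jobs = assign_jobs(WORKERS, %w[docs/a.txt docs/b.txt docs/c.txt docs/d.txt])
jobs.each { |host, port, doc| puts "#{doc} -> #{host}:#{port}" }
```

With four documents and three workers, the fourth document wraps around to the first worker. Each job could then be passed to `EM.connect(host, port, Reduce, doc)` instead of the hard-coded "localhost". Because the Map server calls `File.read` on the path it receives, each document must exist on the worker that is asked to process it.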
