EventMachine MapReduce
MapReduce is a parallel computation strategy useful for scaling large data set processing by distributing workload over multiple worker nodes. The distributed nature of MapReduce suggests network communication and, with that in mind, I thought I'd put together a demonstration employing EventMachine, a library which makes efficient network programming relatively simple in Ruby.
Before going any further, I should mention that the code examples have not been optimized for production use, they only illustrate what's possible. Also, it's worth bringing up two established Ruby libraries for tackling similar problems, Starfish and Skynet. It's advisable that these existing options are investigated before delving into custom alternatives.
MapReduce essentially consists of two steps (although intermediate phases usually need be present for real world implementations), map and reduce. map refers to the higher order function also known as transform or collect and is the operation that is typically distributed and involves a number of nodes performing the transformation of a data set into another set of data. reduce refers to the higher order function, sometimes called fold, inject or other, which is in this case used for collecting the results of map to build a return value.
Counting the number of word occurrences in a large number of documents is one of the examples most commonly used for describing MapReduce. A number of distributed jobs is spawned, splitting document contents into words. The results of these operations are passed to a reduce process whose job is to sum its input.
Map processes can be EventMachine servers. We can have an arbitrary number of those running on a number of physical nodes.
module Map def receive_data(path) document = File.read(path) word_counts = document.split(' ').map { |word| [word, 1] } send_data(Marshal.dump(word_counts)) close_connection_after_writing end end EM.run {EM.start_server("localhost", 5555, Map)}
A reduce process can send job requests to those servers, receive and process the results.
class Reduce < EM::Connection @@all = [] def initialize(*args) super @doc, @data = args[0], '' end def post_init send_data(@doc) end def receive_data(data) @data << data end def unbind Reduce.job_completed @@all += Marshal.load(@data) unless Reduce.pending_jobs? groups = @@all.group_by {|word| word[0] } groups.each { |g| p "#{g[0]} : #{g[1].size}" } EM.stop end end def self.send_map_job(port, doc) @job_count ||= 0 increment_job_count EM.connect("localhost", port, Reduce, doc) end def self.increment_job_count @job_count += 1 end def self.pending_jobs? @job_count != 0 end def self.job_completed @job_count -= 1 end end EM.run do { 5555 => 'docs/america.txt', 6666 => 'docs/da-vinci.txt' }.each { |port, doc| Reduce.send_map_job(port, doc) } end
The example lacks plumbing code which would make things flexible enough and, as you might have noticed, works on a single node (localhost), but hopefully illustrates a mechanism for distributing workload over a networked farm.