Archive for September, 2008

Efficient data imports

Tuesday, September 23rd, 2008

An application’s performance is affected, among other things, by the performance of its parts. Many current applications contain a database layer which, I’ve noticed, is neglected more often than it deserves. This is unfortunate because there are a lot of quick performance wins to be had by harnessing a database’s strong points.

Let’s think of an application which periodically collects large amounts of data, adapts it from a foreign structure into its native domain and stores the results in a database for further use. Data units must be unique, something we need to enforce each time a new import takes place.

One way of achieving this would be to construct domain native objects or structures by parsing the external data feeds and to check for duplicates already in the database, using a custom hashcode identity mechanism. Storing the hashcode values in a UNIQUE database column lets the database itself enforce data integrity.
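As a rough sketch of what such a hashcode mechanism might look like, the snippet below derives a SHA-1 digest from the fields that define an entry's identity. The helper name entry_hashcode and the field names are assumptions made for illustration; the right set of identity fields depends entirely on the data being imported.

```ruby
require 'digest/sha1'

# Hypothetical helper: derives a stable identity hash from the fields that
# define uniqueness for an entry. The field names are illustrative only.
def entry_hashcode(entry, identity_fields = [:source, :title, :published_at])
  # Join with a separator unlikely to appear in the data, so that
  # ["ab", "c"] and ["a", "bc"] do not produce the same digest.
  material = identity_fields.map { |field| entry[field].to_s }.join("\x1f")
  Digest::SHA1.hexdigest(material)
end

first  = { :source => 'feed-a', :title => 'Hello', :published_at => '2008-09-23', :body => 'v1' }
second = first.merge(:body => 'v2')    # same identity, different payload
third  = first.merge(:title => 'Bye')  # different identity

puts entry_hashcode(first) == entry_hashcode(second)
puts entry_hashcode(first) == entry_hashcode(third)
```

The resulting 40 character hex string is what would be stored in the UNIQUE column.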

DATA.each {|e| DB[:entries] << e rescue nil}

This code iterates over the adapted object enumeration and attempts a database insert for each entry. The rescue modifier swallows any exception raised, which covers uniqueness violations but also hides every other error. The approach also introduces significant overhead, since it performs one database query per entry in the imported collection.

Bulk inserts are nothing new and most, if not all, modern databases offer this functionality, which is also supported by the majority of database access application libraries. Ruby’s Sequel, for instance, allows bulk insert operations with the multi_insert method.

DB[:entries].multi_insert(DATA)

There’s a caveat here, as this operation will terminate the moment a duplicate entry violation error occurs. MySQL offers the INSERT IGNORE construct which is particularly useful in this scenario. Using the IGNORE keyword will cause errors that occur while executing the INSERT statement to be treated as warnings.

Looking to investigate the performance boost associated with the above technique, I’ve put together a small extension for Sequel, enabling the toolkit to make use of INSERT IGNORE.

module InsertIgnore
  # Flags the dataset so that the next multi_insert emits INSERT IGNORE.
  def ignore_duplicates!
    @ignore = true
    self
  end

  # Builds a single multi-row INSERT, adding IGNORE when the flag is set.
  def multi_insert_sql(columns, values)
    columns = column_list(columns)
    values = values.map {|r| literal(Array(r))}.join(Sequel::MySQL::Dataset::COMMA_SEPARATOR)
    ignore = @ignore ? ' IGNORE ' : ' '
    ["INSERT#{ignore}INTO #{source_list(@opts[:from])} (#{columns}) VALUES #{values}"]
  end
end

The extension can be used as follows:

Sequel::MySQL::Dataset.send(:include, InsertIgnore)
DB[:entries].ignore_duplicates!.multi_insert(DATA)

Inserting 100,000 records, some of them duplicates, using the application loop approach which issues an insert query for each entry took about 49 seconds on my laptop. Its INSERT IGNORE counterpart took about 4 seconds.

There are things to watch out for when using the latter approach. We can potentially construct very large queries, depending on the number of records we intend to insert. MySQL caps the size of a single packet, and therefore of a single statement, with the max_allowed_packet system variable, which defaults to 1 megabyte in the MySQL 5.0 and 5.1 series and can be increased up to 1 gigabyte. Loading such large datasets in memory can also prove problematic, so slicing the import into chunks is probably a good idea.
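One possible way to slice an import is Ruby's each_slice. The sketch below is only an illustration: import_in_slices is a hypothetical helper, the slice size of 1,000 is an arbitrary assumption, and the block stands in for the real bulk insert so that the example runs without a database.

```ruby
# Hypothetical helper: splits the records into slices of at most
# slice_size rows and hands each slice to the block, which is expected
# to perform one bulk insert. Returns the number of slices processed.
def import_in_slices(records, slice_size = 1_000)
  slices = 0
  records.each_slice(slice_size) do |slice|
    yield slice
    slices += 1
  end
  slices
end

# Stand-in for the database call: record the size of each slice instead
# of issuing a query.
slice_sizes = []
records = (1..2_500).map { |i| { :hashcode => i } }
count = import_in_slices(records, 1_000) { |slice| slice_sizes << slice.size }

puts count
puts slice_sizes.inspect
```

With the extension shown earlier, the block body would presumably become DB[:entries].ignore_duplicates!.multi_insert(slice), with the slice size tuned so that each statement stays comfortably below max_allowed_packet.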

It’s also worth mentioning MySQL’s ON DUPLICATE KEY UPDATE clause. When an insert would produce a duplicate value in a UNIQUE index or PRIMARY KEY, it updates the existing row instead of failing.
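To make the shape of such a statement concrete, here is a small sketch that assembles one as a string. The upsert_sql helper and the table and column names are hypothetical, identifiers are assumed to be trusted, and values are left as ? placeholders for the database driver to bind.

```ruby
# Hypothetical helper: builds a parameterised MySQL statement that inserts
# a row, or updates the listed columns if a unique key already exists.
def upsert_sql(table, columns, update_columns)
  placeholders = (['?'] * columns.size).join(', ')
  # VALUES(col) refers to the value the INSERT part tried to store.
  updates = update_columns.map { |c| "#{c} = VALUES(#{c})" }.join(', ')
  "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{placeholders}) " \
    "ON DUPLICATE KEY UPDATE #{updates}"
end

sql = upsert_sql(:entries, [:hashcode, :title, :body], [:title, :body])
puts sql
```

Here the hashcode column is assumed to carry the UNIQUE index, so a repeated hashcode refreshes title and body rather than raising an error.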

EventMachine MapReduce

Tuesday, September 9th, 2008

MapReduce is a parallel computation strategy useful for scaling large data set processing by distributing workload over multiple worker nodes. The distributed nature of MapReduce suggests network communication and, with that in mind, I thought I’d put together a demonstration employing EventMachine, a library which makes efficient network programming relatively simple in Ruby.

Before going any further, I should mention that the code examples have not been optimized for production use; they only illustrate what’s possible. It’s also worth bringing up two established Ruby libraries for tackling similar problems, Starfish and Skynet. It’s advisable to investigate these existing options before delving into custom alternatives.

MapReduce essentially consists of two steps, map and reduce, although real-world implementations usually need intermediate phases as well. map refers to the higher-order function also known as transform or collect. It is the operation that is typically distributed, with a number of nodes each transforming one data set into another. reduce refers to the higher-order function sometimes called fold or inject, which in this case collects the results of map to build a return value.
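For readers less familiar with these two functions, here is a minimal, purely local illustration using Ruby's own map and inject. The word list is arbitrary and nothing is distributed at this point.

```ruby
words = %w[map reduce fold]

# map: transform every element independently of the others.
lengths = words.map { |word| word.length }

# reduce (inject in Ruby): fold the transformed values into one result.
total = lengths.inject(0) { |sum, n| sum + n }

puts lengths.inspect
puts total
```

Because each element is transformed independently, the map step is the natural candidate for spreading across several workers.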

Counting word occurrences across a large number of documents is one of the examples most commonly used to describe MapReduce. A number of distributed jobs are spawned, each splitting a document’s contents into words. The results of these operations are passed to a reduce process, whose job is to sum its input.
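Before bringing the network into it, the word count can be sketched in a single process. The function names map_words and reduce_counts and the two sample strings are assumptions made for illustration; in the distributed version each call to map_words would run on a different node.

```ruby
# Map step: turn one document into a list of [word, 1] pairs.
def map_words(text)
  text.split(' ').map { |word| [word, 1] }
end

# Reduce step: sum the counts emitted for each distinct word.
def reduce_counts(pairs)
  pairs.inject(Hash.new(0)) do |counts, (word, n)|
    counts[word] += n
    counts
  end
end

documents = ['the cat sat', 'the dog sat down']

# Concatenate the pairs produced by every map job, then reduce them.
pairs = documents.map { |doc| map_words(doc) }.inject([]) { |all, part| all + part }
counts = reduce_counts(pairs)

puts counts.inspect
```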

Map processes can be EventMachine servers. We can have an arbitrary number of those running on a number of physical nodes.

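require 'eventmachine'

# Each connection receives a document path, splits the document into
# words and sends back a marshalled list of [word, 1] pairs.
module Map
  def receive_data(path)
    document = File.read(path)
    word_counts = document.split(' ').map { |word| [word, 1] }
    send_data(Marshal.dump(word_counts))
    # Closing the connection tells the client that the result is complete.
    close_connection_after_writing
  end
end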

EM.run {EM.start_server("localhost", 5555, Map)}

A reduce process can send job requests to those servers, receive and process the results.

require 'eventmachine'

class Reduce < EM::Connection
  # Word pairs accumulated from every map job.
  @@all = []

  def initialize(*args)
    super
    @doc, @data = args[0], ''
  end

  def post_init
    send_data(@doc)
  end

  def receive_data(data)
    @data << data
  end

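  # Called when the connection closes, i.e. once the map server has sent
  # its full result or the connection has failed.
  def unbind
    Reduce.job_completed
    # Skip connections that returned nothing, e.g. an unreachable server.
    @@all += Marshal.load(@data) unless @data.empty?
    unless Reduce.pending_jobs?
      groups = @@all.group_by { |word| word[0] }
      groups.each { |g| puts "#{g[0]} : #{g[1].size}" }
      EM.stop
    end
  end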

  def self.send_map_job(port, doc)
    @job_count ||= 0
    increment_job_count
    EM.connect("localhost", port, Reduce, doc)
  end

  def self.increment_job_count
    @job_count += 1
  end

  def self.pending_jobs?
    @job_count != 0
  end

  def self.job_completed
    @job_count -= 1
  end
end

EM.run do
  {
    5555 => 'docs/america.txt',
    6666 => 'docs/da-vinci.txt'
  }.each { |port, doc| Reduce.send_map_job(port, doc) }
end

The example lacks the plumbing code needed to make it flexible and, as you might have noticed, works on a single node (localhost). Note also that a map server must be listening on every port the reduce process connects to. Hopefully, though, it illustrates a mechanism for distributing workload over a networked farm.