Oct 26 2008

Parallelize by process

Performing computations in parallel is a popular technique for improving application performance and can be achieved in a number of ways, most commonly by employing threads or by splitting the workload into a number of concurrent processes.

Memory usage is often a headache with large dataset computations. While memory optimization is worth pursuing, tracking down memory leaks can become tedious and time consuming. We can decrease the chances of a heavy job running a system's memory dry by coming up with a strategy for fragmenting the job into a number of shorter running processes. By doing so, any memory used by a worker process is released the moment the process completes. Additionally, we can run job fragments in parallel, allowing us to harness the operating system's multi-core capabilities and potentially distribute worker processes over a number of physical hosts, scaling out when the need arises. Smaller processes also dictate more manageable chunks of code, which are easier to maintain, optimize and test.

Let's look at an example where a job involves fetching a large number of categorized products from various sources and processing them for use by our own application.

class Job
  def perform
    ADDRESSES.each do |address|
      category = load_category(address)
      category.products.each { |product| process(product) }
    end
  end

  def process(product)
    # some intensive computation
  end

  def load_category(address)
    # load an addressable category dataset
  end
end

Let's assume that the ADDRESSES constant in the example is a list consisting of entries such as example.com/toys, example.com/phones, example.org/guitars, etc. The job fetches the product dataset addressable at each category entry, iterates over the products and performs a long processing operation on each. Supposing that after every possible optimization the job takes three hours to complete, we can at best run it eight times a day (24 hours divided by 3). What happens if the product categories are updated more often than eight times a day, and for our application to be successful it needs to deal with fresh data at all times?

A natural split involves creating a worker process for each address entry. We can do so by extracting the majority of the code from the Job class into a Worker class meant to run as a standalone process.

class Worker
  def self.process_category(address)
    category = load_category(address)
    category.products.each { |product| process(product) }
  end

  def self.process(product)
    # some intensive computation
  end

  def self.load_category(address)
    # load an addressable category dataset
  end
end

# process a single category when the file is invoked as a standalone script
Worker.process_category(ARGV[0]) if ARGV.size == 1

Each worker will operate on a significantly smaller dataset and will complete much faster than the initial long running job. Any memory used by a worker is released the moment its process finishes execution.
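
Assuming the worker code above is saved as worker.rb (the file name the Job class below also expects), a single category can be processed in isolation straight from the command line:

$ ruby worker.rb example.com/toys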

After the latest change, Job can take on the role of orchestrating the worker processes. We start by allowing only an arbitrary maximum number of concurrent workers, three in this case.

require "thread"

class Job
  def initialize
    @worker_count, @mutex = 3, Mutex.new
  end

  def perform
    ADRESSESES.each do |address|
      sleep 0.1 until @worker_count > 0
      @worker_count -= 1
      Thread.new do
        system("ruby worker.rb #{address}")
        @mutex.synchronize {@worker_count += 1}
      end
    end
  end
end
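
Threads are only used here to babysit the child processes. As a side note, on POSIX systems the same throttling can be expressed without threads at all, using fork and Process.wait. A minimal sketch of that variation, not from the original code, assuming worker.rb can be found on the load path so the Worker class is available:

require "worker" # assumes worker.rb is on the load path

pids = []
ADDRESSES.each do |address|
  # block until one of the running workers exits before forking another
  pids.delete(Process.wait) if pids.size >= 3
  pids << fork { Worker.process_category(address) }
end
pids.each { |pid| Process.waitpid(pid) } # wait for the stragglers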

At this point it is a good idea to run the job and monitor the time it takes to complete, while also measuring system resource usage. This way we can determine the optimal number of concurrent worker processes for the system's specs. Once available resources have been exhausted and both Job and Worker have been sufficiently optimized, we can start thinking about running workers on separate physical nodes.
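
Since workers are already launched through a shell command, moving some of them to other machines can be as simple as swapping the local invocation for an ssh one. A minimal sketch, assuming a hypothetical HOSTS list, worker.rb deployed on every host and key-based ssh authentication already in place:

HOSTS = %w(node1.example.com node2.example.com) # hypothetical worker hosts

# a drop-in replacement for the system("ruby worker.rb #{address}") call
# inside Job#perform; the index would come from iterating ADDRESSES
# with each_with_index
def run_remote_worker(address, index)
  host = HOSTS[index % HOSTS.size] # naive round-robin placement
  system("ssh #{host} 'ruby worker.rb #{address}'")
end

The concurrency throttling in Job#perform can stay exactly as it is; only the command each thread runs changes.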