Parallelize by process
Performing computations in parallel is a popular technique for improving application performance and can be achieved in a number of ways, most commonly by employing threads or by splitting the workload across a number of concurrent processes.
Memory usage is often a headache with large dataset computations. While memory optimization is worth pursuing, tracking down memory leaks can become tedious and time consuming. We can decrease the chances of a heavy job running a system's memory dry by coming up with a strategy for fragmenting the job into a number of shorter running processes. By doing so, any memory used by a worker process will be released the moment the process completes. Additionally, we can run job fragments in parallel, allowing us to harness the operating system's multi-core capabilities, and potentially distribute worker processes over a number of physical hosts to scale out when the need arises. Smaller processes also dictate more manageable chunks of code which are easier to maintain, optimize and test.
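As a minimal sketch of the idea on a Unix-like system, we can fork a short-lived child process for each slice of work; the names items and work_on are hypothetical stand-ins for an actual workload. Everything a child allocates is handed back to the operating system when it exits:

items.each_slice(100) do |slice|
  pid = fork do
    # any memory allocated here is released when the child exits
    slice.each { |item| work_on(item) }
  end
  Process.wait(pid)
end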
Let's look at an example where a job involves fetching a large number of categorized products from various sources and processing them for use by our own application.
class Job
  def perform
    ADDRESSES.each do |address|
      category = load_category(address)
      category.products.each { |product| process(product) }
    end
  end

  def process(product)
    # some intensive computation
  end

  def load_category(address)
    # load an addressable category dataset
  end
end
Let's assume that the ADDRESSES constant in the example is a list consisting of entries such as example.com/toys, example.com/phones, example.org/guitars, etc. The job fetches each category's product dataset by its address, iterates over the products and performs a long processing operation on each. Supposing that after every possible optimization the job takes three hours to complete, we can at best run it eight times a day (twenty-four hours divided by three-hour runs). What happens if the product categories are updated more often than eight times a day, and our application needs to work with fresh data at all times in order to be successful?
One natural split can involve creating a worker process for each address entry. We can do so by extracting the majority of the code from the Job class into a Worker class meant to run as a standalone process.
class Worker
  def self.process_category(address)
    category = load_category(address)
    category.products.each { |product| process(product) }
  end

  def self.process(product)
    # some intensive computation
  end

  def self.load_category(address)
    # load an addressable category dataset
  end
end

Worker.process_category(ARGV[0]) if ARGV.size == 1
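Assuming the script above is saved as worker.rb (the file name the job will reference below), a single category can now be processed in isolation from the command line:

ruby worker.rb example.com/toys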
Each worker operates on a significantly smaller dataset and completes much faster than the initial long running job. Any memory used by a worker is released the moment its process exits.
After the latest change, Job can take on the role of orchestrating the worker processes. We start by allowing only an arbitrary maximum number of concurrent workers, three in this case.
require "thread" class Job def initialize @worker_count, @mutex = 3, Mutex.new end def perform ADRESSESES.each do |address| sleep 0.1 until @worker_count > 0 @worker_count -= 1 Thread.new do system("ruby worker.rb #{address}") @mutex.synchronize {@worker_count += 1} end end end end
At this point it is a good idea to run the job and monitor the time it takes to complete, while also measuring system resource usage. This way we can determine the optimal number of concurrent worker processes based on the system's specs. Once available resources have been exhausted and both Job and Worker have been sufficiently optimized, we can start thinking about running workers on separate physical nodes.
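As a rough illustration of that last step, and purely an assumption on my part, the same orchestration could shell out over SSH, provided worker.rb and its dependencies are already deployed on each node and passwordless SSH access is configured; the NODES list below is hypothetical:

# Round-robin worker processes over remote hosts.
NODES = %w[worker1.example.com worker2.example.com]

threads = ADDRESSES.each_with_index.map do |address, index|
  node = NODES[index % NODES.size]
  Thread.new { system("ssh #{node} 'ruby worker.rb #{address}'") }
end
threads.each(&:join)

In practice we would combine this with the same concurrency cap used earlier, tuned per node rather than for a single machine.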