Also on twitter ( twitter.com/nutrun )

Archive for October, 2008

Parallelize by process

Sunday, October 26th, 2008

Performing computations in parallel is a popular technique for improving application performance. It can be achieved in a number of ways, most commonly by employing threads or by splitting the workload into a number of concurrent processes.

Memory usage is often a headache with large dataset computations. While memory optimization is worth pursuing, tracking down memory leaks can become tedious and time-consuming. We can reduce the chance of a heavy job running a system’s memory dry by fragmenting the job into a number of shorter-running processes. By doing so, any memory used by a worker process is released back to the operating system the moment that process exits. Additionally, we can run job fragments in parallel, take advantage of multi-core hardware, and potentially distribute worker processes over a number of physical hosts, scaling out when the need arises. Smaller processes also mean smaller, more manageable chunks of code, which are easier to maintain, optimize and test.
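Here is a minimal sketch of the idea. It assumes a Unix-like system where `Process.fork` is available. The code splits a job into chunks and computes each chunk in its own child process. Summing squares stands in for the real work, and `process_in_children` is a hypothetical helper name. Results travel back to the parent through a pipe, and whatever memory a child allocates is reclaimed by the operating system when that child exits.

```ruby
# Sketch only: assumes a Unix-like OS (Process.fork is unavailable on Windows).
# Each chunk runs in its own child process; the child's memory is freed on exit.
def process_in_children(chunks)
  # Fork every child first so the chunks are computed concurrently.
  children = chunks.map do |chunk|
    reader, writer = IO.pipe
    pid = Process.fork do
      reader.close
      result = chunk.map { |n| n * n }.sum # stand-in for an intensive computation
      writer.write(Marshal.dump(result))   # send the result back to the parent
      writer.close
      exit!(0) # leave immediately, skipping at_exit hooks inherited from the parent
    end
    writer.close # the parent only reads from this pipe
    [pid, reader]
  end

  # Collect results in chunk order, then reap each child.
  children.map do |pid, reader|
    result = Marshal.load(reader.read) # read blocks until the child closes its end
    reader.close
    Process.wait(pid)
    result
  end
end

results = process_in_children([[1, 2, 3], [4, 5], [6]])
puts results.inspect # prints [14, 41, 36]
```

The child calls `exit!` rather than `exit` so that it skips any `at_exit` handlers inherited from the parent. That is usually what you want in a forked worker.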

Let’s look at an example where a job involves fetching a large number of categorized products from various sources and processing them for use by our own application.

class Job
  # Fetch each category in turn and process its products sequentially.
  def perform
    ADDRESSES.each do |address|
      category = load_category(address)
      category.products.each { |product| process(product) }
    end
  end

  def process(product)
    # some intensive computation
  end

  def load_category(address)
    # load an addressable category dataset
  end
end

Let’s assume that the ADDRESSES constant in the example is a list of entries such as example.com/toys, example.com/phones, example.org/guitars, etc. The job fetches the product dataset for each category address, iterates over the products and performs a long processing operation on each one. Suppose that, after every possible optimization, the job still takes three hours to complete. At best, we can then run it eight times a day. What happens if the product categories are updated more often than that, and our application’s success depends on always working with fresh data?

One natural split is to create a worker process for each address entry. We can do so by extracting most of the code from the Job class into a Worker class meant to run as a standalone process.

class Worker
  # Fetch a single category and process its products.
  def self.process_category(address)
    category = load_category(address)
    category.products.each { |product| process(product) }
  end

  def self.process(product)
    # some intensive computation
  end

  def self.load_category(address)
    # load an addressable category dataset
  end
end

# Entry point when run as a standalone process, e.g. ruby worker.rb example.com/toys
Worker.process_category(ARGV[0]) if ARGV.size == 1

Each worker operates on a significantly smaller dataset and completes much faster than the original long-running job. Any memory a worker uses is released as soon as its process exits.

With this change in place, Job can take on the role of orchestrating the worker processes. We start by capping the number of concurrent workers at an arbitrary maximum, three in this case.

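require "thread"

class Job
  def initialize
    # At most three worker processes may run at any one time.
    @worker_count, @mutex = 3, Mutex.new
  end

  # Launch one worker process per address (ADDRESSES is defined elsewhere).
  def perform
    threads = ADDRESSES.map do |address|
      # Wait for a free slot, then claim it while holding the lock.
      sleep 0.1 until @mutex.synchronize { @worker_count > 0 }
      @mutex.synchronize { @worker_count -= 1 }
      Thread.new do
        # Passing arguments separately keeps the shell from interpreting the address.
        system("ruby", "worker.rb", address)
        # Free the slot once the worker process has exited.
        @mutex.synchronize { @worker_count += 1 }
      end
    end
    # Don't return until the last workers have finished.
    threads.each(&:join)
  end
end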
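A common alternative to the polling counter is a fixed pool of threads pulling addresses off a thread-safe `Queue`. This is a swapped-in technique, not the article's code. It bounds concurrency without any sleeping or shared counter. `run_pool` is a hypothetical name. The sketch assumes items are never `nil`, because a `nil` from `pop` is how a closed, drained queue signals that there is no more work.

```ruby
# Hypothetical helper: calls `work` once per item using at most `size` threads.
def run_pool(items, size, &work)
  queue = Queue.new
  items.each { |item| queue << item }
  queue.close # once drained, Queue#pop returns nil instead of blocking

  threads = Array.new(size) do
    Thread.new do
      # Each thread keeps taking work until the queue is exhausted.
      while (item = queue.pop)
        work.call(item)
      end
    end
  end
  threads.each(&:join)
end

# Usage with the article's worker script (ADDRESSES assumed to be defined):
#   run_pool(ADDRESSES, 3) { |address| system("ruby", "worker.rb", address) }
```

Because only `size` threads exist, no more than that many `worker.rb` processes can be running at once. A free thread picks up the next address as soon as its current worker exits.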

At this point it is a good idea to run the job and measure how long it takes to complete, while also monitoring system resource usage. This way we can determine the optimal number of concurrent worker processes for the machine’s specifications. Once the available resources have been exhausted and both Job and Worker have been sufficiently optimized, we can start thinking about running workers on separate physical nodes.
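Here is one way to take that measurement, sketched under assumptions. `benchmark_worker_counts` is a hypothetical helper that times a job once per candidate worker count using the monotonic clock. The stand-in job just sleeps in batches. In practice the block would run a version of Job whose worker limit is configurable; that is an assumption, since the Job above hard-codes three. You would watch CPU and memory with your usual system tools while it runs.

```ruby
# Hypothetical helper: runs the block once per worker count and records wall-clock time.
def benchmark_worker_counts(counts)
  counts.each_with_object({}) do |count, timings|
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield count # the block runs the whole job with `count` concurrent workers
    timings[count] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end
end

# Stand-in job: `tasks` tasks of `seconds` each, executed `count` at a time.
def fake_job(count, tasks: 6, seconds: 0.05)
  tasks.times.each_slice(count) do |batch|
    batch.map { Thread.new { sleep seconds } }.each(&:join)
  end
end

timings = benchmark_worker_counts([1, 2, 3]) { |count| fake_job(count) }
timings.each { |count, secs| puts format("%d workers: %.2fs", count, secs) }
```

With a real workload, the timings stop improving once the workers saturate the CPU, memory or I/O. That plateau is a reasonable place to set the limit.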

Anarchic versus controlled scalability

Saturday, October 4th, 2008

With roughly one hundred and sixty million websites and more than a trillion web pages at the time of this writing, the Web is the largest network infrastructure to date. Figures like these are nothing short of enviable. The Web’s architecture has increasingly influenced software authors’ design decisions, to the extent that emerging trends now apply this approach in areas where it has not traditionally been commonplace, such as “enterprise” middleware.

Perhaps the Web’s most notable triumph is that it allows its citizens to exist and adapt in a context that is difficult to control or predict. It achieved its monumental scalability by following the set of constraints that make up the REST architectural style. Among other objectives, these constraints were put together so that systems could effectively satisfy a need for anarchic scalability. We must not forget, however, that their benefits come with associated trade-offs.

Architectural decisions should involve weighing the costs and benefits they introduce against the specific problem they attempt to address. There is no universal solution to every design problem. While REST has proven successful in achieving anarchic scalability, not all systems exist in wild, disorderly environments. Introducing REST constraints into a system that doesn’t need to be as loosely controlled as the Web can incur unnecessary overhead.

Section 5.1.3, “Stateless”, of Roy Fielding’s seminal dissertation, Architectural Styles and the Design of Network-based Software Architectures, is a good example. Of particular interest for this discussion is its second paragraph:

Like most architectural choices, the stateless constraint reflects a design trade-off. The disadvantage is that it may decrease network performance by increasing the repetitive data (per-interaction overhead) sent in a series of requests, since that data cannot be left on the server in a shared context.

Let’s consider an imaginary example: an auction service that publishes price updates and accepts bids on auctioned items. Since this is a private auction, we know in advance that 3,000 consumers will interact with the service. Each of them subscribes to price updates and places bids whenever they see fit. These consumers must be authorized to interact with the service.

If we were to carry out the above over HTTP, a potential implementation would have the service publish an item’s current price as a feed, with consumers subscribing to it and polling for updates. The service enforces a polling interval of 10 seconds per consumer. For one item, this results in 6 * 60 * 24 * 3000 = 25,920,000 requests/day: six polls a minute, every minute of the day, for each of the 3,000 consumers. Consumers also need to be authorized to access the resource. Respecting the statelessness constraint, that means 25,920,000 handshakes/day will take place. If we assume that an item receives 20,000 bids a day, the system becomes subject to 25,900,000 unnecessary requests and handshakes.

The 20,000 bids/day assumption implies an average interval between bids of 86,400 / 20,000 = 4.32 seconds. A 10-second polling interval is therefore too coarse for consumers to act on price updates in near real time.
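The figures in the two paragraphs above can be checked with a few lines of Ruby. The variable names are illustrative, and the inputs are the scenario's assumptions: 3,000 consumers, a 10-second polling interval and 20,000 bids per item per day.

```ruby
SECONDS_PER_DAY = 24 * 60 * 60 # 86,400

consumers     = 3_000  # private auction participants
poll_interval = 10     # seconds between polls, per consumer
bids_per_day  = 20_000 # bids received by one item

polls_per_consumer = SECONDS_PER_DAY / poll_interval # 8,640 polls/day
requests_per_day   = polls_per_consumer * consumers  # 25,920,000 requests/day
# Under the stateless constraint, every request is authorized on its own.
handshakes_per_day = requests_per_day

mean_bid_interval = SECONDS_PER_DAY.to_f / bids_per_day # seconds between bids

puts requests_per_day  # prints 25920000
puts mean_bid_interval # prints 4.32
```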

We can optimize by making the consumers friendlier. They can honor the ETag and Last-Modified headers the service provides and issue conditional and partial GET requests. These techniques reduce unnecessary data transfer, but they neither reduce the number of requests nor decrease the number of handshakes. Caches and reverse proxies are also commonly employed to relieve server load, although the near real-time requirement of this scenario makes configuring them effectively tricky.
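To make the point concrete, here is a minimal sketch of how a service might evaluate a conditional GET against the feed's current ETag. `respond_to_feed_request` and its `[status, headers, body]` return value are illustrative assumptions. The ETag is derived from a SHA-1 digest of the body, one common choice. The sketch also ignores the fact that a real `If-None-Match` header may list several ETags or `*`.

```ruby
require "digest/sha1"

# Hypothetical handler for the price feed; returns [status, headers, body].
def respond_to_feed_request(request_headers, feed_body)
  etag = %("#{Digest::SHA1.hexdigest(feed_body)}") # quoted, as ETags are on the wire
  if request_headers["If-None-Match"] == etag
    [304, { "ETag" => etag }, ""]        # unchanged: status and headers only, no body
  else
    [200, { "ETag" => etag }, feed_body] # first request or changed price: full body
  end
end

_status, headers, _body = respond_to_feed_request({}, "price: 100")
status, _headers, body = respond_to_feed_request({ "If-None-Match" => headers["ETag"] }, "price: 100")
puts status # prints 304
```

The `304 Not Modified` saves the body, but the consumer still made a full request. Under the stateless constraint, that request still had to be authorized on its own.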

In contrast, suppose we implemented the example on top of an event-driven, stateful transport such as XMPP. The service could publish updates to PubSub nodes, and consumers would subscribe to those nodes and receive updates as they happen. This leaves us with 20,000 published messages, one per bid, and 3,000 handshakes: one per connection and therefore one per consumer. The number of unnecessary requests and handshakes drops to zero.
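The push model can be sketched in-process. `PriceNode` below is a hypothetical stand-in for a PubSub node, not an XMPP client. Each consumer subscribes once, which plays the role of the one-off handshake. Every published price is then pushed to all current subscribers, with no polling in between. The node counts one publication per update, while delivery fans out to every subscriber.

```ruby
# Hypothetical in-process stand-in for a PubSub node (not an XMPP implementation).
class PriceNode
  attr_reader :subscriptions, :publications

  def initialize
    @subscribers   = []
    @subscriptions = 0 # analogous to one handshake per connected consumer
    @publications  = 0 # one per bid / price update
  end

  # A consumer registers a callback once and is notified from then on.
  def subscribe(&callback)
    @subscribers << callback
    @subscriptions += 1
  end

  # Push a new price to every subscriber as soon as it happens.
  def publish(price)
    @publications += 1
    @subscribers.each { |callback| callback.call(price) }
  end
end

node = PriceNode.new
received = Hash.new { |hash, id| hash[id] = [] }
3.times { |id| node.subscribe { |price| received[id] << price } }
[100, 105, 110].each { |price| node.publish(price) }
```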

The XMPP approach, however, is a poor candidate for an environment where the number of consumers interacting with the service is outside our control. With each consumer holding an open connection, the service cannot release the resources tied to those connections. A physical infrastructure can only accommodate a finite number of persistent connections.

Adopting established, widely understood open standards introduces a plethora of benefits. HTTP, BitTorrent, XMPP, SMTP and FTP have all contributed to Internet-scale success stories, and each comes with its own merits and trade-offs. When faced with a choice, we should examine the benefits and drawbacks of each relative to the characteristics of the environment the system exists in. More interestingly, we should investigate combining the available options so that each plays to its strengths while compensating for the others’ weaknesses.