tool monkey

adventures of an unfrozen caveman programmer

Monadic ‘break’ and ‘continue’ for Scala Sequence Comprehensions

| Feedback

Author’s note 0: I have come up with better, more correct designs for monadic objects that implement break and continue in Scala for-comprehensions. I’m leaving this blog post up for posterity, but I recommend using the ‘breakable’ project if you are interested in break and continue in a Scala framework.

Author’s note: I’ve since received some excellent feedback from the Scala community, which I included in some end notes.

Author’s note the 2nd: I later realized I could apply an implicit conversion and mediator class to preserve the traditional ordering: the code has been updated with that approach.

Author’s note the 3rd: This concept has been submitted to the Scala project as JIRA SI-9120 (PR #4275)

Scala sequence comprehensions are an excellent functional programming idiom for looping in Scala. However, sequence comprehensions encompass much more than just looping – they represent a powerful syntax for manipulating all monadic structures[1].

The break and continue looping constructs are a popular framework for cleanly representing multiple loop halting and continuation conditions at differing stages in the execution flow. Although there is no native support for break or continue in Scala control constructs, it is possible to implement them in a clean and idiomatic way for sequence comprehensions.

In this post I will describe a lightweight and easy-to-use implementation of break and continue for use in Scala sequence comprehensions (aka for statements). The entire implementation is as follows:

object BreakableGenerators {
  import scala.language.implicitConversions

  type Generator[+A] = Iterator[A]
  type BreakableGenerator[+A] = BreakableIterator[A]

  // Generates a new breakable generator from any traversable object.
  def breakable[A](t1: TraversableOnce[A]): Generator[BreakableGenerator[A]] =
    List(new BreakableIterator(t1.toIterator)).iterator

  // Mediates boolean expression with 'break' and 'continue' invocations
  case class BreakableGuardCondition(cond: Boolean) {
    // Break the looping over one or more breakable generators, if 'cond' 
    // evaluates to true.
    def break(b: BreakableGenerator[_], bRest: BreakableGenerator[_]*): Boolean = {
      if (cond) {
        b.break
        for (x <- bRest) { x.break }
      }
      !cond
    }

    // Continue to next iteration of enclosing generator if 'cond' 
    // evaluates to true.
    def continue: Boolean = !cond
  }

  // implicit conversion of boolean values to breakable guard condition mediary
  implicit def toBreakableGuardCondition(cond: Boolean) =
    BreakableGuardCondition(cond)

  // An iterator that can be halted via its 'break' method.  Not invoked directly
  class BreakableIterator[+A](itr: Iterator[A]) extends Iterator[A] {
    private var broken = false
    private[BreakableGenerators] def break { broken = true }

    def hasNext = !broken && itr.hasNext
    def next = itr.next
  }
}

The approach is based on a simple subclass of IteratorBreakableIterator – that can be halted by ‘breaking’ it. The function breakable(<traversable-object>) returns an Iterator over a single BreakableIterator object. Iterators are monad-like structures in that they implement map and flatMap, and so its output can be used with <- at the start of a for construct in the usual way. Note that this means the result of the for statement will also be an Iterator.

Whenever the boolean expression for an if guard is followed by either break or continue, it is implicitly converted to a “breakable guard condition” that supports those methods. The function break accepts one or more instances of BreakableIterator. If it evaluates to true, the loops embodied by the given iterators are immediately halted via the associated if guard, and the iterators are halted via their break method. The continue function is mostly syntactic sugar for a standard if guard, simply with the condition inverted.

Here is a simple example of break and continue in use:

object Main {
  import BreakableGenerators._

  def main(args: Array[String]) {

    val r = for (
      // generate a breakable sequence from some sequential input
      loop <- breakable(1 to 1000);
      // iterate over the breakable sequence
      j <- loop;
      // print out at each iteration
      _ = { println(s"iteration j= $j") };
      // continue to next iteration when 'j' is even
      if { j % 2 == 0 } continue;
      // break out of the loop when 'j' exceeds 5
      if { j > 5 } break(loop)
    ) yield {
      j
    }
    println(s"result= ${r.toList}")
  }
}

We can see from the resulting output that break and continue function in the usual way. The continue clause ignores all subsequent code when j is even. The break clause halts the loop when it sees its first value > 5, which is 7. Only odd values <= 5 are output from the yield statement:

$ scalac -d /home/eje/class monadic_break.scala
$ scala -classpath /home/eje/class Main
iteration j= 1
iteration j= 2
iteration j= 3
iteration j= 4
iteration j= 5
iteration j= 6
iteration j= 7
result= List(1, 3, 5)

Breakable iterators can be nested in the way one would expect. The following example shows an inner breakable loop nested inside an outer one:

object Main {
  import BreakableGenerators._

  def main(args: Array[String]) {
    val r = for (
      outer <- breakable(1 to 7);
      j <- outer;
      _ = { println(s"outer  j= $j") };
      if { j % 2 == 0 } continue;
      inner <- breakable(List("a", "b", "c", "d", "e"));
      k <- inner;
      _ = { println(s"    inner  j= $j  k= $k") };
      if { k == "d" } break(inner);
      if { j == 5  &&  k == "c" } break(inner, outer)
    ) yield {
      (j, k)
    }
    println(s"result= ${r.toList}")
  }
}

The output demonstrates that the inner loop breaks whenever k=="d", and so "e" is never present in the yield result. When j==5 and k=="c", both the inner and outer loops are broken, and so we see that there is no (5,"c") pair in the result, nor does the outer loop ever iterate over 6 or 7:

$ scalac -d /home/eje/class monadic_break.scala
$ scala -classpath /home/eje/class Main
outer  j= 1
    inner  j= 1  k= a
    inner  j= 1  k= b
    inner  j= 1  k= c
    inner  j= 1  k= d
outer  j= 2
outer  j= 3
    inner  j= 3  k= a
    inner  j= 3  k= b
    inner  j= 3  k= c
    inner  j= 3  k= d
outer  j= 4
outer  j= 5
    inner  j= 5  k= a
    inner  j= 5  k= b
    inner  j= 5  k= c
result= List((1,a), (1,b), (1,c), (3,a), (3,b), (3,c), (5,a), (5,b))

Using break and continue with BreakableIterator for sequence comprehensions is that easy. Enjoy!

Notes

The helpful community on freenode #scala made some excellent observations:

1: Iterators in Scala are not strictly monadic – it would be more accurate to say they’re “things with a flatMap and map method, also they can use filter or withFilter sometimes.” However, I personally still prefer to think of them as “monadic in spirit if not law.”

2: The break function, as described in this post, is not truly functional in the sense of referential transparency, as the invocation if break(loop) { condition } involves a side-effect on the variable loop. I would say that it does maintain “scoped functionality.” That is, the break in non-referential transparency is scoped by the variables in question. The for statement containing them is referentially transparent with respect to its inputs (provided no other code is breaking referential transparency, of course).

References

[1] Functional Programming in Scala, Paul Chiusano and Runar Bjarnason, (section 6.6)

Faster Random Samples With Gap Sampling

| Feedback

Update (April 4, 2016): my colleague RJ Nowling ran across a paper by J.S. Vitter that shows Vitter developed the trick of accelerating sampling with a sampling-gap distribution in 1987 – I re-invented Vitter’s wheel 30 years after the fact! I’m surprised it never caught on, as it is not much harder to implement than the naive version.

Generating a random sample of a collection is, logically, a O(np) operation, where (n) is the sample size and (p) is the sampling probability. For example, extracting a random sample, without replacement, from an array might look like this in pseudocode:

sample(data: array, p: real) {
    n = length(data)
    m = floor(p * n)
    for j = 0 to m-1 {
        k = random(j, n-1)
        swap(data[j], data[k])
    }
    emit the first m elements of 'data' to output
}

We can see that this sampling algorithm is indeed O(np). However, it makes some nontrivial assumptions about its input data:

  • It is random access
  • It is writable
  • Its size is known
  • It can be destructively modified

These assumptions can be violated in several ways. The input data might not support random access, for example it might be a list, or stream, or an iterator over the same. We might not know its size a priori. It might be read-only. It might be up-cast to some superclass where knowledge about these assumed properties is no longer available.

In cases such as this, there is another common sampling algorithm:

sample(data: sequence, p: real) {
    while not end(data) {
        v = next(data)
        if random(0.0, 1.0) < p then emit v to output
    }
}

The above algorithm enjoys all the advantage in flexibility. It requires only linear access, does not require writable input, and makes no assumptions about input size. However it comes at a price: this algorithm is no longer O(np), it is O(n). Each element must be traversed directly, and worse yet the random number generagor (RNG) must be invoked on each element. O(n) invocation of the RNG is a substantial cost – random number generation is typically very expensive compared to the cost of iterating to the next element in a sequence.

But… does linear sampling truly require us to invoke our RNG on every element? Consider the pattern of data access, divorced from code. It looks like a sequence of choices: for each element we either (skip) or (sample):

(skip) (skip) (sample) (skip) (sample) (skip) (sample) (sample) (skip) (skip) (sample) ...

The number of consecutive (skip) events between each (sample) – the sampling gap – can itself be modeled as a random variable. Each (skip)/(sample) choice is an independent Bernoulli trial, where the probability of (skip) is (1-p). The PMF of the sampling gap for gap of {0, 1, 2, …} is therefore a geometric distribution: P(k) = p(1-p)k

This suggests an alternative algorithm for sampling, where we only need to randomly choose sample gaps instead of randomly choosing whether we sample each individual element:

// choose a random sampling gap 'k' from P(k) = p(1-p)^k
// caution: this explodes for p = 0 or p = 1
random_gap(p: real) {
    u = max(random(0.0, 1.0), epsilon)
    return floor(log(u) / log(1-p))
}

sample(data: sequence, p: real) {
    advance(data, random_gap(p))
    while not end(data) {
        emit next(data) to output
        advance(data, random_gap(p))
    }
}

The above algorithm calls the RNG only once per actual collected sample, and so the cost of RNG calls is O(np). Note that the algorithm is still O(n), but the cost of the RNG tends to dominate the cost of sequence traversal, and so the resulting efficiency improvement is substantial. I measured the following performance improvements with gap sampling, compared to traditional linear sequence sampling, on a Scala prototype testing rig:

Type p linear gap
Array 0.001 2833 29
Array 0.01 2825 76
Array 0.1 2985 787
Array 0.5 3526 3478
Array 0.9 3023 6081
List 0.001 2213 230
List 0.01 2220 265
List 0.1 2337 796
List 0.5 2794 3151
List 0.9 2513 4849

In the results above, we see that the gap sampling times are essentially linear in (p), as expected. In the case of the linear-access List type, there is a higher baseline time (230 vs 29) due to the constant cost of actual data traversal. Efficiency improvements are substantial at small sampling probabilities.

We can also see that the cost of gap sampling begins to meet and then exceed the cost of traditinal linear sampling, in the vicinnity (p) = 0.5. This is due to the fact that the gap sampling logic is about twice the cost (in my test environment) of simply calling the RNG once. For example, the gap sampling invokes a call to the numeric logarithm code that isn’t required in traditional sampling. And so at (p) = 0.5 the time spent doing the gap sampling approximates the time spent invoking the RNG once per sample, and at higher values of (p) the cost is greater.

This suggests that one should in fact fall back to traditional linear sampling when the sampling probability (p) >= some threshold. That threshold appears to be about 0.5 or 0.6 in my testing rig, but is likely to depend on underlying numeric libraries, the particular RNG being used, etc, and so I would expect it to benefit from customized tuning on a per-environment basis. With this in mind, a sample algorithm as deployed would look like this:

// threshold is a tuning parameter
threshold = 0.5

sample(data: sequence, p: real) {
    if (p < threshold) {
        gap_sample(data, p)
    } else {
        traditional_linear_sample(data, p)
    }
}

The gap-sampling algorithm described above is for sampling without replacement. However, the same approach can be modified to generate sampling with replacement.

When sampling with replacement, it is useful to consider the replication factor of each element (where a replication factor of zero means the element wasn’t sampled). Pretend for the moment that the actual data size (n) is known. The sample size (m) = (n)(p). The probability that each element gets sampled, per trial, is 1/n, with (m) independent trials, and so the replication factor (r) for each element obeys a binomial distribution: Binomial(m, 1/n). If we substitute (n)(p) for (m), we have Binomial(np, 1/n). As the (n) grows, the Binomial is well approximated by a Poisson distribution Poisson(L), where (L) = (np)(1/n) = (p). And so for our purposes we may sample from Poisson(p), where P(r) = (pr / r!)e(-p), for our sampling replication factors. Note that we have now discarded any dependence on sample size (n), as we desire.

In our gap-sampling context, the sampling gaps are now elements whose replication factor is zero, which occurs with probability P(0) = e(-p). And so our sampling gaps are now drawn from geometric distribution P(k) = (1-q)(q)k, where q = e(-p). When we do sample an element, its replication factor is drawn from Poisson(p), however conditioned such that the value is >= 1. It is straightforward to adapt a standard Poisson generator, as shown below.

Given the above, gap sampling with replacement in pseudocode looks like:

// sample 'k' from Poisson(p), conditioned to k >= 1
poisson_ge1(p: real) {
    q = e^(-p)
    // simulate a poisson trial such that k >= 1
    t = q + (1-q)*random(0.0, 1.0)
    k = 1

    // continue standard poisson generation trials
    t = t * random(0.0, 1.0)
    while (t > q) {
        k = k + 1
        t = t * random(0.0, 1.0)
    }
    return k
}

// choose a random sampling gap 'k' from P(k) = p(1-p)^k
// caution: this explodes for p = 0 or p = 1
random_gap(p: real) {
    u = max(random(0.0, 1.0), epsilon)
    return floor(log(u) / -p)
}

sample(data: sequence, p: real) {
    advance(data, random_gap(p))
    while not end(data) {
        rf = poisson_ge1(p)
        v = next(data)
        emit (rf) copies of (v) to output
        advance(data, random_gap(p))
    }
}

The efficiency improvements I have measured for gap sampling with replacement are shown here:

Type p linear gap
Array 0.001 2604 45
Array 0.01 3442 117
Array 0.1 3653 1044
Array 0.5 5643 5073
Array 0.9 7668 8388
List 0.001 2431 233
List 0.01 2450 299
List 0.1 2984 1330
List 0.5 5331 4752
List 0.9 6744 7811

As with the results for sampling without replacement, we see that gap sampling cost is linear with (p), which yields large cost savings at small sampling, but begins to exceed traditional linear sampling at higher sampling probabilities.

The Scala Iterator ‘drop’ Method Generates a Matryoshka Class Nesting

| Feedback

The Scala Iterator drop method has a complexity bug that shows up when one calls drop repeatedly, for example when traversing over an iterator in a loop.

The nature of the problem is that drop, under the hood, invokes slice, which returns a new anonymous subclass of AbstractIterator containing an instance of the input class, which can be seen in this code excerpt from Iterator.scala:

def drop(n: Int): Iterator[A] = slice(n, Int.MaxValue)

// ... comments excised ...

def slice(from: Int, until: Int): Iterator[A] = {
  val lo = from max 0
  var toDrop = lo
  while (toDrop > 0 && self.hasNext) {
    self.next()
    toDrop -= 1
  }

  // I am a ticking quadratic time bomb:
  new AbstractIterator[A] {
    private var remaining = until - lo
    def hasNext = remaining > 0 && self.hasNext
    def next(): A =
      if (remaining > 0) {
        remaining -= 1
        self.next()
      }
      else empty.next()
  }
}

In the case where one is only calling drop once, this is not very consequential, but when the same method is used in a loop, the nesting is repeated, generating a nesting of anonymous classes that is ever-deeper – rather like Matryoshka dolls:

This can be a substantial problem, as it generates quadratic complexity in what is logically a linear operation. A simple example of looping code that can cause this nesting:

def process_nth_elements[T](itr: Iterator[T], n: Int = 1) {
  var iter = itr
  while (iter.hasNext) {
    val nxt = iter.next
    // ... process next element ...

    // skip to next element
    iter = iter.drop(n-1)
    // this becomes more and more expensive as iterator classes
    // become nested deeper
  }
}

A simple example program, which can be found here, demonstrates this nesting directly:

import java.io.{StringWriter, PrintWriter}
import scala.reflect.ClassTag

def tracehead(e: Exception, substr: String = "slice"): String = {
  val sw = new StringWriter()
  e.printStackTrace(new PrintWriter(sw))
  sw.toString.split('\n').takeWhile((s:String)=> !s.contains(substr)).drop(1).mkString("\n")  
}

class TestIterator[T: ClassTag](val iter: Iterator[T]) extends Iterator[T] {
  override def hasNext = iter.hasNext
  override def next = {
    println(tracehead(new Exception))
    iter.next
  }
}

def drop_test[T](itr: Iterator[T]) {
  var n = 0
  var iter = itr
  while (iter.hasNext) {
    n += 1
    println(s"\ndrop # $n")
    iter = iter.drop(1)
  }
}

When the drop_test function is run on an instance of TestIterator, the stack trace output shows the Matryoshka nesting directly:

scala> drop_test(new TestIterator(List(1,2,3,4,5).iterator))

drop # 1
    at $line18.$read$$iw$$iw$$iw$$iw$TestIterator.next(<console>:19)

drop # 2
    at $line18.$read$$iw$$iw$$iw$$iw$TestIterator.next(<console>:19)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

drop # 3
    at $line18.$read$$iw$$iw$$iw$$iw$TestIterator.next(<console>:19)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

drop # 4
    at $line18.$read$$iw$$iw$$iw$$iw$TestIterator.next(<console>:19)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

drop # 5
    at $line18.$read$$iw$$iw$$iw$$iw$TestIterator.next(<console>:19)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

One would expect this quadratic behavior to show up in benchmarking, and it does. Consider this simple timing test:

def drop_time[T](itr: Iterator[T]) {
  val t0 = System.currentTimeMillis()
  var iter = itr
  while (iter.hasNext) {
    iter = iter.drop(1)
  }
  println(s"Time: ${System.currentTimeMillis() - t0}")
}

One would expect this function to be linear in the length of the iterator, but we see the following behavior:

scala> drop_time((1 to 5000 * 1).toList.iterator)
Time: 106

scala> drop_time((1 to 5000 * 2).toList.iterator)
Time: 475

scala> drop_time((1 to 5000 * 3).toList.iterator)
Time: 1108

scala> drop_time((1 to 5000 * 4).toList.iterator)
Time: 2037

scala> drop_time((1 to 5000 * 5).toList.iterator)
Time: 3234

scala> drop_time((1 to 5000 * 6).toList.iterator)
Time: 4717

scala> drop_time((1 to 5000 * 7).toList.iterator)
Time: 6447

scala> drop_time((1 to 5000 * 8).toList.iterator)
java.lang.StackOverflowError
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

The corresponding plot shows the quadratic cost:

"image"

Given the official semantics of drop, which state that the method invalidates the iterator it was called on, this nesting problem should be avoidable by implementing the method more like this:

def drop(n: Int): Iterator[A] = {
  var j = 0
  while (j < n) {
    this.next
    j += 1
  }
  this
}

Implementing Parallel Prefix Scan as a Spark RDD Transform

| Feedback

In my previous post, I described how to implement the Scala scanLeft function as an RDD transform. By definition scanLeft invokes a sequential-only prefix scan algorithm; it does not assume that either its input function f or its initial-value z can be applied in a parallel fashion. Its companion function scan, however, computes a parallel prefix scan. In this post I will describe an implementation of parallel prefix scan as an RDD transform.

As was the case with scanLeft, a basic strategy is to begin by applying scan to each RDD partition. Provided that appropriate “offsets” {z1, z2, ...} can be computed for each partition, these can be applied to the partial, per-partition results to yield the output. In fact, the desired {z1, z2, ...} are the parallel prefix scan of the last element in each per-partition scan. The following diagram illustrates:

image

The diagram above glosses over the details of computing scan to obtain {z1, z2, ...}. I will first describe the implementation I currently use, and then also discuss a possible alternative. The current implementation takes the approach of encoding the logic of a parallel prefix scan directly into an RDD computation DAG. Each iteration, or “ply,” of the parallel algorithm is represented by an RDD. Each element resides in its own partition, and so the computation dependency for each element is directly representable in the RDD dependency substructure. This construction is illustrated in the following schematic (for a vector of 8 z-values):

image

The parallel prefix scan algorithm executes O(log(n)) plies, which materializes as O(log(n)) RDDs shown in the diagram above. In this context, (n) is the number of input RDD partitions, not to be confused with the number of data rows in the RDD. There are O((n)log(n)) partitions, each having a single row containing the z-value for a corresponding output partition. Some z-values are determined earlier than others. For example z1 is immediately available in ply(0), and ply(3) can refer directly back to that ply(0) partition in the interest of efficiency, as called out by the red DAG arcs.

This scheme allows each final output partition to obtain its z-value directly from a single dedicated partition, which ensures that minimal data needs to be transferred across worker processes. Final output partitions can be computed local to their corresponding input partitions. Data transfer may be limited to the intermediate z-values, which are small single-row affairs by construction.

The code implementing the logic above can be viewed here.

I will conclude by noting that there is an alternative to this highly distributed computation of {z1, z2, ...}, which is to collect the last-values in the per-partition intermediate scan ouputs into a single array, and run scan directly on that array. This has the advantage of avoiding the construction of log(n) intermediate RDDs. It does, however, require a monolithic ‘fan-in’ of data into a single RDD to receive the collection of values. That is followed by a fan-out of the array, where each output partition picks its single z-value from the array. It is for this reason I suspect this alternative incurs substantially more transfer overhead across worker processes. However, one might also partition the resulting z-values in some optimal way, so that each final output partition needs to request only the partition that contains its z-value. Future experimentation might show that this can out-perform the current fully-distributed implementation.

Implementing an RDD scanLeft Transform With Cascade RDDs

| Feedback

In Scala, sequence (and iterator) data types support the scanLeft method for computing a sequential prefix scan on sequence elements:

// Use scanLeft to compute the cumulative sum of some integers
scala> List(1, 2, 3).scanLeft(0)(_ + _)
res0: List[Int] = List(0, 1, 3, 6)

Spark RDDs are logically a sequence of row objects, and so scanLeft is in principle well defined on RDDs. In this post I will describe how to cleanly implement a scanLeft RDD transform by applying an RDD variation called Cascade RDDs.

A Cascade RDD is an RDD having one partition which is a function of an input RDD partition and an optional predecessor Cascade RDD partition. You can see that this definition is somewhat recursive, where the basis case is a Cascade RDD having no precedessor. The following diagram illustrates both cases of Cascade RDD:

image

As implied by the above diagram, a series of Cascade RDDs falling out of an input RDD will have as many Cascade RDDs as there are input partitions. This situation begs for an abstraction to re-assemble the cascade back into a single output RDD, and so the method cascadePartitions is defined, as illustrated:

image

The cascadePartitions method takes a function argument f, with the signature:

f(input: Iterator[T], cascade: Option[Iterator[U]]): Iterator[U]

in a manner somewhat analogous to mapPartitions. The function f must address the fact that cascade is optional and will be None in case where there is no predecessor Cascade RDD. The interested reader can examine the details of how the CascadeRDD class and its companion method cascadePartitions are implemented here.

With Cascade RDDs it is now straightforward to define a scanLeft transform for RDDs. We wish to run scanLeft on each input partition, with the condition that we want to start where the previous input partition left off. The Scala scanLeft function makes this easy, as the starting point is its first parameter (z): scanLeft(z)(f). The following figure illustrates what this looks like:

image

As the above schematic demonstrates, almost all the work is accomplished with a single call to cascadePartitions, using a thin wrapper around f which determines where to start the next invocation of Scala scanLeft – either the input parameter z, or the last output element of the previous cascade. One final transform must be applied to remove the initial element that Scala scanLeft inserts into its output, excepting the first output partition, where it is kept to be consistent with the scanLeft definition.

All computation is accomplished in the standard RDD formalism, and so scanLeft is a proper lazy RDD transform.

The actual implementation is as compact as the above description implies, and you can see the code here.

Deferring Spark Actions to Lazy Transforms With the Promise RDD

| Feedback

In a previous post I described a method for implementing the Scala drop transform for Spark RDDs. That implementation came at a cost of subverting the RDD lazy transform model; it forced the computation of one or more input RDD partitions at call time instead of deferring partition computation, and so behaved more like a Spark action than a transform.

In this followup post I will describe how to implement drop as a true lazy RDD transform, using a new RDD subclass: the Promise RDD. A Promise RDD can be used to embed computations in the lazy transform formalism that otherwise would require non-lazy actions.

The Promise RDD (aka PromiseRDD subclass) is designed to encapsulate a single expression value in an RDD having exactly one row, to be evaluated only if and when its single partition is computed. It behaves somewhat analogously to a Scala promise structure, as it abstracts the expression such that any requests for its value (and hence its actual computation) may be deferred.

The definition of PromiseRDD is compact:

class PromisePartition extends Partition {
  // A PromiseRDD has exactly one partition, by construction:
  override def index = 0
}

/**
 * A way to represent the concept of a promised expression as an RDD, so that it
 * can operate naturally inside the lazy-transform formalism
 */
class PromiseRDD[V: ClassTag](expr: => (TaskContext => V),
                              context: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[V](context, deps) {

  // This RDD has exactly one partition by definition, since it will contain
  // a single row holding the 'promised' result of evaluating 'expr' 
  override def getPartitions = Array(new PromisePartition)

  // compute evaluates 'expr', yielding an iterator over a sequence of length 1:
  override def compute(p: Partition, ctx: TaskContext) = List(expr(ctx)).iterator
}

A PromiseRDD is constructed with the expression of choice, embodied as a function from a TaskContext to the implied expression type. Note that only the task context is a parameter; Any other inputs needed to evaluate the expression must be present in the closure of expr. This allows the expression to be of very general form: its value may depend on a single input RDD, or multiple RDDs, or no RDDs at all. It receives an arbitrary sequence of partition dependencies which is the responsibility of the calling code to assemble. Again, this allows substantial generality in the form of the expression: the PromiseRDD dependencies can correspond to any arbitrary input dependencies assumed by the expression. The dependencies can be tuned to exactly what input partitions are required.

As a motivating example, consider how a PromiseRDD can be used to promote drop to a true lazy transform. The aspect of computing drop that threatens laziness is the necessity of determining the location of the boundary partition (see previous discussion). However, this portion of the computation can in fact be encapsulated in a PromiseRDD. The details of constructing such a PromiseRDD can be viewed here. The following illustration summarizes the topology of the dependency DAG that is constructed:

image

As the dependency diagram shows, the PromiseRDD responsible for locating the boundary partition depends on each partition of the original input RDD. The actual computation is likely to only request the first input partition, but all partitions might be required to handle all possible arguments to drop. In turn, the location information given by the PromiseRDD is depended upon by each output partition. Input partitions are either passed to the output, or used to compute the boundary, and so none of the partition computation is wasted.

Observe that the scheduler remains in charge of when partitions are computed. An advantage to using a PromiseRDD is that it works within Spark’s computational model, instead of forcing it.

The following brief example demonstrates that drop implemented using a PromiseRDD satisfies the lazy transform model:

// create data rdd with values 0 thru 9
scala> val data = sc.parallelize(0 until 10)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

// drop the first 3 rows
// note that no action is performed -- this transform is lazy
scala> val rdd = data.drop(3)
rdd: org.apache.spark.rdd.RDD[Int] = $anon$1[2] at drop at <console>:14

// collect the values.  This action kicks off job scheduling and execution
scala> rdd.collect
14/07/28 12:16:13 INFO SparkContext: Starting job: collect at <console>:17
... job scheduling and execution output ...

res0: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9)

scala>

In this post, I have described the Promise RDD, an RDD subclass that can be used to encapsulate computations in the lazy transform formalism that would otherwise require non-lazy actions. As an example, I have outlined a lazy transform implementation of drop that uses PromiseRDD.

Some Implications of Supporting the Scala drop Method for Spark RDDs

| Feedback

In Scala, sequence data types support the drop method for skipping (aka “dropping”) the first elements of the sequence:

// drop the first element of a list
scala> List(1, 2, 3).drop(1)
res1: List[Int] = List(2, 3)

Spark RDDs also support various standard sequence methods, for example filter, as they are logically a sequence of row objects. One might suppose that drop could be a useful sequence method for RDDs, as it would support useful idioms like:

// Use drop (hypothetically) to skip the header of a text file:
val data = sparkContext.textFile("data.txt").drop(1)

Implementing drop for RDDs is possible, and in fact can be done with a small amount of code, however it comes at the price of an impact to the RDD lazy computing model.

To see why, recall that RDDs are composed of partitions, and so in order to drop the first (n) rows of an RDD, one must first identify the partition that contains the (n-1),(n) row boundary. In the resulting RDD, this partition will be the first one to contain any data. Identifying this “boundary” partition cannot have a closed-form solution, because partition sizes are not in general equal; the partition interface does not even support the concept of a count method. In order to obtain the size of a partition, one is forced to actually compute its contents. The diagram below illustrates one example of why this is so – the contents of the partitions in the filtered RDD on the right cannot be known without actually running the filter on the parent RDD:

image

Given all this, the structure of a drop implementation is to compute the first partition, find its length, and see if it contains the requested (n-1),(n) boundary. If not, compute the next partition, and so on, until the boundary partition is identified. All prior partitions are ignored in the result. All subsequent partitions are passed on with no change. The boundary partition is passed through its own drop to eliminate rows up to (n).

The code implementing the concept described above can be viewed here: https://github.com/erikerlandson/spark/compare/erikerlandson:rdd_drop_blogpost_base…rdd_drop_blogpost

The following diagram illustrates the relation between input and output partitions in a call to drop:

image

Arguably, this represents a potential subversion of the RDD lazy compute model, as it forces the computation of at least one (and possibly more) partitions. It behaves like a “partial action”, instead of a transform, but an action that returns another RDD.

In many cases, the impact of this might be relatively small. For example, dropping the first few rows in a text file is likely to only force computation of a single partition, and it is a partition that will eventually be computed anyway. Furthermore, such a use case is generally not inside a tight loop.

However, it is not hard to construct cases where computing even the first partition of one RDD recursively forces the computation of all the partitions in its parents, as in this example:

image

Whether the benefits of supporting drop for RDDs outweigh the costs is an open question. It is likely to depend on whether or not the Spark community yields any compelling use cases for drop, and whether a transform that behaves like a “partial action” is considered an acceptable addition to the RDD formalism.

RDD support for drop has been proposed as issue SPARK-2315, with corresponding pull request 1254.

A Bi-directional Variation of the O(NP) Edit Distance Algorithm

| Feedback

The O(ND) edit distance algorithm [1] is a standard for efficient computation of the edit distance between two sequences, appearing in applications such as the GNU diff tool. There is also a variation [2] that operates in O(NP) time, where P is the number of deletions in the shortest edit path, and which has lower computational complexity, since P <= D (and may be << D in some circumstances). In order to apply these algorithms to obtain an edit script in linear space, they must be adapted into a bidirectional form that enables recursive divide-and-conquer. The basic principles of a bidirectional adaptation of the O(ND) algorithm are described in [1]. However, no such discussion of a bidirectional O(NP) algorithm is provided in [2]. Understanding this adaptation involves some observations that aren’t immediately obvious. In this post, I will describe these key observations.

Notation

My code segments are written as C/C++, however written in a simplified style I hope will be clear regardless of what languages the reader is familiar with. If you wish to port this (pseudo-ish)code, it may be worth keeping in mind that indexing is zero-based in C/C++.

Sequence Lengths

A brief note on the O(NP) algorithm and sequence lengths: the algorithm assumes that the length of its second sequence argument is >= its first (that is, N >= M). In the following discussions, I will be making the same assumption, however the modification to address N < M is relatively easy, and can be seen in the references to actual source code below.

Indexing

A note on naming: In [2], the authors use ‘fp’ for the name of the array holding path endpoints. I will use ‘Vf’ for the array holding forward endpoints, and ‘Vr’ for the corresponding array holding reverse endpoints.

The O(ND) and O(NP) algorithms operate by iteratively extending the frontier of edit paths through the implicit graph of possible paths, where each iteration is computed as a function of the previous. In the O(NP) algorithm, this computation has to proceed from the outside in, as described in the paper:

for (k = -P;  k < delta;  k += 1) {
    y = max(Vf[k-1] + 1, Vf[k+1]);
    Vf[k] = snake(y-k, y);
}
for (k = P + delta;  k >= delta;  k -= 1) {
    y = max(Vf[k-1] + 1, Vf[k+1]);
    Vf[k] = snake(y-k, y);
}

In order to implement a bi-directional algorithm, we must also run the algorithm in reverse, beginning at the “lower right corner” of the graph (M,N) and working backward to the origin (0,0). The indexing is the mirror image of the above:

for (k = P+delta;  k > 0;  k -= 1) {
    y = min(Vr[k-1], Vr[k+1] - 1);
    Vr[k] = rsnake(y-k, y);
}
for (k = -P;  k <= 0;  k += 1) {
    y = min(Vr[k-1], Vr[k+1] - 1);
    Vr[k] = rsnake(y-k, y);
}

In the above, ‘rsnake’ is the reverse-direction snake function. A note on initialization: whereas the forward algorithm initializes its Vf array to (-1), the symmetric initial value for the reverse algorithm Vr array is (N+1) (In the general case, 1 plus the length of the longest sequence).

Detecting Path Overlap

The uni-directional O(NP) algorithm halts when Vf[delta] == N. However, the bi-directional algorithms halt when shortest opposing paths meet – or overlap – each other, as described in the O(ND) paper [1]. The semantics of storing paths in working arrays is the same in both algorithms, with the exception that in the O(NP) algorithm it is the (y) values that are stored. Myers describes the predicate for detecting meeting paths in O(ND) as: (x >= u) && (x-y == u-v), where (x,y) are forward endpoints and (u,v) are reverse endpoints. Observe that since y = x-k, then (x-y == u-v) is equivalent to “forward-k == reverse-k”. However, in operation one always checks the opposing path with the same k index, and so this clause is redundant. It is sufficient to check that (x >= u), or in the case of O(NP), that (y >= v). In the code, this looks something like:

y = max(Vf[k-1] + 1, Vf[k+1]);
if (y >= Vr[k]) {
    // overlapping paths detected 
}

The other checks for forward and reverse are similar. Note that these checks happen at the beginning of each ‘snake’, that is prior to invoking the snake extension logic. The semantic is that the opposing path overlaps the run (snake) one is about to start.

Computing Distance

When two overlapping paths are detected, we must compute the path distance associated with their union. In the O(ND) algorithm, we know that distance implicitly, as the paths are extended over successive iterations of D. In the O(NP) algorithm, however, the current path endpoints are associated with a particular value of P, and so we must consider how to obtain the actual distance.

A little algebra comes to the rescue. At iteration P, consider the number of deletions along the forward-path at the kth endpoint, which I will denote as ‘vf’ (the authors refer to it as V(x,y)). In [2], the authors show that P == vf when k < delta, and P == vf+k-delta, when k > delta (note that either formula applies for k == delta). Solving for vf, we have: vf == P for k < delta and vf == P+delta-k for k > delta. The authors also show that: vf = (df-k)/2, where df is the total edit distance along the path up to the current endpoint (the authors refer to df as D(x,y)). Therefore, we have: df = 2(vf)+k, where we can obtain vf from the expression we just derived.

It remains to derive the expressions for the reverse direction, where we want ‘vr’ and ‘dr’. Here, I note that the mirror-image indexing of the reverse algorithm implies that the expressions above work if we transform k –> delta-k. Making that transform gives us: vr == P for k > 0 and vr == P+k for k < 0 (again, either applies for k == 0). And dr = 2(vr)+delta-k.

And so the actual edit distance covered by our overlapping paths is: d == (df+dr) == 2(vf+vr)+delta. Note now pleasing this is, as vf+vr is the number of deletions of the combined paths, and so this corresponds to the original formula D == 2P+delta, where P is the number of deletions over the entire pathway. We also see from the above that at a given Pth iteration, P does not equal the number of deletions in all paths with endpoints at the current iteration. The true number of deletions for a given endpoint is a function of P, k and delta.

A note on implementation: when one is advancing forward paths, an overlapping reverse-path will be from previous iteration (P-1), as the reverse paths for (P) have not happened yet. That will show up in the distance formula for (vr) by using (P-1) in place of P, as in this example code:

y = max(Vf[k-1] + 1, Vf[k+1]);
if (y >= Vr[k]) {
    // we found overlapping path, so compute corresponding edit distance
    vf = (k>delta) ? (P + delta - k) : P;
    // use (P-1) for reverse paths:
    vr = (k<0) ? (P-1 + k) : P-1;
    d = 2*(vf+vr)+delta;
}

// ....

y = min(Vr[k-1], Vr[k+1] - 1);
if (y <= Vf[k]) {
    // we can use P for both since forward-paths have been advanced:
    vf = (k>delta) ? (P + delta - k) : P;
    vr = (k<0) ? (P + k) : P;
    d = 2*(vf+vr)+delta;
}

Shortest Path

With respect to halting conditions, the O(NP) algorithm differs in one imporant way from the O(ND) algorithm: The O(ND) algorithm maintains path endpoints corresponding to increasing distance (D) values. Therefore, when two paths meet, they form a shortest-distance path by definition, and the algorithm can halt on the first such overlap it detects.

The same is not true for the O(NP) algorithm. It stores endpoints at a particular P value. However, at a given value of P, actual distances may vary considerably. On a given iteration over P, actual path distances may vary from 2(P-1)+delta up to 4P+delta.

This problem is dealt with by maintaining a best-known distance, ‘Dbest’, which is initialized to its maximum possible value of N+M, the sum of both sequence lengths. Whenever two overlapping paths are detected, their corresponding distance ‘d’ is computed as described earlier, and the running minimum is maintainted: Dbest = min(Dbest,d). As mentioned above, we know that the mimimum possible distance at a given iteration is Dmin = 2(P-1)+delta, and so when Dmin >= Dbest, we halt and return Dbest as our result.

Loop Bounding

Some important computational efficiency can be obtained by reorganizing the looping over the endpoints. As mentioned above, conceptually the looping proceeds from the outside, inward. Suppose we organize the looping over k values such that we explore k = {-P, P+delta, -P+1, P+delta-1, -P+2, P+delta-2 … } Note that the symmetry breaks a bit when we get to k==delta, as here we stop iterating backward, but continue iterating forward until we hit delta from below. In the code, this looping pattern looks something like:

// advance forward paths: reverse path looping is similar
for (ku = -P, kd = P+delta;  ku <= delta;  ku += 1) {
    // advance diagonals from -P, upwards:
    y = max(1+Vf[ku-1], Vf[ku+1]);

    // check for overlapping path

    Vf[ku] = snake(y-ku, y);

    // stop searching backward past here:
    if (kd <= delta) continue;

    // advance diagonals from P+delta, downwards:
    y = max(1+Vf[kd-1], Vf[kd+1]);

    // check for overlapping path

    Vf[kd] = snake(y-kd, y);
    kd -= 1;
}

There is method to this madness. Observe that for any particular P value, the smallest edit distances are at the outside, and get larger as one moves inward. The minimum distance 2P+delta is always when k == -P, and k == P+delta. As we proceed inward, the corresponding edit distance increases towards its maximum of 4P+delta. This allows two optimizations. The first is that if we hit an overlapping path, we can now exit the loop immediately, as we know that any other such overlapping paths to our inside will have a larger edit distance, and so do not need to be considered.

The second optimization is to recall that path distances are a function of P, k and delta. We can use this information to solve for k and obtain a useful adaptive bound on how far we loop. From previous sections, also recall we are keeping a best-known distance Dbest. We know that we do not have to explore any paths whose distance is >= Dbest. So, we can set up the following inequality: 2(vf+vr)+delta < Dbest, where vf = P, and vr = (P-1)+k, where k < 0, which is the region where distance is growing. Therefore, we have 2(P+(P-1)+k)+delta < Dbest. Solving for k, we have: k < ((Dbest-delta)/2)-2P+1. The looping wants to use ‘<=’, so we can rewrite as: k <= ((Dbest-delta-1)/2)-2P+1. For the reverse-path looping, we can set up a similar inequality: 2(P+P+delta-k)+delta < Dbest, which yields: k >= ((1+delta-Dbest)/2)+delta+2P.

Note that if these bound expressions evaluate to a value past the nominal bound, then the nominal bound remains in effect: e.g. the operative forward looping bound = min(delta, ((Dbest-delta)/2)-2P). Also note that these constraints do not break the computation of the endpoints, because when the bounds move, they always retreat toward the outside by 2 on each iteration of P. Since computation proceeds outside in, that means the necessary values are always correctly populated from the previous iteration.

In the code, the forward path looping looks like this:

// compute our adaptive loop bound (using P-1 for reverse)
bound = min(delta, ((Dbest-delta-1)/2)-(2*P)+1);

// constrain our search by bound:
for (ku = -P, kd = P+delta;  ku <= bound;  ku += 1) {
    y = max(1+Vf[ku-1], Vf[ku+1]);
    if (y >= Vr[k]) {
        vf = (k>delta) ? (P + delta - k) : P;
        vr = (k<0) ? (P-1 + k) : P-1;

        // maintain minimum distance:
        Dbest = min(Dbest, 2*(vf+vr)+delta);

        // we can now halt this loop immediately:
        break;
    }

    Vf[ku] = snake(y-ku, y);

    if (kd <= delta) continue;

    y = max(1+Vf[kd-1], Vf[kd+1]);
    if (y >= Vr[k]) {
        vf = (k>delta) ? (P + delta - k) : P;
        vr = (k<0) ? (P-1 + k) : P-1;

        // maintain minimum distance:
        Dbest = min(Dbest, 2*(vf+vr)+delta);

        // we can now halt this loop immediately:
        break;
    }

    Vf[kd] = snake(y-kd, y);
    kd -= 1;
}

Implementation

In conclusion, I will display a code segment with all of the ideas presented above, coming together. This segment was taken from my working prototype code, with some syntactic clutter removed and variable names changed to conform a bit more closely to [2]. The implementation of O(NP) below is performing about 25% faster than the corresponding O(ND) algorithm in my benchmarking tests, and also uses substantially less memory.

// initialize this with the maximum possible distance:
Dbest = M+N;

P = 0;
while (true) {
    // the minimum possible distance for the current P value
    Dmin = 2*(P-1) + delta;

    // if the minimum possible distance is >= our best-known distance, we can halt
    if (Dmin >= Dbest) return Dbest;

    // adaptive bound for the forward looping
    bound = min(delta, ((Dbest-delta-1)/2)-(2*P)+1);

    // advance forward diagonals
    for (ku = -P, kd = P+delta;  ku <= bound;  ku += 1) {
        y = max(1+Vf[ku-1], Vf[ku+1]);
        x = y-ku;

        // path overlap detected
        if (y >= Vr[ku]) {
            vf = (ku>delta) ? (P + delta - ku) : P;
            vr = (ku<0) ? (P-1 + ku) : P-1;
            Dbest = min(Dbest, 2*(vf+vr)+delta);
            break;
        }

        // extend forward snake
        if (N >= M) {
            while (x < M  &&  y < N  &&  equal(S1[x], S2[y])) { x += 1;  y += 1; }
        } else {
            while (x < N  &&  y < M  &&  equal(S1[y], S2[x])) { x += 1;  y += 1; }
        }

        Vf[ku] = y;

        if (kd <= delta) continue;

        y = max(1+Vf[kd-1], Vf[kd+1]);
        x = y-kd;

        // path overlap detected
        if (y >= Vr[kd]) {
            vf = (kd>delta) ? (P + delta - kd) : P;
            vr = (kd<0) ? (P-1 + kd) : P-1;
            Dbest = min(Dbest, 2*(vf+vr)+delta);
            break;
        }

        // extend forward snake
        if (N >= M) {
            while (x < M  &&  y < N  &&  equal(S1[x], S2[y])) { x += 1;  y += 1; }
        } else {
            while (x < N  &&  y < M  &&  equal(S1[y], S2[x])) { x += 1;  y += 1; }
        }

        Vf[kd] = y;
        kd -= 1;
    }

    // adaptive bound for the reverse looping
    bound = max(0, ((1+delta-Dbest)/2)+delta+(2*P));

    // advance reverse-path diagonals:
    for (kd=P+delta, ku=-P;  kd >= bound;  kd -= 1) {
        y = min(Vr[kd-1], Vr[kd+1]-1);
        x = y-kd;

        // path overlap detected
        if (y <= Vf[kd]) {
            vf = (kd>delta) ? (P + delta - kd) : P;
            vr = (kd<0) ? (P + kd) : P;
            Dbest = min(Dbest, 2*(vf+vr)+delta);
            break;
        }

        // extend reverse snake
        if (N >= M) {
            while (x > 0  &&  y > 0  &&  equal(S1[x-1], S2[y-1])) { x -= 1;  y -= 1; }
        } else {
            while (x > 0  &&  y > 0  &&  equal(S1[y-1], S2[x-1])) { x -= 1;  y -= 1; }
        }

        Vr[kd] = y;

        if (ku >= 0) continue;

        y = min(Vr[ku-1], Vr[ku+1]-1);
        x = y-ku;

        // path overlap detected
        if (y <= Vf[ku]) {
            vf = (ku>delta) ? (P + delta - ku) : P;
            vr = (ku<0) ? (P + ku) : P;
            Dbest = min(Dbest, 2*(vf+vr)+delta);
            break;
        }

        // extend reverse snake
        if (N >= M) {
            while (x > 0  &&  y > 0  &&  equal(S1[x-1], S2[y-1])) { x -= 1;  y -= 1; }
        } else {
            while (x > 0  &&  y > 0  &&  equal(S1[y-1], S2[x-1])) { x -= 1;  y -= 1; }
        }

        Vr[ku] = y;
        ku += 1;
    }
}

References

[1] An O(ND) Difference Algorithm and its Variations, Eugene W. Myers
[2] An O(NP) Sequence Comparison Algorithm, Sun Wu, Udi Manber, Gene Myers

The Impact of Negotiator Cycle Cadence on Slot Loading

| Feedback

The HTCondor negotiator assigns jobs (resource requests) to slots (compute resources) at regular intervals, configured by the NEGOTIATOR_INTERVAL parameter. This interval (the cycle cadence) has a fundamental impact on a pool loading factor – the fraction of time that slots are being productively utilized.

Consider the following diagram, which illustrates the utilization of a slot over the lifetime of a job. When a job completes, its slot will remain empty until it can be assigned a new job on the next negotiation cycle.

As the diagram above shows, the loading factor for a slot can be expressed as D/Z, where D is the duration of the job, and Z is the total time until the next cycle occurring after the job completes. We can also write Z = D+I, where I is the “idle time” from job completion to the start of the next negotiation cycle. Loading factor is always <= 1, where a value of 1 corresponds to ideal loading – every slot is utilized 100% of the time. In general, loading will be < 1, as jobs rarely complete exactly on a cycle boundary.

It is worth briefly noting that the claim reuse feature was developed to help address this problem. However, claim re-use is not compatible with all other features – for example enabling claim re-use can cause accounting group starvation – and so what follows remains relevant to many HTCondor configurations.

Given a particular negotiation cycle cadence, how does a slot’s loading factor behave, as a function of job duration? The loading factor can be expressed as:

\[ \text{Loading Factor} = \frac{D}{C \left( q + \lceil r \rceil \right)} \\ \\ \text{where:} \\ D = \text{job duration} \\ C = \text{cycle cadence} \\ q = \lfloor D / C \rfloor \\ r = \left( D / C \right) - q \\ \]

The following plot illustrates how the loading factor changes with job duration, assuming a cadence of 300 seconds (5 minutes):

We immediately see that there is a saw-tooth pattern to the plot. As the job duration increases towards the boundary of a cycle, there is less and less idle time until the next cycle, and so the loading approaches 1.0. However, once the job’s end crosses the thresold to just past the start of the cycle, it immediately drops to the worse possible case: the slot will be idle for nearly an entire cycle.

The other important pattern is that the bottom of the saw-tooth gradually increases. As a job’s duration occupies more whole negotiation cycles, the idle time at the end of the last cycle represents a decreasing fraction of the total time.

Observe that the most important ‘unit’ in this plot is the number of negotiation cycles. Since the saw-toothing scales with the cycle interval, we can express the same plot in units of cycles instead of seconds:

The results above suggest a couple possible approaches for tuning negotiator cycle cadence to optimize slot loading in an HTCondor pool. The first is to configure the negotiator interval to be small relative to a typical job duration, as the lower-bound on loading factor increases with the number of cycles a job’s duration occupies. For example, if a typical job duration is 10 minutes, then a cycle cadence of 60 seconds ensures that in general 9 out of 10 cycles will be fully utilized, and so loading will be around 90%. However, if one has mostly very short jobs, this can be difficult, as negotiation cycle cadences much less than 60 seconds may risk causing performance problems even on a moderately loaded pool.

A second approach is to try and tune the cadence so that as many jobs as possible complete near the end of a cycle, thus minimizing delay until the next cycle. For example, if job durations are relatively consistent, say close to 90 seconds, then setting the negotiator interval to something like 50 seconds will induce those jobs to finish near the end of the 2nd negotiation cycle (at t+100 seconds), for a loading factor around 90%. The caveat here is that job durations are frequently not that consistent, and as job duration spread increases, one’s ability to play this game rapidly evaporates.

In this post, I have focused on the behavior of individual jobs and individual slots. An obvious next question is what happens to aggregate pool loading when job durations are treated as population sampling from random variables, which I plan to explore in future posts.

Smooth Gradients for Cubic Hermite Splines

| Feedback

One of the advantages of cubic Hermite splines is that their interval interpolation formula is an explicit function of gradients \( m_0, m_1, … m_{n-1} \) at knot-points:

\[ y(t) = h_{00}(t) y_j + h_{10}(t) m_j + h_{01}(t) y_{j+1} + h_{11}(t) m_{j+1} \\ \]

where the Hermite bases are:

\[ h_{00} = 2t^3 - 3t^2 + 1 \\ h_{10} = t^3 - 2t^2 + t \\ h_{01} = -2t^3 + 3t^2 \\ h_{11} = t^3 - t^2 \\ \]

(For now, I will be using the unit-interval form of the interpolation, where t runs from 0 to 1 on each interval. I will also discuss the non-uniform interval equations below)

This formulation allows one to explicitly specify the interpolation gradient at each knot point, and to choose from various gradient assignment policies, for example those listed here, even supporting policies for enforcing monotonic interpolations.

One important caveat with cubic Hermite splines is that although the gradient \( y’(t) \) is guaranteed to be continuous, it is not guaranteed to be smooth (that is, differentiable) across the knots (it is of course smooth inside each interval). Therefore, another useful category of gradient policy is to obtain gradients \( m_0, m_1, … m_{n-1} \) such that \( y’(t) \) is also smooth across knots.

(I feel sure that what follows was long since derived elsewhere, but my attempts to dig the formulation up on the internet failed, and so I decided the derivation might make a useful blog post)

To ensure smooth gradient across knot points, we want the 2nd derivative \( y”(t) \) to be equal at the boundaries of adjacent intervals:

\[ h_{00}^”(t) y_{j-1} + h_{10}^”(t) m_{j-1} + h_{01}^”(t) y_j + h_{11}^”(t) m_j \\ = \\ h_{00}^”(t) y_j + h_{10}^”(t) m_j + h_{01}^”(t) y_{j+1} + h_{11}^”(t) m_{j+1} \]

or substituting the 2nd derivative of the basis definitions above:

\[ \left( 12 t - 6 \right) y_{j-1} + \left( 6 t - 4 \right) m_{j-1} + \left( 6 - 12 t \right) y_j + \left( 6 t - 2 \right) m_j \\ = \\ \left( 12 t - 6 \right) y_{j} + \left( 6 t - 4 \right) m_{j} + \left( 6 - 12 t \right) y_{j+1} + \left( 6 t - 2 \right) m_{j+1} \]

Observe that t = 1 on the left hand side of this equation, and t = 0 on the right side, and so we have:

\[ 6 y_{j-1} + 2 m_{j-1} - 6 y_j + 4 m_j = -6 y_j - 4 m_j + 6 y_{j+1} - 2 m_{j+1} \]

which we can rearrange as:

\[ 2 m_{j-1} + 8 m_j + 2 m_{j+1} = 6 \left( y_{j+1} - y_{j-1} \right) \]

Given n knot points, the above equation holds for j = 1 to n-2 (using zero-based indexing, as nature intended). Once we define equations for j = 0 and j = n-1, we will have a system of equations to solve. There are two likely choices. The first is to simply specify the endpoint gradients \( m_0 = G \) and \( m_{n-1} = H \) directly, which yields the following tri-diagonal matrix equation:

\[ \left( \begin{array} {ccccc} 1 & & & & \\ 2 & 8 & 2 & & \\ & 2 & 8 & 2 & \\ & & \vdots & & \\ & & 2 & 8 & 2 \\ & & & & 1 \\ \end{array} \right) \left( \begin{array} {c} m_0 \\ m_1 \\ \\ \vdots \\ \\ m_{n-1} \end{array} \right) = \left( \begin{array} {c} G \\ 6 \left( y_2 - y_0 \right) \\ 6 \left( y_3 - y_1 \right) \\ \vdots \\ 6 \left( y_{n-1} - y_{n-3} \right) \\ H \\ \end{array} \right) \]

The second common endpoint policy is to set the 2nd derivative equal to zero – the “natural spline.” Setting the 2nd derivative to zero at the left-end knot (and t = 0) gives us:

\[ 4 m_0 + 2 m_1 = 6 \left( y_1 - y_0 \right) \]

Similarly, at the right-end knot (t = 1), we have:

\[ 2 m_0 + 4 m_1 = 6 \left( y_{n-1} - y_{n-2} \right) \]

And so for a natural spline endpoint policy the matrix equation looks like this:

\[ \left( \begin{array} {ccccc} 4 & 2 & & & \\ 2 & 8 & 2 & & \\ & 2 & 8 & 2 & \\ & & \vdots & & \\ & & 2 & 8 & 2 \\ & & & 2 & 4 \\ \end{array} \right) \left( \begin{array} {c} m_0 \\ m_1 \\ \\ \vdots \\ \\ m_{n-1} \end{array} \right) = \left( \begin{array} {c} 6 \left( y_1 - y_0 \right) \\ 6 \left( y_2 - y_0 \right) \\ 6 \left( y_3 - y_1 \right) \\ \vdots \\ 6 \left( y_{n-1} - y_{n-3} \right) \\ 6 \left( y_{n-1} - y_{n-2} \right) \\ \end{array} \right) \]

The derivation above is for uniform (and unit) intervals, where t runs from 0 to 1 on each knot interval. I’ll now discuss the variation where knot intervals are non-uniform. The non-uniform form of the interpolation equation is:

\[ y(x) = h_{00}(t) y_j + h_{10}(t) d_j m_j + h_{01}(t) y_{j+1} + h_{11}(t) d_j m_{j+1} \\ \text{ } \\ \text{where:} \\ \text{ } \\ d_j = x_{j+1} - x_j \text{ for } j = 0, 1, … n-2 \\ t = (x - x_j) / d_j \]

Taking \( t = t(x) \) and applying the chain rule, we see that 2nd derivative equation now looks like:

\[ y”(x) = \frac { \left( 12 t - 6 \right) y_{j} + \left( 6 t - 4 \right) d_j m_{j} + \left( 6 - 12 t \right) y_{j+1} + \left( 6 t - 2 \right) d_j m_{j+1} } { d_j^2 } \]

Applying a derivation similar to the above, we find that our (interior) equations look like this:

\[ \frac {2} { d_{j-1} } m_{j-1} + \left( \frac {4} { d_{j-1} } + \frac {4} { d_j } \right) m_j + \frac {2} {d_j} m_{j+1} = \frac { 6 \left( y_{j+1} - y_{j} \right) } { d_j^2 } + \frac { 6 \left( y_{j} - y_{j-1} \right) } { d_{j-1}^2 } \]

and natural spline endpoint equations are:

\[ \text{left: } \frac {4} {d_0} m_0 + \frac {2} {d_0} m_1 = \frac {6 \left( y_1 - y_0 \right)} {d_0^2} \\ \text{right: } \frac {2} {d_{n-2}} m_0 + \frac {4} {d_{n-2}} m_1 = \frac {6 \left( y_{n-1} - y_{n-2} \right)} {d_{n-2}^2} \]

And so the matrix equation for specified endpoint gradients is:

\[ \scriptsize \left( \begin{array} {ccccc} \normalsize 1 \scriptsize & & & & \\ \frac{2}{d_0} & \frac{4}{d_0} {+} \frac{4}{d_1} & \frac{2}{d_1} & & \\ & \frac{2}{d_1} & \frac{4}{d_1} {+} \frac{4}{d_2} & \frac{2}{d_2} & \\ & & \vdots & & \\ & & \frac{2}{d_{n-3}} & \frac{4}{d_{n-3}} {+} \frac{4}{d_{n-2}} & \frac{2}{d_{n-2}} \\ & & & & \normalsize 1 \scriptsize \\ \end{array} \right) \left( \begin{array} {c} m_0 \\ m_1 \\ \\ \vdots \\ \\ m_{n-1} \end{array} \right) = \left( \begin{array} {c} G \\ 6 \left( \frac{y_2 {-} y_1}{d_1^2} {+} \frac{y_1 {-} y_0}{d_0^2} \right) \\ 6 \left( \frac{y_3 {-} y_2}{d_2^2} {+} \frac{y_2 {-} y_1}{d_1^2} \right) \\ \vdots \\ 6 \left( \frac{y_{n-1} {-} y_{n-2}}{d_{n-2}^2} {+} \frac{y_{n-2} {-} y_{n-3}}{d_{n-3}^2} \right) \\ H \\ \end{array} \right) \normalsize \]

And the equation for natural spline endpoints is:

\[ \scriptsize \left( \begin{array} {ccccc} \frac{4}{d_0} & \frac{2}{d_0} & & & \\ \frac {2} {d_0} & \frac {4} {d_0} {+} \frac {4} {d_1} & \frac{2}{d_1} & & \\ & \frac{2}{d_1} & \frac{4}{d_1} {+} \frac{4}{d_2} & \frac{2}{d_2} & \\ & & \vdots & & \\ & & \frac{2}{d_{n-3}} & \frac{4}{d_{n-3}} {+} \frac{4}{d_{n-2}} & \frac{2}{d_{n-2}} \\ & & & \frac{2}{d_{n-2}} & \frac{4}{d_{n-2}} \\ \end{array} \right) \left( \begin{array} {c} m_0 \\ m_1 \\ \\ \vdots \\ \\ m_{n-1} \end{array} \right) = \left( \begin{array} {c} \frac{6 \left( y_1 {-} y_0 \right)}{d_0^2} \\ 6 \left( \frac{y_2 {-} y_1}{d_1^2} {+} \frac{y_1 {-} y_0}{d_0^2} \right) \\ 6 \left( \frac{y_3 {-} y_2}{d_2^2} {+} \frac{y_2 {-} y_1}{d_1^2} \right) \\ \vdots \\ 6 \left( \frac{y_{n-1} {-} y_{n-2}}{d_{n-2}^2} {+} \frac{y_{n-2} {-} y_{n-3}}{d_{n-3}^2} \right) \\ \frac{6 \left( y_{n-1} {-} y_{n-2} \right)}{d_{n-2}^2} \\ \end{array} \right) \normalsize \]