I want to make an argument that the Algebird Aggregator design, in particular its use of the prepare operation in a map-reduce context, has substantial inefficiencies compared to an equivalent formulation that is more directly suited to taking advantage of Scala’s aggregate method on collections.

Consider the definition of aggregation in the Aggregator class:

def apply(inputs: TraversableOnce[A]): C = present(reduce(inputs.map(prepare)))


You can see that it is a standard map/reduce operation, where reduce is defined as a monoidal (or semigroup – more on this later) operation. Under the hood, it boils down to an invocation of Scala’s reduceLeft method. The key thing to notice is that the role of prepare is to map a collection of data elements into the required monoids, which are then aggregated using that monoid’s plus operation. In other words, prepare converts data elements into “singleton” monoids each representing a data element.

Now, if the monoid in question is simple, say some numeric type, this conversion is free, or nearly so. For example, the conversion of an integer into the “integer monoid” is a no-op. However, there are other kinds of “non-trivial” monoids, for which the conversion of a data element into its corresponding monoid may be costly. In this post, I will be using the monoid defined by Scala Set[Int], where the monoid plus operation is set union, and of course the zero element is the empty set.

Consider the process of defining an Algebird aggregator for the task of generating the set of unique elements in a data set. The corresponding prepare operation is: prepare(e: Int) = Set(e). A monoid trait that encodes this idea might look like the following. (The code I used in this post can be found here.)

// an algebird-like monoid with the 'prepare' operation
trait PreparedMonoid[M, E] {
  val zero: M
  def plus(m1: M, m2: M): M
  def prepare(e: E): M
}

// a PreparedMonoid for a set of integers.  monoid operator is set union.
object intSetPrepared extends PreparedMonoid[Set[Int], Int] {
  val zero = Set.empty[Int]
  def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
  def prepare(e: Int) = Set(e)
}

implicit class SeqWithMapReduce[E](seq: Seq[E]) {
  // algebird map/reduce Aggregator model
  def mrPrepared[M](mon: PreparedMonoid[M, E]): M = {
    seq.map(mon.prepare).reduceLeft(mon.plus)
  }
}


If we unpack the above code, as applied to intSetPrepared, we are instantiating a new Set object, containing a single value, for every single input data element.
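To make that per-element cost concrete, here is a minimal sketch that counts how often prepare is invoked. It is not part of the code used for this post: countingIntSet, its prepared counter, and the standalone mrPrepared function are hypothetical names introduced only for illustration, and the trait is repeated so the block stands alone.

```scala
object PrepareCostSketch {
  // Same shape as the PreparedMonoid trait above, repeated to be self-contained.
  trait PreparedMonoid[M, E] {
    val zero: M
    def plus(m1: M, m2: M): M
    def prepare(e: E): M
  }

  // Hypothetical instrumented variant of intSetPrepared:
  // every call to prepare allocates a one-element Set and bumps the counter.
  object countingIntSet extends PreparedMonoid[Set[Int], Int] {
    var prepared = 0
    val zero = Set.empty[Int]
    def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
    def prepare(e: Int) = { prepared += 1; Set(e) }
  }

  // The literal map/reduce model: prepare every element, then reduce with plus.
  def mrPrepared[M, E](seq: Seq[E], mon: PreparedMonoid[M, E]): M =
    seq.map(mon.prepare).reduceLeft(mon.plus)

  val data = Seq(3, 1, 3, 2, 1, 3)
  val result = mrPrepared(data, countingIntSet)
  // After this runs, countingIntSet.prepared equals data.length:
  // one singleton Set per input element, duplicates included.
}
```

The count tracks the size of the input rather than the number of distinct values, which is why the cost grows even when the final set stays tiny.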

But there is a potentially better model of aggregation, exemplified by the Scala aggregate method. This method does not use a prepare operation. It uses a zero value and a monoidal operator, which the Scala docs refer to as combop, but it also uses an “update” operation, that defines how to update the monoid object, directly, with a single element, referred to as seqop in Scala’s documentation. This idea can also be encoded as a flavor of monoid, enhanced with an update method:

// an algebird-like monoid with 'update' operation
trait UpdatedMonoid[M, E] {
  val zero: M
  def plus(m1: M, m2: M): M
  def update(m: M, e: E): M
}

// an equivalent UpdatedMonoid for a set of integers
object intSetUpdated extends UpdatedMonoid[Set[Int], Int] {
  val zero = Set.empty[Int]
  def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
  def update(m: Set[Int], e: Int) = m + e
}

implicit class SeqWithMapReduceUpdated[E](seq: Seq[E]) {
  // map/reduce logic, taking advantage of scala 'aggregate'
  def mrUpdatedAggregate[M](mon: UpdatedMonoid[M, E]): M = {
    seq.aggregate(mon.zero)(mon.update, mon.plus)
  }
}


This arrangement promises more efficiency when aggregating w.r.t. nontrivial monoids, by avoiding the construction of “singleton” monoids for each data element. The following demo confirms that for the Set-based monoid, it is over 10 times faster:

scala> :load /home/eje/scala/prepare.scala
defined module prepare

scala> import prepare._
import prepare._

scala> val data = Vector.fill(1000000) { scala.util.Random.nextInt(10) }
data: scala.collection.immutable.Vector[Int] = Vector(7, 9, 4, 2, 7,...

// Verify that output is the same for both implementations:
scala> data.mrPrepared(intSetPrepared)
res0: Set[Int] = Set(0, 5, 1, 6, 9, 2, 7, 3, 8, 4)

// results are the same
scala> data.mrUpdatedAggregate(intSetUpdated)
res1: Set[Int] = Set(0, 5, 1, 6, 9, 2, 7, 3, 8, 4)

// Compare timings of prepare-based versus update-based aggregation
// (benchmark values are returned in seconds)
scala> benchmark(10) { data.mrPrepared(intSetPrepared) }
res2: Double = 0.2957673056

// update-based aggregation is 10 times faster
scala> benchmark(10) { data.mrUpdatedAggregate(intSetUpdated) }
res3: Double = 0.027041249300000004
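
The benchmark helper called in this session lives in the linked code and is not reproduced in this post. A helper of that shape could be sketched roughly as follows. The body is an assumption on my part: it simply averages wall-clock time over n runs and returns seconds, and it may differ from the function that produced the numbers above.

```scala
object BenchmarkSketch {
  // Hypothetical helper: evaluates `block` n times and returns the mean
  // wall-clock time per run, in seconds. There is no JIT warm-up phase,
  // so the numbers it produces are rough.
  def benchmark[T](n: Int)(block: => T): Double = {
    require(n > 0, "n must be positive")
    val start = System.nanoTime()
    (0 until n).foreach(_ => block) // by-name parameter: re-evaluated each time
    val elapsedNanos = System.nanoTime() - start
    elapsedNanos / 1e9 / n
  }
}
```

It would be called the same way as in the session, for example BenchmarkSketch.benchmark(10) { data.mrPrepared(intSetPrepared) }.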


It is also possible to apply Scala’s aggregate to a monoid enhanced with prepare:

implicit class SeqWithMapReducePrepared[E](seq: Seq[E]) {
  // using 'aggregate' with prepared op
  def mrPreparedAggregate[M](mon: PreparedMonoid[M, E]): M = {
    seq.aggregate(mon.zero)((m, e) => mon.plus(m, mon.prepare(e)), mon.plus)
  }
}


Although this turns out to be measurably faster than the literal map-reduce implementation, it is still not nearly as fast as the variation using update:

scala> benchmark(10) { data.mrPreparedAggregate(intSetPrepared) }
res2: Double = 0.1754636707


Readers familiar with Algebird may be wondering about my use of monoids above, when the Aggregator interface is actually based on semigroups. This is important, since building on Scala’s aggregate function requires a zero element that semigroups do not have. Although I believe it might be worth considering changing Aggregator to use monoids, another sensible option is to change the internal logic for the subclass AggregatorMonoid, which does require a monoid, or possibly just define a new AggregatorMonoidUpdated subclass.
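
To illustrate why the missing zero matters, here is one possible way an update-style aggregation could still work over a semigroup: let Option stand in for the zero, seeding the accumulator with prepare on the first element and using update afterwards. This is only a sketch under my own assumptions. UpdatedSemigroup, bigIntMax, and aggregateOpt are hypothetical names, not Algebird APIs, and the price is that the result becomes an Option that is empty for empty input.

```scala
object SemigroupSketch {
  // Hypothetical semigroup flavor: no zero, but both prepare and update.
  trait UpdatedSemigroup[M, E] {
    def plus(m1: M, m2: M): M
    def prepare(e: E): M
    def update(m: M, e: E): M
  }

  // Max over BigInt is a semigroup with no identity element,
  // since there is no smallest BigInt to serve as zero.
  object bigIntMax extends UpdatedSemigroup[BigInt, BigInt] {
    def plus(m1: BigInt, m2: BigInt) = m1 max m2
    def prepare(e: BigInt) = e
    def update(m: BigInt, e: BigInt) = m max e
  }

  // Option supplies the missing zero: None until the first element arrives,
  // then prepare is used once and update handles every later element.
  def aggregateOpt[M, E](seq: Seq[E], sg: UpdatedSemigroup[M, E]): Option[M] =
    seq.foldLeft(Option.empty[M]) { (acc, e) =>
      Some(acc.fold(sg.prepare(e))(m => sg.update(m, e)))
    }
}
```

With this arrangement prepare is called once per sequential fold rather than once per element, at the cost of wrapping the accumulator in an Option.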

A final note on compatibility: any monoid enhanced with prepare can be converted into an equivalent monoid enhanced with update, as demonstrated by this factory function:

object UpdatedMonoid {
  // create an UpdatedMonoid from a PreparedMonoid
  def apply[M, E](mon: PreparedMonoid[M, E]) = new UpdatedMonoid[M, E] {
    val zero = mon.zero
    def plus(m1: M, m2: M) = mon.plus(m1, m2)
    def update(m: M, e: E) = mon.plus(m, mon.prepare(e))
  }
}
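
As a quick check on that conversion, the following self-contained sketch wraps intSetPrepared with the factory and confirms that the update-based fold agrees with the literal map/reduce. The wrapper object FactoryUsageSketch and the value names are mine, the traits are repeated so the block runs on its own, and I use foldLeft in place of aggregate on the assumption that the input is a sequential Seq, where the combining operator is never needed.

```scala
object FactoryUsageSketch {
  trait PreparedMonoid[M, E] {
    val zero: M
    def plus(m1: M, m2: M): M
    def prepare(e: E): M
  }

  trait UpdatedMonoid[M, E] {
    val zero: M
    def plus(m1: M, m2: M): M
    def update(m: M, e: E): M
  }

  object UpdatedMonoid {
    // create an UpdatedMonoid from a PreparedMonoid
    def apply[M, E](mon: PreparedMonoid[M, E]): UpdatedMonoid[M, E] =
      new UpdatedMonoid[M, E] {
        val zero = mon.zero
        def plus(m1: M, m2: M) = mon.plus(m1, m2)
        def update(m: M, e: E) = mon.plus(m, mon.prepare(e))
      }
  }

  object intSetPrepared extends PreparedMonoid[Set[Int], Int] {
    val zero = Set.empty[Int]
    def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
    def prepare(e: Int) = Set(e)
  }

  val data = Seq(4, 2, 4, 9, 2)

  // Literal map/reduce using prepare.
  val viaPrepare = data.map(intSetPrepared.prepare).reduceLeft(intSetPrepared.plus)

  // Update-based fold using the converted monoid.
  val converted = UpdatedMonoid(intSetPrepared)
  val viaUpdate = data.foldLeft(converted.zero)(converted.update)
}
```

Note that the converted update is the same expression used as the sequential operator in mrPreparedAggregate, so the conversion preserves results but still builds a singleton per element. Getting the full speedup requires a hand-written update such as the one in intSetUpdated.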