In a previous post I discussed some scenarios where traditional map-reduce (directly applying a map function, followed by some monoidal reduction) can be inefficient. To review, the inefficiency arises when the map operation must create a non-trivial monoidal value to represent a single element of the input type. For example, if the monoidal type is Set[Int], then the mapping function (‘prepare’ in algebird) maps every input integer k into the singleton Set(k), which is a relatively expensive allocation.
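To make that concrete, here is a small sketch using plain Scala collections (not algebird’s actual API) of where the per-element cost shows up:

// a sketch of the inefficiency, using plain Scala collections
val data = Seq(1, 2, 3, 2, 1)

// 'prepare' allocates a transient single-element Set(k) for every input ...
val prepared = data.map { k => Set(k) }

// ... which the monoidal reduction then merges pairwise
val reduced = prepared.fold(Set.empty[Int])(_ ++ _)
// reduced == Set(1, 2, 3)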

In that discussion, I was focusing on map-reduce as embodied by the algebird Aggregator type, where map appears as the prepare function. However, it is easy to see that any map-reduce implementation may be vulnerable to the same inefficiency.

I wondered if there were a way to represent map-reduce using some alternative formulation that avoids this vulnerability. There is such a formulation, which I will talk about in this post.

I’ll begin by reviewing a standard map-reduce implementation. The following Scala code sketches out the definition of a monoid over a type B and a map-reduce interface. As this code suggests, the map function maps input data of some type A into some monoidal type B, which can be reduced (aka “aggregated”) in a way that is amenable to parallelization:

trait Monoid[B] {
  // aka 'combine' aka '++'
  def plus: (B, B) => B

  // aka 'empty' aka 'identity'
  def e: B
}

// ParSeq requires the scala-parallel-collections module on Scala 2.13+
import scala.collection.parallel.ParSeq

trait MapReduce[A, B] {
  // monoid embodies the reducible type
  def monoid: Monoid[B]

  // mapping function from input type A to reducible type B
  def map: A => B

  // the basic map-reduce operation
  def apply(data: Seq[A]): B = data.map(map).fold(monoid.e)(monoid.plus)

  // map-reduce parallelized over data partitions
  def apply(data: ParSeq[Seq[A]]): B =
    data.map { part =>
      part.map(map).fold(monoid.e)(monoid.plus)
    }
    .fold(monoid.e)(monoid.plus)
}
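For concreteness, here is what an instance of this interface might look like for the running example, where A is Int and B is Set[Int]. The names below are mine, and this is a sketch rather than algebird code; the point is that map has no choice but to build a transient single-element Set:

// a sketch of a MapReduce instance for the Set[Int] example
val setMR = new MapReduce[Int, Set[Int]] {
  val monoid = new Monoid[Set[Int]] {
    def plus = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
    def e = Set.empty[Int]
  }
  // every input k pays for constructing a transient Set(k)
  def map = (k: Int) => Set(k)
}

setMR(Seq(1, 2, 3, 2, 1))  // Set(1, 2, 3)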

In the parallel version of map-reduce above, you can see that map and reduce are executed on each data partition (which may occur in parallel) to produce a monoidal B value, followed by a final reduction of those intermediate results. This is the classic form of map-reduce popularized by tools such as Hadoop and Apache Spark, where individual data partitions may reside across highly parallel commodity clusters.

Next I will present an alternative definition of map-reduce. In this implementation, the map function is replaced by a foldL function, which executes a single “left-fold” of an input object of type A into a monoidal value of type B:

// a map reduce operation based on a monoid with left folding
trait MapReduceLF[A, B] extends MapReduce[A, B] {
  def monoid: Monoid[B]

  // left-fold an object with type A into the monoid B
  // obeys type law: foldL(b, a) = b ++ foldL(e, a)
  def foldL: (B, A) => B

  // foldL(e, a) embodies the role of map(a) in standard map-reduce
  def map = (a: A) => foldL(monoid.e, a)

  // map-reduce operation is now a single fold-left operation
  override def apply(data: Seq[A]): B = data.foldLeft(monoid.e)(foldL)

  // map-reduce parallelized over data partitions
  override def apply(data: ParSeq[Seq[A]]): B =
    data.map { part =>
      part.foldLeft(monoid.e)(foldL)
    }
    .fold(monoid.e)(monoid.plus)
}
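Returning to the running example, a left-folding instance for Set[Int] might look like the following sketch (again, the names are mine). Here foldL inserts each element directly, avoiding the transient Set(a) entirely:

// a sketch of a left-folding instance for the Set[Int] example
val setLF = new MapReduceLF[Int, Set[Int]] {
  val monoid = new Monoid[Set[Int]] {
    def plus = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
    def e = Set.empty[Int]
  }
  // insert a directly into b: no intermediate Set(a) is built
  def foldL = (b: Set[Int], a: Int) => b + a
}

setLF(Seq(1, 2, 3, 2, 1))  // Set(1, 2, 3)
setLF.map(7)               // foldL(e, 7) == Set(7)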

As the comments above indicate, the left-folding function foldL is assumed to obey the law foldL(b, a) = b ++ foldL(e, a). This law captures the idea that folding a into b should be the analog of reducing b with a monoidal value corresponding to the single element a. Referring to my earlier example, if type A is Int and B is Set[Int], then foldL(b, a) is simply b + a. Note that b + a directly inserts the single element a into b, which is significantly more efficient than b ++ Set(a), which is how a typical map-reduce implementation would be required to operate.

This law also gives us the corresponding definition of map(a), which is foldL(e, a), or in my example: Set.empty[Int] + a, or just: Set(a).

In this formulation, the basic map-reduce operation is now a single foldLeft operation, instead of a mapping followed by a monoidal reduction. The parallel version is analogous: each partition is processed with foldLeft, and the final reduction of intermediate monoidal results remains the same as before.

The foldLeft function is potentially a much more general operation, and it raises the question of whether this new encoding is indeed parallelizable as before. I will conclude with a proof that it is. Note that the law foldL(b, a) = b ++ foldL(e, a) is a significant component of this proof, as it represents the constraint that foldL behaves like the analog of reducing b with a monoidal representation of element a.
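Before the formal argument, a quick empirical spot-check of both the law and the parallelization claim, using the hypothetical setLF instance sketched above:

// spot-check the law foldL(b, a) == b ++ foldL(e, a)
val b = Set(1, 2)
assert(setLF.foldL(b, 3) == setLF.monoid.plus(b, setLF.foldL(setLF.monoid.e, 3)))

// spot-check parallelizability: mr(d1 ++ d2) == mr(d1) ++ mr(d2)
val (d1, d2) = (Seq(1, 2, 3), Seq(3, 4))
assert(setLF(d1 ++ d2) == setLF.monoid.plus(setLF(d1), setLF(d2)))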

In the following proof I use a Scala-like pseudocode, with notation defined in the comments below:

// given an object mr of type MapReduceLF[A, B]
// and using notation:
// f <==> mr.foldL
// for b1,b2 of type B: b1 ++ b2 <==> mr.monoid.plus(b1, b2)
// e <==> mr.monoid.e
// [...] <==> Seq(...)
// d1, d2 are of type Seq[A]

// Proof that map-reduce with left-folding is parallelizable
// i.e. mr(d1 ++ d2) == mr(d1) ++ mr(d2)
mr(d1 ++ d2)
== (d1 ++ d2).foldLeft(e)(f)  // definition of map-reduce operation
== d1.foldLeft(e)(f) ++ d2.foldLeft(e)(f)  // Lemma A
== mr(d1) ++ mr(d2)  // definition of map-reduce (QED)

// Proof of Lemma A
// i.e. (d1 ++ d2).foldLeft(e)(f) == d1.foldLeft(e)(f) ++ d2.foldLeft(e)(f)

// proof is by induction on the length of data sequence d2

// case d2 where length is zero, i.e. d2 == []
(d1 ++ []).foldLeft(e)(f)
== d1.foldLeft(e)(f)  // definition of empty sequence []
== d1.foldLeft(e)(f) ++ e  // definition of identity e
== d1.foldLeft(e)(f) ++ [].foldLeft(e)(f)  // definition of foldLeft

// case d2 where length is 1, i.e. d2 == [a] for some a of type A
(d1 ++ [a]).foldLeft(e)(f)
== f(d1.foldLeft(e)(f), a)  // definition of foldLeft and f
== d1.foldLeft(e)(f) ++ f(e, a)  // the type-law f(b, a) == b ++ f(e, a)
== d1.foldLeft(e)(f) ++ [a].foldLeft(e)(f)  // definition of foldLeft

// inductive step, assuming proof for d2' of length <= n
// consider d2 of length n+1, i.e. d2 == d2' ++ [a], where d2' has length n
(d1 ++ d2).foldLeft(e)(f)
== (d1 ++ d2' ++ [a]).foldLeft(e)(f)  // definition of d2, d2', [a]
== f((d1 ++ d2').foldLeft(e)(f), a)  // definition of foldLeft and f
== (d1 ++ d2').foldLeft(e)(f) ++ f(e, a)  // type-law f(b, a) == b ++ f(e, a)
== d1.foldLeft(e)(f) ++ d2'.foldLeft(e)(f) ++ f(e, a)  // induction hypothesis, applied to d1 and d2'
== d1.foldLeft(e)(f) ++ d2'.foldLeft(e)(f) ++ [a].foldLeft(e)(f)  // def'n of foldLeft
== d1.foldLeft(e)(f) ++ (d2' ++ [a]).foldLeft(e)(f)  // length-1 case, applied to d2' and [a]
== d1.foldLeft(e)(f) ++ d2.foldLeft(e)(f)  // definition of d2 (QED)