linguistics, computers, and puns

TopQueue in Scala - Part 2

July 06, 2018 #Algorithms #Deep Dive #Interviews #Scala #TopQueue

In this post we'll be developing the solution that we began designing in the last post. As a refresher, here's the problem we're solving.

Design and implement a method to select the largest m values from a very large list of n numbers.

Last time we learned that sorting the entire sequence of n numbers doesn't scale. In some cases, the numbers may not even fit in memory. But with our solution, this won't be a problem.

Mind Your Trees and Queues

Enter the heap (not to be confused with the JVM's heap memory), the data structure most commonly used to implement a priority queue.

A heap can store any number of values, but rather than dispensing them in the order in which they were added (i.e., first in, first out), it dispenses the largest value first (in the case of a Max Heap) or the smallest value first (for a Min Heap).

Scala's standard library includes an implementation of the Heap in the PriorityQueue type.

scala> val queue = collection.mutable.PriorityQueue(1, 3, 5, 7, 9, 2, 4, 6, 8, 0)
queue: collection.mutable.PriorityQueue[Int] = PriorityQueue(9, 8, 5, 7, 3, 2, 4, 6, 1, 0)

scala> queue.head
res0: Int = 9

scala> queue.dequeue
res1: Int = 9

scala> queue.head
res2: Int = 8

scala> queue.dequeueAll
res3: collection.immutable.IndexedSeq[Int] = Vector(8, 7, 6, 5, 4, 3, 2, 1, 0)

There are a few things to notice here:

  1. The order in which values are dequeued is independent of the order of insertion.
  2. Values are not (necessarily) stored in-order internally.
  3. Values are returned in order when dequeued.
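
As a small illustration of points 1 and 3, the sketch below inserts the same values in two different orders and drains both queues. The names HeapOrderDemo and drain are made up for this example; only PriorityQueue and its dequeue() method come from the standard library.

```scala
import scala.collection.mutable

object HeapOrderDemo {
  // Remove every element, one dequeue() at a time, recording the order.
  // With the default ordering, dequeue() always removes the current maximum.
  def drain(q: mutable.PriorityQueue[Int]): List[Int] = {
    val out = List.newBuilder[Int]
    while (q.nonEmpty) out += q.dequeue()
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Same values, two different insertion orders.
    val ascending = mutable.PriorityQueue(1, 2, 3, 4, 5)
    val shuffled  = mutable.PriorityQueue(3, 5, 1, 4, 2)

    println(drain(ascending)) // List(5, 4, 3, 2, 1)
    println(drain(shuffled))  // List(5, 4, 3, 2, 1)
  }
}
```

Both queues come out largest first, regardless of how they were filled.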

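A heap keeps its largest (or smallest) value ready at all times without fully sorting its contents, which makes it far more efficient for our problem than sorting the whole list. (See the Appendix for more on this.)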

Thus, the heap is our infamous mystery collection. Let's call it TopQueue.

TopQueue Force Assemble!

Now that we have a design for our collection, it's time to start putting the pieces together.

Internally, TopQueue will maintain the biggest m values. Here's what the core logic looks like:

if pqueue has fewer than m values:
    insert new value
else if new value > smallest value in pqueue:
    remove smallest value in pqueue
    insert new value

In the next few subsections we'll design an interface, devise tests for that interface, and (finally) fill in the code to make it all work.

Speak My Language

The essential operations we need from our TopQueue are:

  1. Add a value.
  2. Get the number of values currently stored.
  3. Retrieve the stored (top) values.

Let's define our interface around these operations:

def add(element: Int): Unit
def size: Int
def result: Seq[Int]

Don't Test Me, Bro

Sweet. Now that we have an interface, we get to the fun part: testing!

Using the ScalaTest framework, we can write the following basic testcases:

import org.scalatest._

class TopQueueSpec extends FlatSpec with Matchers {
  "TopQueue" should "handle being empty" in {
    val emptyQueue = new TopQueue(5)
    emptyQueue.size should equal (0)
    emptyQueue.result.size should equal (0)
  }

  it should "handle less than capacity" in {
    val queue = new TopQueue(5)
    for (i <- Seq(1, 5, 3))
      queue.add(i)
    queue.size should equal (3)
    queue.result should equal (Seq(5, 3, 1))
  }

  it should "handle capacity" in {
    val queue = new TopQueue(5)
    for (i <- Seq(1, 5, 3, 4, 2))
      queue.add(i)
    queue.size should equal (5)
    queue.result should equal (Seq(5, 4, 3, 2, 1))
  }

  it should "handle more than capacity values" in {
    val queue = new TopQueue(5)
    for (i <- Seq(1, 5, 3, 4, 2, 7, 6))
      queue.add(i)
    queue.size should equal (5)
    queue.result should equal (Seq(7, 6, 5, 4, 3))
  }
}

TopQueue is using a Min Heap (i.e., a PriorityQueue with its ordering reversed). We'll see how this works in the next section.

Gotta Get Queue into My Life

Now that we've got a basic design and a test suite to specify the behavior we expect, let's write some code.

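import collection.mutable.PriorityQueue

// Keeps only the `capacity` largest values seen so far.
class TopQueue(val capacity: Int) {
  // Reversed ordering makes this a Min Heap: head is the smallest stored value.
  private[this] val queue = PriorityQueue.empty[Int](Ordering[Int].reverse)

  def add(element: Int): Unit = {
    if (queue.size < capacity) {
      // Still room: accept every value.
      queue.enqueue(element)
    } else if (element > queue.head) {
      // Full: evict the current smallest to make room for a larger value.
      val _ = queue.dequeue()
      queue.enqueue(element)
    }
  }

  def size: Int = queue.length
  // Largest first. Note that dequeueAll empties the queue, so call this once, at the end.
  def result: Seq[Int] = queue.dequeueAll.reverse
}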

That's it. Not much to look at, is it? Three public methods wrapping a private PriorityQueue member. There are two important things happening in this small snippet of code.

First, look at the PriorityQueue instantiation:

private[this] val queue = PriorityQueue.empty[Int](Ordering[Int].reverse)

Note the argument, Ordering[Int].reverse. This tells the PriorityQueue to invert its comparisons: rather than keeping the largest value at the head, it keeps the smallest, turning queue into a Min Heap.
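
To see the effect of the reversed ordering in isolation, here is a minimal sketch. MinHeapDemo, maxHead, and minHead are hypothetical names used only for this illustration; the PriorityQueue calls are the same ones used in TopQueue.

```scala
import scala.collection.mutable.PriorityQueue

object MinHeapDemo {
  // Default ordering: head is the largest element (Max Heap).
  def maxHead(values: Seq[Int]): Int = {
    val heap = PriorityQueue.empty[Int]
    heap ++= values
    heap.head
  }

  // Reversed ordering: head is the smallest element (Min Heap).
  def minHead(values: Seq[Int]): Int = {
    val heap = PriorityQueue.empty[Int](Ordering[Int].reverse)
    heap ++= values
    heap.head
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(4, 9, 1, 7)
    println(maxHead(values)) // 9
    println(minHead(values)) // 1
  }
}
```

The only difference between the two methods is the Ordering passed to empty, which is why a single argument is enough to switch between a Max Heap and a Min Heap.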

[Figure: Binary Min Heap]

Second, notice the if expression in the add() method. This is the Scala translation of our pseudocode from before. Instead of using a Max Heap to store all the values and extracting only the first m, we use a Min Heap that holds at most m values, admitting a new value only when it is larger than the smallest one in the heap. This gives us an expected running time proportional to n log m and space proportional to m (see the Appendix).

Given this implementation, we get the following output from our test class:

$ sbt test
...
[info] TopQueueSpec:
[info] TopQueue
[info] - should handle being empty
[info] - should handle less than capacity
[info] - should handle capacity
[info] - should handle more than capacity values
[info] Run completed in 336 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 1 s

The Check's in the Scale

Remember our "very big n" from the first post? Let's write a test case to make sure our solution lives up to all the promises I've made!

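// Assuming ScalaTest 3.x, this needs two more imports at the top of the spec file:
//   import org.scalatest.concurrent.TimeLimits._
//   import org.scalatest.time.SpanSugar._
it should "handle a lot of input values" in {
  val (n, m) = (100000000, 100)
  val rng = scala.util.Random
  val queue = new TopQueue(m)

  // Fail the test if feeding all n values takes longer than 30 seconds.
  failAfter(30.seconds) {
    for (_ <- 0 until n)
      queue.add(rng.nextInt(Int.MaxValue))
  }
}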

With a timeout of 30 seconds, I've gambled that TopQueue performs at least twice as fast as our naïve solution. Here goes...

[info] TopQueueSpec:
[info] TopQueue
[info] - should handle being empty
[info] - should handle less than capacity
[info] - should handle capacity
[info] - should handle more than capacity values
[info] - should handle a lot of input values
[info] Run completed in 1 second, 162 milliseconds.
[info] Total number of tests run: 5
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 1 s

The test passed, so we've come in under the 30 second timer. And if we look at the ScalaTest output, we can see that the whole suite of 5 tests ran in just over 1 second. This amounts to approximately a 60x speed increase versus our first solution.

Hey, we didn't even need to increase the heap size to get this major performance win! In fact, I've verified on my machine that this runs with a heap well under 32MB, down from over 3GB. That's almost a 100x space efficiency improvement.

If you want to see how we achieve such a drastic improvement in both time and space, keep reading.

To see how we can give our minimalist collection some Scala flavor and make it more general and useful, check out part 3.


Appendix: Big O Analysis

How did we get such a drastic improvement in both time and space?

You might remember from part 1 that our first solution had a complexity of about O(n log n). What are the complexities of our TopQueue?

Given n = 100,000,000 and m = 100:

Method     Time complexity   Time measured   Space complexity   Space measured
Sorting    n log n           ~63.017 s       n log n            > 3000 MB
TopQueue   n log m           ~1.038 s        m                  < 32 MB
                             ~60x faster                        ~99% less space

The Time Complexity of TopQueue is n log m. This means that the time required is proportional to the total number of elements processed (to read, compare, and optionally store them) multiplied by the log of the number of elements stored. Because of the tree-based implementation of the heap, insertion (and removal) are logarithmic operations.

The Space Complexity of TopQueue is the big win here. We never need more space than it takes to store our 100 top values. This is how we're able to process all 100,000,000 values using less heap memory than it would take to even store all their raw 32-bit values.
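
As a back-of-the-envelope check on that last claim, the sketch below computes the raw payload size of n and m 32-bit values. SpaceEstimate and rawBytes are names invented for this example, and the figures deliberately ignore boxing and other JVM overhead, so real memory use will be higher in both cases.

```scala
object SpaceEstimate {
  // An Int is 32 bits, i.e. 4 bytes.
  val BytesPerInt = 4L

  // Raw payload size, in bytes, of `count` unboxed 32-bit values.
  def rawBytes(count: Long): Long = count * BytesPerInt

  def main(args: Array[String]): Unit = {
    val n = 100000000L // values processed
    val m = 100L       // values kept
    println(rawBytes(n)) // 400000000 bytes, roughly 381 MiB
    println(rawBytes(m)) // 400 bytes
  }
}
```

Even under these generous assumptions, merely holding all n values needs far more than the 32 MB heap that TopQueue ran in.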

Note that this notation is designed to help us analyze and compare relative complexities, not to give us concrete time/memory estimates.

For example, the log in the table doesn't equate to log₂ or logₑ. It depends on several factors, including the type, size, and depth of the data structure, and the efficiency of the algorithm.
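
One way to see why the choice of base matters little when comparing the two approaches: the ratio of the two log factors, log n / log m, comes out the same in any base. The sketch below checks this for our n and m; LogFactor, log, and ratio are hypothetical helper names for this illustration only.

```scala
object LogFactor {
  // Logarithm of x in an arbitrary base, via the change-of-base formula.
  def log(base: Double)(x: Double): Double = math.log(x) / math.log(base)

  // Ratio of the two log factors, log(n) / log(m), in the given base.
  def ratio(base: Double, n: Double, m: Double): Double =
    log(base)(n) / log(base)(m)

  def main(args: Array[String]): Unit = {
    val (n, m) = (1e8, 100.0)
    // The ratio prints as 4.000 for every base.
    for (base <- Seq(2.0, math.E, 10.0))
      println(f"base $base%.3f: ratio = ${ratio(base, n, m)}%.3f")
  }
}
```

The log factors differ by about 4x while the measured times differ by about 60x, which fits the point above: the notation compares growth rates and is not a concrete time estimate.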

We'll see how we can make our super efficient code even more useful, and easier to use, in part 3.