Skip to main content

Understanding thread pools through cats-effect

1 hour 

Why have thread pools?

A thread pool, also known as an execution context is a way of managing parallelism.

To demonstrate, let’s have a look at a simple task: snooze.

val snooze: IO[Unit] = IO(Thread.sleep(2000L))

snooze does absolutely nothing. More precisely, it does absolutely nothing for two seconds. We can double check this by running it using our handy time function:

time(snooze).unsafeRunSync()
// error:
// Could not find an implicit IORuntime.
// 
// Instead of calling unsafe methods directly, consider using cats.effect.IOApp, which
// runs your IO. If integrating with non-functional code or experimenting in a REPL / Worksheet,
// add the following import:
// 
// import cats.effect.unsafe.implicits.global
// 
// Alternatively, you can create an explicit IORuntime value and put it in implicit scope.
// This may be useful if you have a pre-existing fixed thread pool and/or scheduler which you
// wish to use to execute IO programs. Please be sure to review thread pool best practices to
// avoid unintentionally degrading your application performance.

Whoops! We need an IORuntime. Let’s use our own basicRuntime explicitly:

time(snooze).unsafeRunSync()(basicRuntime)
// res1: String = "The task took 2 seconds."

As expected, it took two seconds to run.

What if we have multiple snooze tasks?

val snoozes: List[IO[Unit]] = List(snooze, snooze)

We can combine a list of tasks using parSequence:

val parallelSnoozes: IO[Unit] = snoozes.parSequence.void

The parSequence function produces an IO that runs multiple tasks in parallel.

time(parallelSnoozes).unsafeRunSync()(basicRuntime)
// res2: String = "The task took 2 seconds."

Both tasks were run at the same time, so the total elapsed time was still only two seconds.

If you’re used to parallel computations, you may look at parSequence with a degree of suspicion. It lets us run many tasks in parallel, but how many?

For instance, we can declare a thousand snooze tasks:

val lotsOfSnoozes = List.fill(1000)(snooze).parSequence.void

Will they really only take two seconds?

time(lotsOfSnoozes).unsafeRunSync()(basicRuntime)
// res3: String = "The task took 2 seconds."

Nice! There seems to be no upper limit on the tasks we can run in parallel.

Knowing our limits

Unlimited parallelism seems like a great idea, but it has significant downsides. Let’s have a look at a different task to demonstrate.

val factorial: IO[Unit] = {
  @scala.annotation.tailrec
  def go(n: Long, total: Long): Long =
    if (n > 1) go(n - 1, total * n) else total
  IO(go(2000000000L, 1)).void
}

Woah! That’s an odd bit of code.

If you’re a functional programming enthusiast, you’re probably so fond of factorials that you compute them in your sleep.

Those skills, elegant as they are, aren’t too important here. Don’t worry if you’ve never heard of a factorial, haven’t seen @scala.annotation.tailrec before, or get a headache reading this Escher-like code.

The key part of factorial is that, unlike snooze, it does a lot of multiplication.

Running this on my rusty old laptop takes approximately two seconds.

time(factorial).unsafeRunSync()(basicRuntime)
// res4: String = "The task took 2 seconds."

The functional programmer within you might point out that this code is pure: there’s no reason to wrap it in an IO. While that’s true, doing so lets us parallelize it with parSequence:

val tenFactorials: IO[Unit] = List.fill(10)(factorial).parSequence.void

It took two seconds to run factorial once, so it should also take two seconds to run in parallel, shouldn’t it?

time(tenFactorials).unsafeRunSync()(basicRuntime)
// res5: String = "The task took 3 seconds."

If you ran the code above, you probably felt your laptop heat up a bit. You might have also found that the code didn’t take two seconds — it took longer.

This is different to our snooze task, which always took two seconds regardless of whether we ran one, two or a thousand in parallel.

Why would that be?

To answer that question, we need to take a closer look at our computers.

The processor beneath

Despite what we might wish, our laptops are not magical boxes with unlimited compute power: they’re made of physical devices, and those devices have limits. A computer has a limited number of processors, each of which can compute one thing at once.

We can check that number in Scala by taking a look at the Runtime object:

val numProcessors = Runtime.getRuntime().availableProcessors()
// numProcessors: Int = 8

My humble laptop has eight processors: it can execute a maximum of eight computations at once. Even if I ask it to calculate ten factorials in parallel, it won’t actually do so.

You might rightly wonder: why didn’t we hit this limit for the snooze task? This is because the Thread.sleep operation in snooze didn’t occupy a processor as it ran.

Setting our limits

We can take a closer look at how our factorial task is getting run by timing each task:

val timedFactorial: IO[String] = time(factorial)
val timedFactorials: IO[List[String]] =
  List.fill(20)(timedFactorial).parSequence

This gives a string description for each of the twenty tasks corresponding to how long the task took to run.

timedFactorials.unsafeRunSync()(basicRuntime)
// res6: List[String] = List(
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 4 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds."
// )

Strange! We see times of anywhere between two and six.

All tasks are fired off at the same time, but our processors switch between them as they run. A processor might start computing a task, but put it on hold in order to compute a different one, switching back to it at a later time. Tasks are started, halted and restarted as they all compete for processors.

The more tasks we parallelize, the more switching each processor has to do. This is problematic for a few reasons:

It’s generally much more useful to limit the number of tasks that can be run in parallel. We can do this using thread pools.

Bounded and unbounded thread pools

Our current limit, or lack thereof, is specified by our thread pool. The cats-effect IORuntime has a thread pool under the hood. The basicRuntime we’ve been using has an unbounded thread pool: it can execute an unlimited number of tasks in parallel.

In our Threading setup code, we declared another boundedRuntime function. Let’s give it a spin.

We can pick a bound of two for ten factorial tasks:

time(tenFactorials).unsafeRunSync()(boundedRuntime(2))
// res7: String = "The task took 12 seconds."

It’s much slower than before — only two tasks are run at once.

timedFactorials.unsafeRunSync()(boundedRuntime(2))
// res8: List[String] = List(
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds."
// )

Unlike the previous unbounded thread pool, each task takes two seconds ⸺ the tasks might be scheduled at once, but they’re fired off over time, once a thread is free to compute them.

What if we set the bound higher?

timedFactorials.unsafeRunSync()(boundedRuntime(20))
// res9: List[String] = List(
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 4 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds."
// )

The timedFactorials task behaves as if were running on the basicRuntime: it’s as if we didn’t have a bound at all.

If you think about it, this makes sense: if we have more computations running than the number of processors, each processor will still need to switch between them. Our factorial tasks will end up being paused by the processor and taking longer.

So far, we’ve experimented with bounds of two and twenty. Having two tasks run at once gets around our thread-switching problem: each processor can focus on a single task. But having only two isn’t too useful: most of our processors aren’t doing anything Scala-related.

The best limit probably corresponds to the number of processors. Let’s check:

timedFactorials.unsafeRunSync()(boundedRuntime(numProcessors))
// res10: List[String] = List(
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds."
// )

Sure enough, each task takes two seconds.

Snoozing

A thread pool bounded at numProcessors is the best option for the factorial task. But what about snooze?

We know that we can run an unlimited number of parallel snooze tasks using the basicRuntime — this had an unbounded thread pool. What about our boundedRuntime?

Let’s test it by running more tasks than processors. We can construct an IO that runs ten tasks in parallel:

val tenSnoozes: IO[Unit] = List.fill(10)(snooze).parSequence.void

Let’s try running this our bounded thread pool.

time(tenSnoozes).unsafeRunSync()(boundedRuntime(numProcessors))
// res11: String = "The task took 4 seconds."

Not two, but four seconds.

The Thread.sleep call might not hog a processor, but it does hog a thread in our pool.

By choosing a bounded thread pool for our tenSnoozes task, we cause it to take longer. If we want to get our task to complete as fast as possible, it seems better to have an unbounded pool.