
No leftovers: Working with pulls in fs2


Introducing pulls

Pulls are the plumbing underlying streams. All streams and stream transformations are built using pulls.

Perhaps the best way to understand them is to build a few ourselves.

Outputting values

Pulls can output values. For instance, take a stream which outputs a single "hello" value.

val helloStream = Stream("hello")

We could use the output1 function on the fs2.Pull object to build a pull that does the same.

val helloPull = Pull.output1("hello")

We can then convert it into a stream using the stream function.

helloPull.stream

Pulls are streams

The helloPull produces the same stream as helloStream. We can run them both to check:

helloStream.compile.toList
// res8: List[String] = List("hello")
helloPull.stream.compile.toList
// res9: List[String] = List("hello")

Empty streams

The simplest pull is the one that does nothing.

Pull.done.stream

This is equivalent to Stream.empty.
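A quick check of that equivalence, as a sketch assuming fs2 3.x on the classpath: both streams compile to an empty list. The last value also previews the `>>` operator from the next section. Putting `Pull.done` in front of another pull leaves it unchanged, much as `Stream.empty ++ s` behaves like `s`. The value names are only for illustration.

```scala
import fs2.{Pull, Pure, Stream}

// Pull.done outputs nothing and finishes straight away.
val fromDone: Stream[Pure, String] = Pull.done.stream

// Stream.empty, for comparison.
val fromEmpty: Stream[Pure, String] = Stream.empty

// Sequencing Pull.done before another pull with >> changes nothing,
// much as Stream.empty ++ s behaves like s.
val doneThenHello: Stream[Pure, String] = (Pull.done >> Pull.output1("hello")).stream
```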

Streams are pulls

We can convert from a stream to a pull using a variety of functions. These are conveniently accessed under the pull function.

For instance, echo converts a stream to its corresponding pull representation:

helloStream.pull.echo

And of course, we can convert back and forth.

helloStream.pull.echo.stream.compile.toList
// res12: List[String] = List("hello")
helloStream.pull.echo.stream.pull.echo.stream.compile.toList
// res13: List[String] = List("hello")
helloStream.compile.toList
// res14: List[String] = List("hello")

Composing pulls

We can output multiple elements by composing pulls together using the >> function, spoken as ‘then’.

Our helloPull is sorely missing a "world". Let’s use >> to append it:

val helloWorldPull = helloPull >> Pull.output1("world")

This is equivalent to helloStream ++ Stream("world"):

helloWorldPull.stream.toList
// res15: List[String] = List("hello", "world")
(helloStream ++ Stream("world")).toList
// res16: List[String] = List("hello", "world")

This gives us a clue about how to write stream transformations. With a bit of thought, we can code it up as a pipe that appends "world" to any stream of strings:

def world: Pipe[Pure, String, String] =
  in => (in.pull.echo >> Pull.output1("world")).stream
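Here is a usage sketch for the world pipe, assuming fs2 3.x. The pipe is repeated so the block stands on its own, and then applied to a few pure streams. The value names are illustrative.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// The world pipe: echo every input element, then output "world".
def world: Pipe[Pure, String, String] =
  in => (in.pull.echo >> Pull.output1("world")).stream

val greeting: List[String] = Stream("hello").through(world).toList
val longer: List[String] = Stream("hi", "there").through(world).toList
val onlyWorld: List[String] = Stream[Pure, String]().through(world).toList
```

Note that an empty input still produces "world". `echo` finishes immediately, and the appended `output1` runs regardless.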

Our mental picture of a pull is a bit like this:

Food for thought

These examples might seem simple, but they raise a lot of questions.

We can tackle all of these by looking at a pull’s type parameters.

The result type

The Stream[F, O] datatype has two type parameters: an effect type F and an output type O. On the other hand, Pull[F, O, R] has three. What is that extra R type for?

Unlike streams, pulls have a result.

The pulls you’ve seen so far all have a result type of Unit. To demonstrate, let’s examine the type of the helloPull:

helloPull
// res17: Pull[Nothing, String, Unit] = Output(values = Chunk(hello))

It has an output type of String, as we expect, and a result of Unit.

We can build a pull with a more useful result using Pull.pure:

val helloResult = Pull.pure("hello")
// helloResult: Pull[Nothing, Nothing, String] = Succeeded(r = "hello")

This has an output type of Nothing (shown as INothing in the compiler error below), meaning it doesn’t output any values, and a result of String.

To emphasise, the helloResult pull doesn’t output "hello" when run. In fact, if we try to convert it to a stream, we’ll find we can’t.

helloResult.stream
// error:
// value stream is not a member of fs2.Pull[Nothing, fs2.INothing, String]

A pull can only be represented as a stream if it has a result of Unit — meaning its result can be discarded.
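The following is a small sketch of that rule, assuming fs2 3.x. Sequencing helloResult with `>>` throws its String result away. Ending on `Pull.done` gives the combined pull a Unit result, so `.stream` becomes available again. Running it confirms that "hello" is never output. The name `discarded` is only illustrative.

```scala
import fs2.{Pull, Pure, Stream}

// A pull whose result is "hello" but which outputs nothing.
val helloResult: Pull[Pure, Nothing, String] = Pull.pure("hello")

// >> ignores helloResult's String result and continues with Pull.done.
// The combined pull has a Unit result, so .stream is available again.
val discarded: Stream[Pure, String] = (helloResult >> Pull.done).stream
```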

This brings us back to our original question: what is the result for?

Enter the Monad

Pulls are monads. For me, and for many functional programming enthusiasts, this is a subject for profound thought. For our current purposes, though, it simply means they have a flatMap function. Simplified, its signature looks like this:

// Simplified: the real signature also lets F and O widen
class Pull[F[_], O, R] {
  def flatMap[R1](f: R => Pull[F, O, R1]): Pull[F, O, R1] = ???
}

The function f passed to flatMap uses the result to create another pull.

We could pass in a function that creates a pull outputting the result.

val outputHello = helloResult.flatMap { (text: String) =>
  Pull.output1(text)
}
// outputHello: Pull[[x >: Nothing <: Any] => Nothing, String, Unit] = <function1>

The outputHello pull has a result type of Unit, so it can be converted to a stream.

outputHello.stream.compile.toList
// res18: List[String] = List("hello")

You can think of the result as a temporary value that can be used to construct other pulls.
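Because pulls have both flatMap and map, a chain of them can also be written as a Scala for-comprehension. The sketch below assumes fs2 3.x, and the name `echoTwice` is hypothetical. The String result of the first pull is used to decide what the next two pulls output.

```scala
import fs2.{Pull, Pure}

// Each step's result feeds the next: the String result of Pull.pure
// determines what the following two pulls output.
val echoTwice: Pull[Pure, String, Unit] =
  for {
    text <- Pull.pure[Pure, String]("hello")
    _    <- Pull.output1(text)
    _    <- Pull.output1(text.toUpperCase)
  } yield ()

val outputs: List[String] = echoTwice.stream.toList
```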

Modifying streams

We can convert a stream into a variety of pulls with meaningful results.

One of the most useful is the pull created by uncons1. Its result is an optional pair of the first element output by the stream and a stream of the remaining elements.

helloStream.pull.uncons1
// res19: Pull[Nothing, Nothing, Option[Tuple2[String, Stream[Nothing, String]]]] = <function1>

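Let’s break down that hefty type signature:

  1. The effect type is Nothing, because helloStream is pure.

  2. The output type is Nothing: the pull doesn’t output any values itself.

  3. The result is an Option. It is None if the stream is empty, and otherwise a Some of a tuple containing the first element (a String) and a Stream of the remaining elements.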

Let’s try to use uncons1 to write some more interesting stream transformations. Suppose we want to write a take1 pipe that takes the first element of a stream. We could start by calling uncons1 and working with its result:

def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
     ???
  }
}

The result is one of two things:

  1. If the stream is non-empty, the result is a Some of a tuple of the first element and the rest of the stream. The first element is usually referred to as the “head” of the stream, so we’ll name this h. If the head is present we output it using Pull.output1.

  2. If the stream is empty, the result is a None. We terminate with Pull.done.

def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    case Some((h, _)) => Pull.output1(h) // Ⓐ
    case None => Pull.done // Ⓑ
  }.stream
}

Let’s try it out:

helloStream.through(take1).compile.toList
// res20: List[String] = List("hello")
Stream.empty.through(take1).compile.toList
// res21: List[String] = List()
Stream("hello", "world").through(take1).compile.toList
// res22: List[String] = List("hello")
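take1 throws away the rest of the stream. To show the other half of uncons1’s result in use, here is a sketch of a hypothetical drop1 pipe. It is not part of the original walkthrough, and it assumes fs2 3.x. It discards the head and echoes the remaining stream instead. In practice, fs2’s own Stream.drop already does this.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// Hypothetical drop1: ignore the head, echo the tail returned by uncons1.
def drop1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    case Some((_, tail)) => tail.pull.echo // output everything after the head
    case None            => Pull.done      // empty input: nothing to output
  }.stream
}
```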

Updating our mental picture with results and uncons1 we have:

If you’re still a bit suspicious of uncons1, it might help to compare it to pattern matching on a List. Pattern matching on a list has a non‑empty case ::, which gives the first element and the rest of the list, and an empty case Nil. Gavin Bisesi’s excellent gist on pulls goes into this in detail.
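To make the comparison concrete, here is a sketch that assumes fs2 3.x and uses a hypothetical helper, take1List. It places a List version of take1 next to the stream version. `Some((h, _))` plays the role of `h :: _`, and `None` plays the role of `Nil`.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// take1 on a List: `::` is the non-empty case, `Nil` the empty one.
def take1List(list: List[String]): List[String] = list match {
  case h :: _ => List(h)
  case Nil    => Nil
}

// take1 on a Stream: Some((h, _)) mirrors `h :: _`, None mirrors Nil.
def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    case Some((h, _)) => Pull.output1(h)
    case None         => Pull.done
  }.stream
}
```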

Recursion

If you’re mathematically inclined, you’ll be bursting to write a recursive takeN function that takes a given number of elements. Go for it!
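If you want something to compare your attempt against, here is one possible sketch, assuming fs2 3.x. It is not fs2’s implementation. The names takeN and go are our own. The pipe recurses with uncons1 and counts down how many elements are still wanted. fs2’s built-in take works on whole chunks where it can, rather than one element at a time.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// Hypothetical recursive takeN built from uncons1.
def takeN(n: Int): Pipe[Pure, String, String] = { in =>
  // Output up to `remaining` elements of s, one at a time.
  def go(s: Stream[Pure, String], remaining: Int): Pull[Pure, String, Unit] =
    if (remaining <= 0) Pull.done // taken enough: stop here
    else
      s.pull.uncons1.flatMap {
        case Some((h, tail)) => Pull.output1(h) >> go(tail, remaining - 1)
        case None            => Pull.done // stream ran out first
      }
  go(in, n).stream
}
```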

If you aren’t too keen on the idea, fear not. fs2 has another handy pull that does this for us — pull.take.

helloStream.pull.take(1)
// res23: Pull[Nothing, String, Option[Stream[Nothing, String]]] = <function1>

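Take a moment to examine that type signature. Unlike uncons1, this pull does output values: up to the requested number of elements. Its result is None if the stream ran out before that many elements were taken, and otherwise a Some of a stream holding the remaining elements.

This is exactly what we need to write our serveThen pipe.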
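To see both halves of that signature at work, here is a sketch of a hypothetical pure pipe, leftoverReport, assuming fs2 3.x. It outputs the first n elements. It then uses the Option[Stream] result to output one summary line about what remained. In the Some case, the rest can turn out to be empty.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// Hypothetical pipe: output the first n elements, then one summary line.
def leftoverReport(n: Int): Pipe[Pure, String, String] = { in =>
  in.pull.take(n).flatMap {
    // Some(rest): the remaining elements (possibly none) are in rest
    case Some(rest) => Pull.output1(s"${rest.toList.size} left over")
    // None: the stream ran out before n elements were taken
    case None       => Pull.output1("nothing left over")
  }.stream
}
```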

Serving jiaozi

The serveThen pipe should take a given number of jiaozi from the stream and store the rest.

def serveThen(n: Int,
              store: Pipe[IO, Jiaozi, Nothing]
              ): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => ???
    case None => Pull.done
  }.stream
}

If the stream ran out before n jiaozi were served, there is nothing left to store, so we can terminate with Pull.done. On the other hand, if there are still jiaozi left over, we want to pass them through the store pipe.

def serveThen(n: Int,
              store: Pipe[IO, Jiaozi, Nothing]
              ): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store)
                       ???
    case None => Pull.done
  }.stream
}

The function passed to flatMap must return a pull, not a stream, so we convert the resulting stream back into a pull with echo.

def serveThen(n: Int,
              store: Pipe[IO, Jiaozi, Nothing]
              ): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store).pull.echo
    case None => Pull.done
  }.stream
}

And there we have it. Our pipe is complete.
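The sim function depends on IO and on helpers defined earlier in the article (emptyBox, roll, cook, store). To check the shape of serveThen in isolation, here is a pure stand-in, assuming fs2 3.x. It swaps Jiaozi for String. It also uses a hypothetical labelLeftovers pipe in place of store, so leftovers show up in the output instead of disappearing into a box.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// Same shape as serveThen, but pure and over Strings (hypothetical name).
def serveThenPure(n: Int, store: Pipe[Pure, String, String]): Pipe[Pure, String, String] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store).pull.echo // leftovers go through store
    case None       => Pull.done                      // nothing left to store
  }.stream
}

// Hypothetical stand-in for store: tag each leftover rather than saving it.
val labelLeftovers: Pipe[Pure, String, String] = _.map(s => s"stored $s")
```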

Let’s try it out. As a reminder, here’s our final sim function.

def sim(numberOfRolls: Int,
        jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl <- roll(numberOfRolls)
        .through(cook)
        .through(serveThen(jiaoziToServe, store(box)))
        .compile
        .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}

If we give it a spin we do indeed see leftovers.

val (bowl, leftovers) = sim(2, 4).unsafeRunSync()
// bowl: List[Jiaozi] = List(0, 1, 2, 3)
// leftovers: List[Jiaozi] = List(5, 4)