Skip to main content

No leftovers: Working with pulls in fs2

1 hour 

Mis en place

Our simulation has four parts: rolling, cooking, serving and storing.

We’re going to give it a couple of parameters: the number of rolls of dough to make and the number of jiaozi to serve.

It should output a serving bowl of jiaozi and the leftovers.

def sim(numberOfRolls: Int,
        jiaoziToServe: Int): (Bowl, Leftovers) = ???

To keep calculations simple, we’ll assume that exactly three jiaozi can be made from each roll.

If we make two rolls and serve four jiaozi, we should have a couple of jiaozi left over.

The data model

Let’s define some datatypes! The dough and jiaozi don’t really have any properties, so can be modeled simply as integer ids.

type Dough = Int
type Jiaozi = Int
type Bowl = List[Jiaozi]
type Leftovers = List[Jiaozi]

The rolling stage can be coded as a stream of dough. Cooking and serving are both transformations on that dough, so can be coded as pipes.

def roll(rollsToMake: Int): Stream[Pure, Dough] = ???

val cook: Pipe[Pure, Dough, Jiaozi] = ???

def serve(jiaoziToServe: Int): Pipe[Pure, Jiaozi, Jiaozi] = ???

Storing is a bit more tricky — we’ll get to it, but first let’s warm up by filling in these question marks. They can all be implemented using existing functions on streams.

Rolling

Rolling should produce a stream of dough with incremental ids, limited to the number of rolls to make. We can implement roll using any number of functions on the Stream datatype. Let’s go with iterate and take.

def roll(rollsToMake: Int): Stream[Pure, Dough] =
  Stream.iterate(0)(_ + 1).take(rollsToMake)

We can check it works by running the stream:

roll(2).compile.toList
// res0: List[Dough] = List(0, 1)

Cooking

We next want to split each roll of dough into three jiaozi, each with a unique integer id. Since we’re using integers to model both the dough and jiaozi, this involves some basic arithmetic.

val cook: Pipe[Pure, Dough, Jiaozi] = _.flatMap { dough =>
  Stream(
    dough * 3,
    dough * 3 + 1,
    dough * 3 + 2
  )
}

We should get three jiaozi per roll of dough, each with its own unique id. Let’s check:

roll(2).through(cook).compile.toList
// res1: List[Int] = List(0, 1, 2, 3, 4, 5)

Serving

To serve, we take a specified number of jiaozi from the stream. As a first shot we can try using the take function.

def serve(jiaoziToServe: Int): Pipe[Pure, Jiaozi, Jiaozi] =
 _.take(jiaoziToServe)

Let’s give it a spin:

roll(2).through(cook).through(serve(4)).compile.toList
// res2: List[Int] = List(0, 1, 2, 3)

Storing

Finally, we want to store the leftover jiaozi.

This can be coded in various ways. We’re going to go for using a mutable store with a Cats Effect Ref.

type Box = Ref[IO, Leftovers]

A brief rundown of Ref

For our purposes, a Ref is a functional way of working with mutable state. You can think of it as a box containing a value. The value can be modified with an effect, which in our case is the Cats Effect IO.

We can create a box with no leftovers using Ref.of

val emptyBox: IO[Box] = Ref.of(Nil)
val box: Box = emptyBox.unsafeRunSync()

We can update it using the aptly named update function. For instance, to add the fourth jiaozi to the box:

box.update(leftovers => 3 :: leftovers).unsafeRunSync()

Finally, we can extract the leftovers using get

box.get.unsafeRunSync()
// res4: List[Jiaozi] = List(3)

A store pipe should add the remaining leftovers to a box. We can check the box once we’ve finished running the stream.

def store(box: Box): Pipe[IO, Jiaozi, Nothing] = ???

Let’s take a closer look at that signature:

We can implement this using evalMap function, removing all outputs with drain.

def store(box: Box): Pipe[IO, Jiaozi, Nothing] =
  _.evalMap(jiaozi => box.update(jiaozi :: _))
    .drain

Let’s try it out:

{
  for {
    box <- emptyBox
    _ <- Stream(1, 2, 3).through(store(box)).compile.drain
    leftovers <- box.get
  } yield leftovers
}.unsafeRunSync()
// res5: List[Jiaozi] = List(3, 2, 1)

Which is just what we want.

Putting it together

Our entire sim function will also be effectful. We’ll change the signature to reflect that.

def sim(numberOfRolls: Int,
        jiaoziToServe: Int): IO[(Bowl, Leftovers)] = ???

Finally, let’s try and write it:

def sim(numberOfRolls: Int,
        jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl = roll(numberOfRolls)
        .through(cook)
        .through(serve(jiaoziToServe))
        .compile
        .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}

Does that work?

sim(2, 4).unsafeRunSync()
// res6: Tuple2[Bowl, Leftovers] = (List(0, 1, 2, 3), List())

Unfortunately not — there are no leftovers.

The keen eyed will notice that we’re not in fact storing any jiaozi. We haven’t plumbed our store pipe into the rest of the code.

There’s no easy way of doing this. If you’re skeptical, have a go yourself. Can you make use of store without drastically changing the pipe signatures? If not, why?

The crux of the problem

The problem lies within our serve pipe. We’re using take to serve the jiaozi.

take outputs a specified number of elements from a stream, but discards the rest. For our purposes, we do want to output elements, but we want to send the remaining jiaozi through a different store pipe. We need a function that looks like this:

def serveThen(n: Int,
              store: Pipe[IO, Jiaozi, Nothing]
              ): Pipe[IO, Jiaozi, Jiaozi] = ???

Let’s think a bit about how serveThen should behave.

serveThen should take a number of jiaozi from the stream, just as take, but should send the remaining jiaozi down the store pipe.

We can string it into the rest of our sim function as follows:

def sim(numberOfRolls: Int,
        jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl <- roll(numberOfRolls)
        .through(cook)
        .through(serveThen(jiaoziToServe, store(box)))
        .compile
        .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}

To write it, we’ll need to learn about fs2’s primitives. Read on to cook up some pulls.