From Scalaz Streams to FS2

Hi everybody!

Last April I wrote this post about Scalaz Streams, where I explained how we consume orders from Kafka, persist them to Cassandra, call a web service to update the prices and finally publish the result to RabbitMQ. This was achieved by running two streams in parallel. Here’s the graphic to recap:

[Figure: pricer_streams_parallel]

Today I will present the same problem solved with FS2 (Functional Streams for Scala), the evolution of Scalaz Streams. If you are one of those impatient creatures, here is the code 😉

MAIN DIFFERENCES

Our main flow was defined as follows:

def flow(consumer: ProcessT[Order],
           logger: Sink[Task, Order],
           storageEx: Exchange[Order, Order],
           pricer: Channel[Task, Order, Order],
           publisher: Sink[Task, Order])
          (implicit S: Strategy)= {
    merge.mergeN(
      Process(
        consumer          observe   logger    to  storageEx.write,
        storageEx.read    through   pricer    to  publisher
      )
    )
  }

Starting from this Scalaz Streams code, these are the main changes in FS2:

  1. Process[F[_], +A] is now Stream[F[_], +A]
  2. Channel[F[_], +A, +B] is now Pipe[F[_], A, B], which is a type alias for Stream[F, A] => Stream[F, B]
  3. Sink[F[_], +A] is still a Channel whose output type B is Unit, so it is now an alias for Pipe[F[_], A, Unit]
  4. Exchange is gone, so we have to replace it ourselves using two streams.
  5. merge.mergeN is now fs2.concurrent.join
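
To make items 2 and 3 concrete, here is a minimal sketch, assuming the FS2 0.9.x API (the version that ships fs2.Task). The names PipeAndSinkSketch, double and printer are made up for illustration and are not part of the demo project.

```scala
import fs2.{Pipe, Sink, Stream, Task}

object PipeAndSinkSketch {
  // A Pipe is just a function from one stream to another.
  val double: Pipe[Task, Int, Int] = _.map(_ * 2)

  // A Sink is a Pipe whose output is Unit: it only performs effects.
  val printer: Sink[Task, Int] = _.evalMap(i => Task.delay(println(s"got $i")))

  // `through` applies a Pipe and keeps the transformed values.
  def doubled: Vector[Int] =
    Stream(1, 2, 3).covary[Task].through(double).runLog.unsafeRun()

  // `to` applies a Sink; the values are consumed by the effect.
  def printAll(): Unit =
    Stream(1, 2, 3).covary[Task].through(double).to(printer).run.unsafeRun()
}
```

Because a Pipe is a plain function, it can also be composed with andThen before being handed to through.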

So this is how the pricer flow looks with FS2:

def flow(consumer: StreamT[Order],
           logger: SinkT[Order],
           storage: OrderStorage,
           pricer: PipeT[Order, Order],
           publisher: SinkT[Order])(implicit S: fs2.Strategy) = {
    fs2.concurrent.join(2)(
      Stream(
        consumer      observe logger to storage.write,
        storage.read  through pricer to publisher
      )
    )
  }

Note: All the functions are now defined with a generic effect type, whereas before a lot of them were tightly coupled to Scalaz Task. FS2 requires an implicit Async[F], where F is the effect type, in order to run the stream. By default, an Async implementation for Task is provided implicitly.
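
As a small illustration of the note above, this sketch joins two toy streams. It assumes FS2 0.9.x, where (to my understanding) the implicit Async[Task] instance is derived from an implicit Strategy. JoinSketch, evens and odds are hypothetical names used only here.

```scala
import fs2.{Strategy, Stream, Task}

object JoinSketch {
  // Assumption: Async[Task] is resolved from this implicit Strategy.
  implicit val S: Strategy = Strategy.fromFixedDaemonPool(2, "join-sketch")

  val evens: Stream[Task, Int] = Stream(2, 4, 6).covary[Task]
  val odds: Stream[Task, Int]  = Stream(1, 3, 5).covary[Task]

  // Run up to 2 inner streams concurrently and merge their outputs.
  val merged: Stream[Task, Int] =
    fs2.concurrent.join[Task, Int](2)(Stream(evens, odds))

  // The interleaving is nondeterministic, so only the set of values is stable.
  def collected: Vector[Int] = merged.runLog.unsafeRun()
}
```

The order in which elements of the two inner streams arrive is not guaranteed, which is why the sketch makes no claim about it.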

TIMER, SCHEDULER: time package

One of the things that I improved in this new version is the Order Generator. Before it was a naive implementation using Task and blocking the thread for 2 seconds. Now it’s been implemented using the functions provided by the fs2.time package. Here’s the code:

  implicit val scheduler = fs2.Scheduler.fromFixedDaemonPool(2, "generator-scheduler")
  implicit val S         = fs2.Strategy.fromFixedDaemonPool(2, "generator-timer")

  private val defaultOrderGen: PipeT[Int, Order] = { orderIds =>
    val tickInterrupter = time.sleep[Task](11.seconds) ++ Stream(true)
    val orderTick       = time.awakeEvery[Task](2.seconds).interruptWhen(tickInterrupter)
    (orderIds zip orderTick) flatMap { case (id, t) =>
      val itemId    = Random.nextInt(500).toLong
      val itemPrice = Random.nextInt(10000).toDouble
      val newOrder  = Order(id.toLong, List(Item(itemId, s"laptop-$id", itemPrice)))
      Stream.emit(newOrder)
    }
  }

The function time.awakeEvery[Task] generates a Stream[Task, FiniteDuration]. Every Stream also has the function interruptWhen(haltWhenTrue: Stream[F, Boolean]), which halts the stream as soon as the given stream of Boolean emits true. Based on this, I used the function time.sleep[Task], which generates a Stream[Task, Nothing], and appended a Stream(true) to it. After the given duration, this combination emits exactly what we need for the interrupter: a Stream[Task, Boolean].

QUEUES, SIGNALS AND TOPICS: async package

Another breaking change is the way queues, signals and topics, among others, are created. In Scalaz Streams they were tied to Task as the effect type, whereas in FS2 you have to provide it. The other difference you will notice is that creation used to return a Queue[A], whereas in FS2 you get an F[Queue[F, A]], as follows:

// Scalaz Streams
val oldQ: Queue[Int] = async.boundedQueue[Int](100) 
// FS2 
val newQ: Task[Queue[Task, Int]] = async.boundedQueue[Task, Int](100)
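
Since the queue now lives inside the effect, you sequence over it before using it. The following is a minimal sketch assuming the FS2 0.9.x Queue API (enqueue1 and dequeue1); QueueSketch and roundTrip are hypothetical names.

```scala
import fs2.{Strategy, Task, async}

object QueueSketch {
  // Needed to resolve the implicit Async[Task] required by boundedQueue.
  implicit val S: Strategy = Strategy.fromFixedDaemonPool(2, "queue-sketch")

  // Creating the queue is itself an effect, so we flatMap over it.
  val roundTrip: Task[Int] =
    for {
      q <- async.boundedQueue[Task, Int](100)
      _ <- q.enqueue1(42) // put one element in
      n <- q.dequeue1     // take it back out
    } yield n

  def result: Int = roundTrip.unsafeRun()
}
```

Nothing is allocated until the Task runs, so every run of roundTrip works with a fresh queue.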

UTILS

The util functions channel.lift and sink.lift that I used in the first demo are no longer available, so I created an FS2Utils file to provide similar functionality.
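
Helpers of this kind can be quite small. This is only a sketch of the idea, assuming FS2 0.9.x; the names liftPipe and liftSink are my own placeholders and may not match what FS2Utils actually defines.

```scala
import fs2.{Pipe, Sink, Stream, Task}

object LiftSketch {
  // Hypothetical helper: lift an effectful function into a Pipe.
  def liftPipe[A, B](f: A => Task[B]): Pipe[Task, A, B] = _.evalMap(f)

  // Hypothetical helper: a Sink is the same thing with Unit as output.
  def liftSink[A](f: A => Task[Unit]): Sink[Task, A] = liftPipe[A, Unit](f)

  // Usage: turn each number into a label through the lifted function.
  def demo: Vector[String] =
    Stream(1, 2, 3).covary[Task]
      .through(liftPipe((n: Int) => Task.now(s"order-$n")))
      .runLog
      .unsafeRun()
}
```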

And finally, one interesting function I’m using in the final combination of the streams is mergeHaltBoth, which works like merge but halts as soon as either of the streams halts. In our case the Order Generator flow will halt after 11 seconds.
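
To see the halting behaviour in isolation, here is a toy sketch assuming FS2 0.9.x. It merges a finite stream with an infinite one; MergeHaltBothSketch, finite and infinite are hypothetical names, and the number of zeros emitted before halting is nondeterministic.

```scala
import fs2.{Strategy, Stream, Task}

object MergeHaltBothSketch {
  // mergeHaltBoth needs an Async[Task], which comes from this Strategy.
  implicit val S: Strategy = Strategy.fromFixedDaemonPool(2, "merge-sketch")

  val finite: Stream[Task, Int]   = Stream(1, 2, 3).covary[Task]
  val infinite: Stream[Task, Int] = Stream.constant(0).covary[Task]

  // The merged stream halts once `finite` is done, even though
  // `infinite` would otherwise go on forever.
  def collect(): Vector[Int] =
    finite.mergeHaltBoth(infinite).runLog.unsafeRun()
}
```

With plain merge the same program would never terminate, because merge waits for both sides to halt.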

ADDITIONAL RESOURCES

I was inspired to write this post after watching the excellent series of videos published by Michael Pilquist, and because at work we use Scalaz Streams heavily (maybe we will migrate to FS2 sometime in the near future, I don’t know yet). The official guide and the migration guide are also really useful.

And again, the demo project for this post is available on GitHub. That’s all I have for today. I hope you enjoyed it.

Until next post!
Gabriel.
