A tale of Tagless Final, Cats Effect and Streaming: Fs2 Rabbit v0.1

There’s no doubt that “Tagless Final” is a hot topic these days. Many FP developers (myself included) have discovered this technique of encoding algebras quite recently. I used it in Smart Backpacker’s core, where the programs also abstract over the effect type using the Cats Effect library. It’s been a nice challenge and I’m pretty happy with the results so far. Moreover, I’ve recently been refactoring the internals of Fs2 Rabbit in order to be able to unit test the library in a simple way (and finally get rid of this issue). It’s been quite a journey, which I’m now going to share with you right after publishing the first official release.

Note: If you don’t know much about Tagless Final, a simple search will turn up many blog posts about it, but I highly recommend watching Luka Jacobowitz’s talk at Scale by the Bay.

Motivation

In the past I tried to encode this algebra in a Free Monad style and failed miserably. Here’s why: Fs2 Rabbit’s API works with three different types, the core of Fs2, namely Stream[F, O], Pipe[F, I, O] and Sink[F, I]. The last two are just type aliases for a stream transformation function: Pipe[F, I, O] is Stream[F, I] => Stream[F, O], and Sink[F, I] is a Pipe[F, I, Unit]. I guess at that time my knowledge wasn’t sufficient to work with these types while writing a nice algebra, and I couldn’t find any examples either. But I think I finally get it.
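To make the relationship between these three types concrete, here is a minimal sketch. It does not use Fs2 itself: Stream below is a toy stand-in backed by a List, Id is a hypothetical "no effect" type, and the variance annotations of the real aliases are left out for simplicity.

```scala
object StreamAliases {
  // Toy stand-in for fs2.Stream (an assumption for illustration):
  // a pure list of values tagged with an effect type F that is never used.
  final case class Stream[F[_], O](values: List[O]) {
    def map[O2](f: O => O2): Stream[F, O2] = Stream(values.map(f))
  }

  // A Pipe is just a function from one stream to another,
  // and a Sink is a Pipe whose output elements are Unit.
  type Pipe[F[_], I, O] = Stream[F, I] => Stream[F, O]
  type Sink[F[_], I]    = Pipe[F, I, Unit]

  // Hypothetical "no effect" type constructor.
  type Id[A] = A

  // A pipe that doubles every element.
  val double: Pipe[Id, Int, Int] = _.map(_ * 2)

  // A sink that consumes every element and emits Unit.
  val discard: Sink[Id, Int] = _.map(_ => ())

  val doubled: Stream[Id, Int]    = double(Stream[Id, Int](List(1, 2, 3)))
  val discarded: Stream[Id, Unit] = discard(doubled)
}
```

Because both aliases take a stream as input rather than producing a plain F[A], they don't fit the shape of a natural transformation into Stream[F, ?], which is roughly where the Free Monad attempt got stuck.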

As far as I remember, everything worked when the algebra needed to be interpreted to a single effect. The main algebra was defined as sealed trait AMQPBrokerEffect[A] and I wrote a stream interpreter as a natural transformation AMQPBrokerEffect ~> Stream[F, ?]. However, I couldn’t figure out how to handle the few cases where the algebra needed to be interpreted as a stream transformation, specifically as a Pipe[F, I, O], so I left it there.

Encoding algebras in a “finally tagless” way (another name for it) is a bit different from what one would normally do with the Free Monad, so I decided to give it a shot. Later on I ran into a similar problem: all the examples out there seemed to be very simple, and I couldn’t find any that combined this technique with Fs2 streams. But I didn’t give up this time.

The How To

I found out the trick was to parametrize the algebras with more than one type constructor. For instance, let’s look at the definition of the Consuming algebra:

trait Consuming[F[_], G[_]] {

  def createAckerConsumer(channel: Channel,
                          queueName: QueueName,
                          basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                          consumerArgs: Option[ConsumerArgs] = None): F[(G[AckResult], F[AmqpEnvelope])]

  def createAutoAckConsumer(channel: Channel,
                            queueName: QueueName,
                            basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                            consumerArgs: Option[ConsumerArgs] = None): F[F[AmqpEnvelope]]

}

Here two type constructors, F[_] and G[_], are defined and combined to form the return types of the methods. This allows you to create a very powerful program that uses all the core types of Fs2. Let’s see what the program implementation looks like:

class ConsumingProgram[F[_]: Async](implicit C: AckerConsumer[Stream[F, ?], Sink[F, ?]], SE: StreamEval[F])
    extends Consuming[Stream[F, ?], Sink[F, ?]] {

  override def createAckerConsumer(
      channel: Channel,
      queueName: QueueName,
      basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
      consumerArgs: Option[ConsumerArgs] = None): Stream[F, (StreamAcker[F], StreamConsumer[F])] = ???

  override def createAutoAckConsumer(channel: Channel,
                                     queueName: QueueName,
                                     basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                                     consumerArgs: Option[ConsumerArgs] = None): Stream[F, StreamConsumer[F]] = ???

}

The most important thing here is that we are defining our algebra’s F[_] as Stream[F, ?] and G[_] as Sink[F, ?]. As a result, the return types of the methods createAckerConsumer and createAutoAckConsumer are now Stream[F, (StreamAcker[F], StreamConsumer[F])] and Stream[F, StreamConsumer[F]] respectively. Also note how we are abstracting our program over the effect type by using cats.effect.Async.
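The same trick can be shown in isolation with a minimal sketch. Everything here is a toy assumption rather than Fs2 Rabbit code: Stream and Sink are plain aliases over List, and Consumer is a hypothetical algebra shaped like Consuming[F[_], G[_]].

```scala
object TwoConstructors {
  // Toy stand-ins for the Fs2 types (assumptions for illustration only).
  type Stream[A] = List[A]
  type Sink[A]   = List[A] => List[Unit]

  // Hypothetical algebra with two type constructors,
  // in the spirit of Consuming[F[_], G[_]].
  trait Consumer[F[_], G[_]] {
    def create(queue: String): F[(G[String], F[String])]
  }

  // Fixing F to Stream and G to Sink turns the abstract return type
  // F[(G[String], F[String])] into Stream[(Sink[String], Stream[String])].
  object ListConsumer extends Consumer[Stream, Sink] {
    def create(queue: String): Stream[(Sink[String], Stream[String])] = {
      val acker: Sink[String]      = _.map(_ => ())
      val messages: Stream[String] = List(s"$queue-1", s"$queue-2")
      List((acker, messages))
    }
  }

  // Usage: feed every consumed message to the acker.
  val acked: List[Unit] =
    ListConsumer.create("orders").flatMap { case (acker, messages) => acker(messages) }
}
```

With the real Fs2 types, the partially applied Stream[F, ?] and Sink[F, ?] play the role of the two aliases above, which is what the kind-projector ? syntax provides.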

Another thing I took away from Luka’s talk is that you don’t necessarily need a program for every algebra. Some algebras ought to just have an interpreter. And that’s exactly the case for the AMQPClient and Connection algebras. These two represent the low-level parts, working directly with the Java AMQP Client and wrapping it in a nice way for the other algebras to interact with.

Getting all the pieces together

Once all the pieces were in place, I could finally write a single interpreter, namely Fs2Rabbit, which depends on implementations of both AMQPClient and Connection. I can use it both to expose the final API and for testing purposes. Here’s what the definition looks like:

class Fs2Rabbit[F[_]](config: F[Fs2RabbitConfig],
                      connectionStream: Connection[Stream[F, ?]],
                      internalQ: Queue[IO, Either[Throwable, AmqpEnvelope]])
                     (implicit F: Effect[F], amqpClient: AMQPClient[Stream[F, ?]])

So for unit tests, I just needed to write an implementation of these two algebras and pass them as arguments to the main interpreter. Bingo!
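As a rough illustration of that testing approach, here is a minimal sketch. Client, InMemoryClient and Greeter are hypothetical names, and the algebra is far simpler than the real AMQPClient; the point is only that a program written against an algebra can be handed an in-memory interpreter in tests.

```scala
object StubInterpreterSketch {
  // Hypothetical, heavily simplified low-level algebra
  // (the real AMQPClient has a different, richer API).
  trait Client[F[_]] {
    def publish(queue: String, body: String): F[Unit]
    def consume(queue: String): F[List[String]]
  }

  // Hypothetical "no effect" type constructor used by the test interpreter.
  type Id[A] = A

  // Test interpreter: keeps messages in memory instead of talking to a broker.
  final class InMemoryClient extends Client[Id] {
    private var queues = Map.empty[String, List[String]]

    def publish(queue: String, body: String): Unit =
      queues = queues.updated(queue, queues.getOrElse(queue, Nil) :+ body)

    def consume(queue: String): List[String] =
      queues.getOrElse(queue, Nil)
  }

  // The program depends only on the algebra, never on a concrete client.
  final class Greeter[F[_]](client: Client[F]) {
    def greet(name: String): F[Unit] =
      client.publish("greetings", s"Hello, $name")
  }

  // Usage in a test: wire the program to the in-memory interpreter.
  val client  = new InMemoryClient
  val greeter = new Greeter[Id](client)
  greeter.greet("Gabriel")
  val received: List[String] = client.consume("greetings")
}
```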

And we are still abstracting over the effect type. As shown in the examples, we can have one interpreter that uses cats.effect.IO and another that uses monix.eval.Task.
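The idea of running the same code with different effect types can be sketched without any library. Delay below is a hypothetical, minimal stand-in for a Cats Effect type class, and Thunk and Try stand in for IO and Task; none of this is the actual Fs2 Rabbit code.

```scala
import scala.util.Try

object EffectPolymorphism {
  // Hypothetical stand-in for a Cats Effect type class:
  // the ability to suspend a computation in F.
  trait Delay[F[_]] {
    def delay[A](a: => A): F[A]
  }

  // First effect type: a lazy thunk that runs only when called.
  type Thunk[A] = () => A

  implicit val thunkDelay: Delay[Thunk] = new Delay[Thunk] {
    def delay[A](a: => A): Thunk[A] = () => a
  }

  // Second effect type: Try, which captures exceptions as values.
  implicit val tryDelay: Delay[Try] = new Delay[Try] {
    def delay[A](a: => A): Try[A] = Try(a)
  }

  // The program is written once, against the type class only.
  def parsePort[F[_]](raw: String)(implicit F: Delay[F]): F[Int] =
    F.delay(raw.toInt)

  // The caller picks the effect type at the edge of the application.
  val viaThunk: Thunk[Int] = parsePort[Thunk]("5672")
  val viaTry: Try[Int]     = parsePort[Try]("not-a-port")
}
```

Here viaThunk does nothing until it is invoked, while viaTry has already run and holds the parsing failure as a value.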

Final Thoughts

Although I’m pretty happy with this first release as well as with the final results, I’m pretty sure I’ll learn something new that will make me revisit this code and see whether I can improve it. But it’s certainly been a great challenge and I’m more than happy to have solved it 🙂

If you have some remarks or questions, I’d be happy to hear your feedback!

Cheers,
Gabriel.