A tale of Tagless Final, Cats Effect and Streaming: Fs2 Rabbit v0.1

There’s no doubt “Tagless Final” is a hot topic these days. Many FP developers (including myself) have discovered this technique of encoding algebras quite recently. I used it in Smart Backpacker’s core, where the programs also abstract over the effect type using the Cats Effect library. It’s been a nice challenge and I’m pretty happy with the results so far. Moreover, I’ve recently been working on refactoring the internals of Fs2 Rabbit in order to be able to unit test the library in a simple way (and finally get rid of this issue). It’s been quite a journey, which I’m now going to share with you right after publishing the first official release.

Note: If you don’t know much about Tagless Final you’ll find many blog posts talking about it with just a simple search, but I totally recommend watching Luka Jacobowitz’s talk at Scale by the Bay.

Motivation

In the past I tried to encode this algebra in a Free Monad style and failed miserably. Here’s why: Fs2 Rabbit’s API works on three different types – the core of Fs2 – namely Stream[F, O], Pipe[F, I, O] and Sink[F, I]. The last two are just type aliases for a stream transformation function, more specifically Stream[F, I] => Stream[F, O]. And I guess at that time my knowledge wasn’t sufficient to get around these types while writing a nice algebra and I couldn’t find any examples either. But I think I finally get it.

As far as I remember, everything worked when the algebra needed to be interpreted into a single effect. The main algebra was defined as sealed trait AMQPBrokerEffect[A] and I wrote a stream interpreter as a natural transformation AMQPBrokerEffect ~> Stream[F, ?], but I couldn’t figure out how to solve the few cases where the algebra needed to be interpreted as a stream transformation, specifically to a Pipe[F, I, O], so I left it there.

Encoding algebras in a “finally tagless” way (another name for the same technique) is a bit different from how one would do it with the Free Monad, so I decided to give it a shot. But I soon ran into a similar problem: all the examples out there seem to be very simple, and I haven’t found any that combine this technique with Fs2 streams. I didn’t give up this time, though.

The How To

I found out the trick was to parametrize the algebras with more than one type constructor. For instance, let’s look at the definition of the Consuming algebra:

trait Consuming[F[_], G[_]] {

  def createAckerConsumer(channel: Channel,
                          queueName: QueueName,
                          basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                          consumerArgs: Option[ConsumerArgs] = None): F[(G[AckResult], F[AmqpEnvelope])]

  def createAutoAckConsumer(channel: Channel,
                            queueName: QueueName,
                            basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                            consumerArgs: Option[ConsumerArgs] = None): F[F[AmqpEnvelope]]

}

Here two type constructors, F[_] and G[_], are defined and combined in the proper way to be used as the return types of the methods. This allows you to create a very powerful program that uses all the core types of Fs2. Let’s look at what the program implementation looks like:

class ConsumingProgram[F[_]: Async](implicit C: AckerConsumer[Stream[F, ?], Sink[F, ?]], SE: StreamEval[F])
    extends Consuming[Stream[F, ?], Sink[F, ?]] {

  override def createAckerConsumer(
      channel: Channel,
      queueName: QueueName,
      basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
      consumerArgs: Option[ConsumerArgs] = None): Stream[F, (StreamAcker[F], StreamConsumer[F])] = ???

  override def createAutoAckConsumer(channel: Channel,
                                     queueName: QueueName,
                                     basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
                                     consumerArgs: Option[ConsumerArgs] = None): Stream[F, StreamConsumer[F]] = ???

}

Here the most important thing is that we are defining our algebra’s F[_] as Stream[F, ?] and G[_] as Sink[F, ?]. And by definition the return types of the methods createAckerConsumer and createAutoAckConsumer are now Stream[F, (StreamAcker[F], StreamConsumer[F])] and Stream[F, StreamConsumer[F]] respectively. Also note how we are abstracting our program over the effect type by using cats.effect.Async.
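
The StreamAcker and StreamConsumer aliases used in the implementation above are presumably just these core types specialized to the library’s message types, along these lines:

type StreamAcker[F[_]]    = Sink[F, AckResult]      // Stream[F, AckResult] => Stream[F, Unit]
type StreamConsumer[F[_]] = Stream[F, AmqpEnvelope]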

One of the things I also got, thanks to Luka’s talk, was that you don’t necessarily need to have a program for every algebra. Some algebras ought to just have an interpreter. And that’s exactly the case for the AMQPClient and Connection algebras. These two represent the low-level parts: they work directly with the Java AMQP client and wrap it in a nice way for the other algebras to interact with.

Getting all the pieces together

After all the pieces were aligned in harmony I could finally write a single interpreter, namely Fs2Rabbit, dependent on implementations for both AMQPClient and Connection, which I could use to expose the final API and, at the same time, for testing purposes. So here’s what the definition looks like:

class Fs2Rabbit[F[_]](config: F[Fs2RabbitConfig],
                      connectionStream: Connection[Stream[F, ?]],
                      internalQ: Queue[IO, Either[Throwable, AmqpEnvelope]])
                     (implicit F: Effect[F], amqpClient: AMQPClient[Stream[F, ?]])

So for unit tests, I just needed to write an implementation of these two algebras and pass them as arguments to the main interpreter. Bingo!

And we are still abstracting over the effect type. As it’s shown in the examples, we can have one interpreter that uses cats.effect.IO and another that uses monix.eval.Task.

Final Thoughts

Although I’m pretty happy with this first release as well as with the final results, I’m pretty sure I’ll keep learning new things that will make me come back, rethink this code and find out whether I can improve it or not. But it’s certainly been a great challenge and I’m more than happy to have solved it 🙂

If you have some remarks or questions, I’d be happy to hear your feedback!

Cheers,
Gabriel.

Open Sourcing Smart Backpacker App backend

I’m happy to announce that I’m releasing as open source the backend of the application I’ve been working on for the past 5 months 🙂

Introduction

Smart Backpacker is an App, available for Android and iOS, where you can easily find Visa Requirements, Currency Exchange, Health Information and Airline’s Baggage Policy, no matter what your nationality is! And it’s worth mentioning it’s 100% Free with No Advertisements ;).

Tech Stack

As you might know from previous posts, my default choice is the Typelevel stack, but for this project I’m also using some other libraries, mainly those that are part of the Cats ecosystem.

The project consists of four sbt modules abstracting over the effect type F[_] using the Cats Effect library:

  • Airlines: Uses mainly fs2.io to read airlines’ information from files and persist it to the DB using Doobie in a streaming fashion.
  • Api: It’s the core of the backend and exposes the main HTTP API using Http4s, Tsec, Circe and Doobie.
  • Common: Contains utils for IO, Stream, logging, testing, etc.
  • Scraper: It has all the scrapers and HTML parsers, in addition to the DB Repositories. It uses scala-scraper and Doobie.

Find out more at the GitHub project’s page.

Why Open Source?

I believe in the power of the open source community and I have benefited from it since I started writing code (15+ years ago). So this is my way of giving something back. Also, by making it open source, I’m exposed to the criticism of the masses, which will inevitably force me to become a better developer.

Running Environments

At the moment there are two environments: Development and Production. The first one runs on a free instance of Heroku and the production one on a CentOS 7.x server in the cloud, using SBT Native Packager to generate the executables and HAProxy as a load balancer and TLS/SSL terminator.

Find below some metrics of the Production server:

  • Average HTTP request-response time of 320 ms over TLS/SSL.
  • Scraping + Persisting the visa requirements of all the countries runs in an average time of 45 seconds.
  • Scraping + Persisting the health information of all the countries runs in an average time of 1 minute.

Other Projects

In addition to the backend, both the website and the documentation are generated using Open Source tools such as Hugo and Slate. These two projects have been open source since day one. Feel free to look into the GitHub organization’s page. Oh… and I hope to open source (soon) the Android App code as well so stay tuned 🙂

Special Mentions

Last but not least, I want to thank all the folks in the community that have been extremely helpful in providing feedback as well as direct help:

Spread the word!

Cheers,
Gabriel.

Typelevel Ecosystem

With the imminent release of Cats v1.0, it’s amazing how much Scala’s pure FP ecosystem around it has matured. It’s worth remarking on the hard work of the entire community behind Typelevel projects such as Cats Effect, Fs2, Http4s and Circe. And I must confess, it has been and currently is my favorite tech stack in Scala 🙂

Cats Effect

It is described as the standard IO for the Cats ecosystem, managing both synchronous and asynchronous (callback-driven) effects. The current version is v0.5, which depends on Cats v1.0.0-RC1.

It is currently the main effect type (F[_] : Effect) used by the latest versions of Fs2, Http4s and Circe.

Fs2

Or Functional Streams for Scala (formerly Scalaz-Stream), a purely functional streaming I/O library, currently focusing its effort on the upcoming release of version 0.10 (the current version is v0.10.0-M8, which depends on Cats Effect v0.5). It has undergone many transformations, but it has now reached a mature state which I consider production ready.

The main effect type used in previous versions was fs2.Task, and previously scalaz.concurrent.Task when it was Scalaz Stream, but it has now been removed in favor of the new standard cats.effect.Effect.

Circe

Described as “A JSON library for Scala powered by Cats”, it has also been upgraded to the latest Cats v1.0.0-RC1, offering the milestone v0.9.0-M2 on the way to its stable release. It is the default library chosen by Http4s for JSON manipulation.

Doobie

“A pure functional JDBC layer for Scala and Cats” that provides a principled way to construct programs (and higher-level libraries) that use JDBC. The latest version is v0.5.0-M9 dependent on the latest Fs2 v0.10.0-M8 and Cats 1.0.0-RC1.

Http4s

“A minimal and idiomatic Scala interface for HTTP”, as described in the main web site. In an incredible team effort, they are currently maintaining four different versions (0.15.x, 0.16.x, 0.17.x and 0.18.x), hats off to the team! The latest version is v0.18.0-M5 dependent on the latest Fs2 v0.10.0-M8 and Cats Effect v0.5.

Use Case Scenarios

In the last few months I’ve been working on Smart Backpacker, an application for travelers where you can find information such as Visa Requirements, Currency Exchange and Airline’s Baggage Policy. If you are an Android user you can try it out right now (download it here). However, iOS users must wait until the end of the year, when I hope we can have it ready (it is currently being developed by my friend @sheinix).

The backend is written in Scala (as you might have already guessed) using a few of the libraries described above, such as Http4s, Cats Effect and Circe. The Android App has been written 100% by me (I must say I hate dealing with XML and Android layouts) but it wasn’t so bad to do it while learning Kotlin at the same time. I plan to release both projects as open source next year, so stay tuned!

Also, I’ve recently been challenged to solve an exercise involving the creation of a User HTTP API on top of an existing backend service. The requirements, as I understood them, called for authentication, so I went for Http4s Auth (first time using it) and, as I expected, it works like a charm! Special thanks to @rossabaker who was extremely helpful and friendly in the Http4s Gitter channel when I asked for help regarding the Auth feature.

And last but not least, my personal project fs2-rabbit has also been updated to the latest Fs2 & Cats Effect versions. Oh! And it’s now making use of the latest sbt v1.0.3 🙂

Final Thoughts

I’m just super excited about all of what’s going on in the Scala ecosystem right now and currently looking for a new challenge in my professional career. The future looks brighter than ever… Keep on rocking Scalars!

Cheers,
Gabriel.

Dependent Types & Type Level Programming

Before we start you need to know that this post is intended for an audience already familiar with type classes and implicit resolution in Scala. If you’re not, please have a read of this blog post.

As the title suggests, I’m going to go through these concepts and develop them in pure Scala code with no libraries involved. However, code in other languages will be shown for demonstration purposes.

At the end of the post you will find the links to all the code that is going to be shown.

Summary

  • Dependent Types
    • Path Dependent Types
      • Aux Type
    • Dependent Function Type (Π-Type)
    • Dependent Pair Type (Σ-Type)
  • Phantom Types
  • Type Level Programming (vs. Value Level)

Dependent Types

Quoted from its definition in Wikipedia:

A dependent type is a type whose definition depends on a value.

Let’s see a code example in a pure functional programming language with dependent types like Idris.

isSingleton : Bool -> Type
isSingleton True = Nat
isSingleton False = List Nat

This simple function has one input of type Bool and one output type dependent on the input value. So, in the case of True the output type is Nat (natural number) and in the case of False it is List Nat (a list of natural numbers). This is possible because types are first class in the Idris language.

Here’s a simple use case of this type function:

mkSingle : (x : Bool) -> isSingleton x
mkSingle True = 0
mkSingle False = []

Here we are making use of the type definition isSingleton to return either a natural number zero or an empty list of natural numbers.

All these examples are taken from the Idris official documentation.

You can even determine what the type of the second parameter will be based on the first input value. See the following code:

sum : (single : Bool) -> isSingleton single -> Nat
sum True x = x
sum False [] = 0
sum False (x :: xs) = x + sum False xs

And something very cool are Vectors, which are lists that include their length in their type.

(++) : Vect n a -> Vect m a -> Vect (n + m) a
(++) Nil ys = ys
(++) (x :: xs) ys = x :: xs ++ ys

For instance, in this function we have two inputs: one Vector of value type a and length n and another Vector of value type a and length m. And we are saying that the output type will be another Vector of the same value type a and length (n + m). This is mind blowing!

If we try to implement it like this, it won’t type check, since the length of the resulting Vector could differ from the expected n + m:

(++) : Vect n a -> Vect m a -> Vect (n + m) a
(++) Nil ys = ys
(++) (x :: xs) ys = x :: xs ++ xs -- BROKEN

Path Dependent Types

Although Scala is not a fully dependently typed language, it is still possible to emulate dependent types either with path dependent types or with type level programming. We’ll soon see how.

We can define nested class definitions and traits, for example. See the following code:

class A {
  class B {
    class C {
      class D
    }
  }
}

And we can define some instances of B for example:

val a1: A = new A
val a2: A = new A
val b1: a1.B = new a1.B
val b2: a2.B = new a2.B

But what happens here?

val b3: a2.B = new a1.B
val b4: a1.B = new a2.B

This won’t compile since the type B is dependent on the instance of A, so the type B has to belong to the same instance of A. This is very useful, but sometimes you just want to have fewer constraints and accept any type B that belongs to any instance of A. In this case, what you need are type projections (using #):

val b5: A#B = new a1.B
val b6: A#B = new a2.B

And we can do exactly the same with the classes C and D:

val c: A#B#C = new b1.C
val d: A#B#C#D = new c.D

Okay, now let’s see a more useful case where path dependent types shine. Let’s define a type class Foo[A] with a type member B:

trait Foo[A] {
  type B
  def value: B
}

And we also define two implicit instances of Foo for the types Int and String and define the type member B in both cases:

object Foo {
  implicit def fooInt = new Foo[Int] {
    override type B = String
    override def value = "Hey!"
  }

  implicit def fooString = new Foo[String] {
    override type B = Boolean
    override def value = true
  }
}

Now we can have a function like this where both return type and result depend on the implicit instance of Foo:

def foo[T](t: T)(implicit f: Foo[T]): f.B = f.value

val a: String = foo(1) // "Hey!"
val b: Boolean = foo("qwe") // true

Here we see how, depending on the value passed to foo, we get a different output type based on the implicit instance. f.B is also called an existential type since we don’t know what the concrete type will be, but we do know it exists. This is a very cool trick but it has some limitations. Let’s make it a bit more complicated by adding a dependency on the Identity Monad, with a type dependent on the Foo instance:

def fooAux[T](t: T)(implicit f: Foo[T], id: Identity[f.B]): f.B = f.value

The compiler will throw an error like this:

Error:(32, 29) illegal dependent method type: parameter may only be referenced in a subsequent parameter section
def fooAux[T](t: T)(implicit f: Foo[T], id: Identity[f.B]): f.B = f.value

This is not possible because the dependent type is referenced in the same parameter list where f is defined! So here is where the Aux type comes in handy.

Aux Type

type Aux[A0, B0] = Foo[A0] { type B = B0 }

We will define this Aux type inside the Foo companion object. The type A0 from Aux is mapped to A from Foo, and B0 is mapped to the type member B. The mind-blowing thing here is that B0 will also get the type assigned to B, even though one would think that B0 is only being assigned to B and not the other way around (it works both ways). That’s how we can keep track of the type member B and help the type inference system.
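
Concretely, the companion object shown earlier would now look something like this (only the Aux alias is new):

object Foo {
  type Aux[A0, B0] = Foo[A0] { type B = B0 }

  implicit def fooInt = new Foo[Int] {
    override type B = String
    override def value = "Hey!"
  }

  implicit def fooString = new Foo[String] {
    override type B = Boolean
    override def value = true
  }
}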

After that we can redefine our method fooAux in the following way:

def fooAux[T, R](t: T)(implicit f: Foo.Aux[T, R], id: Identity[R]): R = f.value


val a: String = fooAux(2) // "Hey!"
val b: Boolean = fooAux("asd") // true
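
The Identity dependency used here is never defined in the post; a minimal placeholder type class that would make the snippet compile could be as simple as this (an assumption just for illustration):

trait Identity[A]

object Identity {
  // one instance for any A, so the implicit is always available
  implicit def anyIdentity[A]: Identity[A] = new Identity[A] {}
}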

This technique is widely used in the amazing type level programming library Shapeless.

Dependent Function Type (Π-Type)

Also called Pi Type or Dependent Product Type, it defines a function that resolves the output type based on the input value. Let’s see how it can be done in the Idris language:

depType : Int -> Type
depType 0 = Int
depType 1 = String
depType _ = Bool

We have a dependent function type and we also define our data type DepProduct:

data DepProduct : (a : Type) -> (P : a -> Type) -> Type where
    MakeDepProduct : {P : a -> Type} -> ((x : a) -> P x) -> DepProduct a P

Now we can make use of it, for example:

x : DepProduct Int (\n => depType n)
x = MakeDepProduct (\n => case n of
                               0 => 10
                               1 => "aaa"
                               2 => True)

What we are saying here is that when the input value is 0 the output value will be 10 (type Int). In case of 1 it will be “aaa” (type String). And in case of 2 it will be True (type Bool).

Let’s try to translate this into Scala.

Firstly, we need to have a definition of a Natural Number:

sealed trait Nat {
  type This >: this.type <: Nat
  type ++ = Succ[This]
}

object Zero extends Nat {
  type This = Zero
}
type Zero = Zero.type

class Succ[N <: Nat] extends Nat {
  type This = Succ[N]
}

Secondly, we define some type aliases and values:

type One = Zero# ++
type Two = One# ++
type Three = Two# ++

val _0: Zero = Zero
val _1: One = new Succ[Zero]
val _2: Two = new Succ[One]
val _3: Three = new Succ[Two]

Now we define our depType function for Zero, One and any other natural number:

sealed trait DepType[N <: Nat] {
  type T
  def apply(x: N): T
}

implicit object depType0 extends DepType[Zero] {
  type T = Int
  override def apply(x: Zero) = 10
}

implicit object depType1 extends DepType[One] {
  type T = String
  override def apply(x: One) = "abc"
}

implicit def depType[N <: Nat] = new DepType[Succ[Succ[N]]] {
  type T = Boolean
  override def apply(x: Succ[Succ[N]]) = true
}

And finally we have an apply method to create an instance of a dependent function type:

object DepFunction {
  def apply[N <: Nat](x: N)(implicit depType: DepType[N]): depType.T = depType(x)
}

Now let’s try it out!

val x: Int = DepFunction(_0)
val y: String = DepFunction(_1)
val z: Boolean = DepFunction(_2)

//val t: Boolean = DepFunction(_1) // This does not compile!

And again we see how the output type depends on the input value of the dependent function type.

Dependent Pair Type (Σ-Type)

Also called Sigma Type or Dependent Sum Type, it defines a pair where the second type is dependent on the first value. Let’s see how this is represented in the Idris language:

import Data.Vect

vec : DPair Nat (\n => Vect n Int)
vec = MkDPair 2 [3, 4]

Here we are defining a pair of type Nat and Vect n Int. Note that ‘n’ is the first input value. This same definition can be represented using the syntactic sugar (a : A ** P):

vec2 : (n : Nat ** Vect n Int)
vec2 = (2 ** [3, 4])

The cool thing is that both the first value and its type can be inferred based on the second value!

-- Type inference of first argument
vec3 : (n : Nat ** Vect n Int)
vec3 = (_ ** [3, 4])

-- We can even omit the type of the argument n
vec4 : (n ** Vect n Int)
vec4 = (_ ** [3, 4])

This is how we can define it ourselves (in Idris they are called DPair and MkDPair respectively):

data DepPair : (a : Type) -> (P : a -> Type) -> Type where
    MakeDepPair : {P : a -> Type} -> (x : a) -> P x -> DepPair a P

You don’t really need to understand this definition. You can think of it as a Tuple in Scala where the second argument’s type depends on the value of the first argument. Something like (Int, Type) with implementations like (1, Boolean), (2, Int), (3, String) and so on.

Here we define again a function that will be used to determine the type of the second argument of the pair based on the first value:

depType : Int -> Type
depType 0 = Int
depType 1 = String
depType _ = Bool

x : DepPair Int (\n => depType n)
x = MakeDepPair 0 10

y : DepPair Int (\n => depType n)
y = MakeDepPair 1 "abc"

z : DepPair Int (\n => depType n)
z = MakeDepPair 2 True

Here the second type of the pair will be dependent on the definition of depType. Let’s translate this last example into Scala.

Assuming that we have the definition of Nat in scope plus the types and values we defined previously, we can now define our depType function for Zero, One and any other natural number:

sealed trait DepType[N <: Nat] { type T }
implicit object depType0 extends DepType[Zero] { type T = Int }
implicit object depType1 extends DepType[One] { type T = String }
implicit def depType[N <: Nat] = new DepType[Succ[Succ[N]]] { type T = Boolean }

And finally our DepPair type:

case class DepPair[N <: Nat, V](x: N, value: V)(implicit depType: DepType[N] { type T = V })

Let’s try it out!

DepPair(_0, 10)
DepPair(_1, "a")
DepPair(_2, true)
//DepPair(_3, "b") //This does not compile!

Again we are seeing how powerful dependent types are.

Phantom Types

Phantom Types are such a simple concept but at the same time very powerful. Let’s try to understand them by example.

Here we have a definition of Effects and Roles:

trait Effect
object Effect {
  trait Read extends Effect
  trait Write extends Effect
  trait Delete extends Effect
  trait Update extends Effect
}

trait Role
object Role {
  trait Anonymous extends Role
  trait User extends Role
  trait Admin extends User
}

Nothing fancy, just type definitions. Now, let’s say we have a File which can only be accessed with the appropriate Role:

trait File[R <: Role]
object File {
  def apply[R <: Role] = new File[R] {}
}

And now we want to perform Actions and constrain them to a particular Role:

trait Action[F <: Effect, R <: Role]
object Action {
  def apply[F <: Effect, R <: Role] = new Action[F, R] {}

  def read[R <: User](db: File[R]) = new Action[Read, R] {}
  def write[R <: User](db: File[R]) = new Action[Write, R] {}
  def delete[R <: Admin](db: File[R]) = new Action[Delete, R] {}
  def update[R <: Admin](db: File[R]) = new Action[Update, R] {}
}

And this is how we use it:

// Action.read(File[Anonymous]) // Does not compile!

Action.read(File[User])
Action.write(File[User])

// Action.delete(File[User]) // Does not compile either!

Action.read(File[Admin])
Action.write(File[Admin])
Action.delete(File[Admin])
Action.update(File[Admin])

In the first case that does not compile, we will get an error like this from the Scala compiler:

Error:(40, 10) inferred type arguments [com.github.gvolpe.types.phantom.PhantomTypes.Role.Anonymous] do not conform to method read's type parameter bounds [R <: com.github.gvolpe.types.phantom.PhantomTypes.Role.User]
Action.read(File[Anonymous])
Error:(40, 23) type mismatch;
found : com.github.gvolpe.types.phantom.PhantomTypes.File[com.github.gvolpe.types.phantom.PhantomTypes.Role.Anonymous]
required: com.github.gvolpe.types.phantom.PhantomTypes.File[R]

Saying that the Anonymous Role is not sufficient to invoke the read method that requires at least a User Role.

Up until now we have seen how we defined these types and used them just to add constraints to our methods, but we have never created any instances of these traits; that’s why they are called Phantom Types. For further reading, take a look at Daniel Westheide’s great blog post.

These types are also used in languages like Haskell. For example, they are useful when defining GADTs (Generalized Algebraic Data Types):

data Expr a = I Int
            | B Bool
            | Add (Expr a) (Expr a)
            | Mul (Expr a) (Expr a)
            | Eq (Expr a) (Expr a)

Here a is a type variable whose sole purpose is to constrain operations. It is actually a phantom type since it never gets instantiated; it just marks the intended type of the expression.
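
Here is a rough Scala sketch of the same idea (the names are illustrative): the type parameter A never appears in any field, it only tags the expression and constrains which combinations compile.

sealed trait Expr[A]
final case class I(i: Int)                       extends Expr[Int]
final case class B(b: Boolean)                   extends Expr[Boolean]
final case class Add(x: Expr[Int], y: Expr[Int]) extends Expr[Int]
final case class Eq(x: Expr[Int], y: Expr[Int])  extends Expr[Boolean]

Add(I(1), I(2))          // compiles
// Add(I(1), B(true))    // does not compile: Expr[Boolean] is not an Expr[Int]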

Type Level Programming (vs. Value Level)

Let’s start by defining our own Boolean type with some logical operations:

sealed trait Bool {
  def not: Bool
  def &&(b: Bool): Bool
  def ||(b : Bool): Bool
  def ifElse[C](t: => C, f: => C): C
}

case object True extends Bool {
  override def not = False
  override def &&(b: Bool) = b
  override def ||(b: Bool) = True
  override def ifElse[C](t: => C, f: => C) = t
}

case object False extends Bool {
  override def not = True
  override def &&(b: Bool) = False
  override def ||(b: Bool) = b
  override def ifElse[C](t: => C, f: => C) = f
}

And then verify that it behaves as we expect:

False.not == True
False.&&(True) == False
False.||(True) == True
False.ifElse(1, 2) == 2

In a similar way, we can define the same Bool type at the type level:

sealed trait Bool {
  type Not <: Bool
  type && [B <: Bool] <: Bool
  type || [B <: Bool] <: Bool
  type IfElse [C, T <: C, F <: C] <: C
}

type True = True.type
type False = False.type

object True extends Bool {
  type Not = False
  type && [B <: Bool] = B
  type || [B <: Bool] = True
  type IfElse [C, T <: C, F <: C] = T
}

object False extends Bool {
  type Not = True
  type && [B <: Bool] = False
  type || [B <: Bool] = B
  type IfElse [C, T <: C, F <: C] = F
}

And we can verify the behavior by using the =:= operator in combination with implicitly.

implicitly[False# Not =:= True]
implicitly[False# && [True] =:= False]
implicitly[False# || [True] =:= True]
implicitly[False# IfElse[Any, Int, String] =:= String]
//  implicitly[True# IfElse[Any, Int, String] =:= String] // This does not compile!

Natural Numbers, Factorial and Fibonacci

Now that we have seen a simple type like Bool defined at the type level, let’s see how to implement our own representation of Natural Numbers at the type level:

sealed trait Nat {
  type This >: this.type <: Nat
  type ++ = Succ[This]
  type + [_ <: Nat] <: Nat
  type * [_ <: Nat] <: Nat
}

object Zero extends Nat {
  type This = Zero
  type + [X <: Nat] = X
  type * [X <: Nat] = Zero
}

class Succ[N <: Nat] extends Nat {
  type This = Succ[N]
  type + [X <: Nat] = Succ[N# + [X]]
  type * [X <: Nat] = (N# * [X])# + [X]
}

We’ve seen this previously, but here we added two new type members to perform addition and multiplication. If we also define some type aliases, we can verify the correctness of our definition:

type Zero   = Zero.type
type One    = Zero# ++
type Two    = One# ++
type Three  = Two# ++
type Four   = Three# ++
type Five   = Four# ++
type Six    = Five# ++

implicitly[Two# + [Three] =:= Five]
implicitly[One# + [Two] =:= Three]
implicitly[Two# * [Two] =:= Four]
//  implicitly[Two# + [Three] =:= Four] // Does not compile

Factorial

This is how we can define it in terms of Nat and verify its correctness:

sealed trait Factorial[N <: Nat] { type Res <: Nat }

implicit object factorial0 extends Factorial[Zero] { type Res = One }

implicit def factorial[N <: Nat, X <: Nat](implicit fact: Factorial[N] { type Res = X}) =
  new Factorial[Succ[N]] { type Res = X# * [Succ[N]] }

implicitly[Factorial[Zero] { type Res = One }]
implicitly[Factorial[One] { type Res = One }]
implicitly[Factorial[Two] { type Res = Two }]
implicitly[Factorial[Three] { type Res = Six }]
//  implicitly[Factorial[Three] { type Res = Five }] // Does not compile!

Fibonacci

Here we can do pretty much the same as we did with Factorial:

sealed trait Fibonacci[N <: Nat] { type Res <: Nat }

implicit object fibonacci0 extends Fibonacci[Zero] { type Res = Zero }

implicit object fibonacci1 extends Fibonacci[One] { type Res = One }

implicit def fibonacci[N <: Nat, X <: Nat, Y <: Nat](implicit fib1: Fibonacci[N] { type Res = X}, fib2: Fibonacci[Succ[N]] { type Res = Y }) =
  new Fibonacci[Succ[Succ[N]]] { type Res = X# + [Y] }

And verify its correctness:

implicitly[Fibonacci[Two] { type Res = One }]
implicitly[Fibonacci[Three] { type Res = Two }]
implicitly[Fibonacci[Four] { type Res = Three }]
implicitly[Fibonacci[Five] { type Res = Five }]
//  implicitly[Fibonacci[Five] { type Res = Two }] // Does not compile!

Final thoughts

If you’re still processing the first paragraph, don’t worry! We went through a lot of stuff that’s hard to digest in a single read, I know the feeling… Anyway, I hope this post has motivated you to go further and explore these concepts at a deeper level. Thanks for reading!

The code shown in this post is available on GitHub:

DISCLAIMER: Most of the examples were taken from the online course ThCS Introduction to programming with dependent types in Scala.

Cheers,
Gabriel.

Midyear updates

Hi there!

It’s been a while since the last time I wrote here so today’s post will be kind of a summary of interesting stuff:

Talks I gave:

Functional Data Validation at the Eindhoven Haskell Meetup

I went through different ways of doing validation until I reached the functional way using Applicative Functors + Semigroup. In addition, I created the same demo in Scala using the Cats library.

Fs2 + Effects at the Brabant Scala Meetup (Netherlands)

Due to unforeseen circumstances this event was cancelled (due to be rescheduled for the 13th of July), but anyway I’m sharing with you the content of the talk that I already gave to some colleagues.

Open-source Contributions

I started the development of fs2-rabbit just for a simple use case and I had so much fun that I decided to open source it. It’s a stream-based client library for RabbitMQ built on top of Fs2 and cats-effect.

I have contributed to Dotty, the next generation Scala compiler. It’s a great way to be on the other side of the street! You will need to think like a compiler does!

I have also contributed some documentation to one of my favorite streaming libraries, Fs2: Functional Streams for Scala (formerly Scalaz Stream)!

Constantly learning

I learned how to mix different algebras by making use of Coproduct and Inject when using the Free Monad, described in the excellent paper Data types à la carte by Swierstra. In order to illustrate this, I have created a demonstration project using the Cats library. I have to say I was thinking about writing a blog post about it but later decided that there are already too many posts talking about this so I don’t think it’ll be necessary 🙂

I’m also researching concurrent and distributed applications in non-JVM languages. It has already led me to play around with Erlang programs, Pipes Concurrency (Haskell), Cloud Haskell and Transient (Haskell). Up until now, I’m torn between Pipes Concurrency and Transient + Transient Universe, not very convinced about static actions building closures in Cloud Haskell, but it might be too early to come to conclusions. Here are some examples:

It’s also quite interesting what’s going on in the cryptocurrency world. I keep my eyes peeled when it comes to Blockchain, Ethereum and all the good stuff built by Input / Output like Cardano SL (Haskell).

Conferences

I’m attending the Scala Wave conference in Gdansk, Poland this Friday and Saturday. During the first day I will be participating in the Open Source Sprees organized by the Scala Center, where I hope to welcome new contributors! On the second day I will attend all the interesting talks 🙂

Available for hire!

We, the Lambda Team, are launching our own Software Consultancy based in Europe. Make sure you check out our portfolio and get in contact for more information!

Cheers,
Gabriel.

Social Network using Titan Db: Part 2

Following the first part of this post, here we’re going to answer more complex questions. The previously described questions have changed a little bit:

  1. Who are my followers aged between 20 and 25 years old?
  2. How many people from Argentina are following me? And who are they?
  3. Who are my followers since a given date?
  4. Who is the first follower of the person I started following first?
  5. Can I reach the president of my country through common connections from my country? If so, show his information and the path.

Connections

Changes to the model are necessary since we need more information about the people in our network and also about the relationships, so this is how the connections look now:

social-network-complex-case

Persons (represented as vertices) now have information about name, age, country and profession (plus an Id that is not represented in the graphic). The “following” / “followedBy” relationships (represented as edges) now have a timestamp, so we can find out since when someone has been following you.

So, for example, Gabi started following Damian on July 1st, 2015 and Damian started following him back on August 14th, 2015.

Answers by example

Based on the information in the diagram shown above we can choose someone and try to answer the questions by just looking at it in detail, so we can say:

  1. Who are Gabi’s followers aged between 20 and 25 years old? Ivan (21) and Guille (24).
  2. How many people from Argentina are following Gabi? And who are they? 3 people: Damian, Ivan and Guille.
  3. Who are Gabi’s followers since January 2017? Ivan (March 2017) and Guille (February 2017).
  4. Who is the first follower of the person Mike started following first? Mike first started following Guille (July 2010), and Guille’s first follower is Gabi (January 2007).
  5. Can Gabi reach the president of Argentina through common connections from his country? If so, show his information and the path. Yes, Capusotto. Path: Gabi –> Damian –> Gustavo –> Capusotto.

Solution

Now, that was easy because there are only 9 people in our network, but as soon as it starts growing it’s going to be really difficult to tell. That’s not a problem at all though: we can solve it using Titan Db.

This is how we create a person now that we have more information:

def createPerson(person: Person): Option[Person] = findPerson(person.id) match {
  case Some(v)  => None
  case None     =>
    g + (PersonLabel, PersonId          -> person.id,
                      PersonName        -> person.name,
                      PersonAge         -> person.age,
                      PersonCountry     -> person.country,
                      PersonProfession  -> person.profession)
    g.tx().commit()
    Some(person)
}

Filters

In order to filter out our results we will need to make use of the org.apache.tinkerpop.gremlin.process.traversal.P class that represents a Predicate. This is how we can find the followers with ages between 20 and 25 years old using P:

def findFollowersWithAgeRange(personId: PersonIdentifier, from: Int, to: Int): List[Person] = {
  g.V.has(PersonId, personId.id)
    .outE(FollowedBy)
    .inV()
    .has(PersonAge, P.gte(from)).has(PersonAge, P.lte(to))
    .map(_.mapPerson)
    .toList()
}

Basically, what we are doing above is applying the filters just where we need them. First we look up, among all the vertices, a Person with a particular Id and, once we get it, we continue traversing the graph through all the outgoing edges labelled “FollowedBy”, followed by the incoming vertex where the property “PersonAge” has a value between 20 and 25. Finally we map all the fields to our Person case class using a custom implicit conversion; see the full source code (link at the end of the post).

Now, finding the followers since a given date is quite similar, except this time we apply the predicate on the outgoing edges:

def findFollowersSince(personId: PersonIdentifier, since: Instant): List[Person] = {
 g.V.has(PersonId, personId.id)
   .outE(FollowedBy)
   .has(TimestampKey, P.gte(since.toEpochMilli))
   .inV()
   .map(_.mapPerson)
   .toList()
}

And in order to find the first follower of the person we started following first, we don’t need filters but rather data sorted by timestamp (using orderBy) and iteration (a for comprehension):

def findFirstFollowerOfTopOne(personId: PersonIdentifier): Option[(Person, Person)] = {
 val result = for {
   f <- g.V.has(PersonId, personId.id).outE(Following).orderBy(TimestampKey.value).inV()
   g <- g.V(f).hasLabel(PersonLabel).outE(FollowedBy).orderBy(TimestampKey.value).inV()
 } yield (f.mapPerson, g.mapPerson)
 result.headOption()
}

Note that in the second statement when we get the variable “g” we are not traversing the entire graph again but we are starting from a particular vertex “f”, using the g.V.apply(…) method.

Loops

Finally, we have to find a way to reach our president by following common connections from our country. For this purpose we’ll use loops to traverse the graph in different directions given a predicate (repeat / until):

private def findPresident(personId: Long, country: String) = {
 g.V.has(PersonId, personId)
   .repeat(_.outE(Following).inV.has(PersonCountry, P.eq(country)).simplePath)
   .until(_.has(PersonProfession, "President"))
}

def findPresidentByCommonConnections(personId: PersonIdentifier, country: Country): Option[Person] = {
 findPresident(personId.id, country.value).map(_.mapPerson).headOption()
}

Now to get the path to our president we need to do as shown below:

def findPathToPresident(personId: PersonIdentifier, country: Country): List[Person] = {
 findPresident(personId.id, country.value)
   .path()
   .headOption()
   .toList
   .flatMap(_.persons)
}

implicit class PathOps(path: Path) {
 import scala.collection.JavaConversions._

 def persons: List[Person] = {
   path.objects.toList.collect {
     case v: Vertex => v.mapPerson
   }
 }
}

We get the first possible path to the president (headOption) and then we map every vertex in the path to a Person. This is done by the custom implicit conversion shown above in the implicit class PathOps. Note that we have to use JavaConversions to be able to use the collect method.

This technique is well described in Michael Pollmeier’s blog post.

Running the examples

There’s no main application in our project, only unit tests. So the only way to verify how the app works is by running the tests. My favorite way is to execute ‘sbt test’ on a terminal.

Alternatively, I created a gist with a main app where our flow chart is translated into code.

Final thoughts

There will be no more code following this post but something that I would like to do in the future is to benchmark the app just to see things like performance, reliability and latency, among others. Since we can integrate Titan Db with Cassandra and Spark for example, it would be really interesting to see how far we can go. But that would probably be another post, not sure yet.

As always, the full source code of the given example can be found on GitHub.

I hope you find this post interesting. Feel free to comment about your experience with Titan Db; I’d like to hear about people using it in production!

Greetings from Cambodia! 🙂
Gabriel.

Social Network using Titan Db: Part 1

Hi everybody!

After a long time gone I’m slowly getting back into business but with a lot of energy 🙂

Introduction

These past weeks I’ve been solving a couple of coding challenges and for one of them I thought (and I still think) that the best solution would be achieved by using a graph database (I can’t post details for confidentiality reasons). I had a little bit of experience using Neo4j in the past, though I’m not an expert. So I did a bit of research and I found Titan Db, which caught my attention as they claim it is a scalable distributed graph database supporting thousands of concurrent users executing complex graph traversals in real time, just what I needed.

So this post will be about creating a small Twitter-like social network (following / followed by) using Titan Db. For those impatient creatures, here’s the code.

Basic Example

social-network

In this basic example we have the following relationships:

  • Gabi is following Damian and John, and is followed by Damian and Mike.
  • Damian is following Gabi and John, and is followed by Gabi.
  • John is following Chris, and is followed by Gabi and Damian.
  • Mike is following Gabi.
  • Chris is followed by John.

Pretty basic but enough to demonstrate how we can start creating these relationships in Titan Db using the Gremlin Scala DSL.

Introduction to Graph Databases

NOTE: If you’re already familiar with this concept feel free to skip this part.

As described in the Apache TinkerPop website: “A graph is a structure composed of vertices and edges. Both vertices and edges can have an arbitrary number of key/value-pairs called properties. Vertices denote discrete objects such as a person, a place, or an event. Edges denote relationships between vertices. For instance, a person may know another person, have been involved in an event, and/or was recently at a particular place. Properties express non-relational information about the vertices and edges. Example properties include a vertex having a name, an age and an edge having a timestamp and/or a weight. Together, the aforementioned graph is known as a property graph and it is the foundational data structure of Apache TinkerPop”.

So for our example every person will be a Vertex and both relationships “following” and “followedBy” will be Edges. Every person has an Id and a Name, which will be Properties of each Vertex.

Relationships in Scala

The following code is part of our SocialNetworkService, with some explanation of what’s happening:

private def findPerson(personId: Long): Option[Person] = {
    g.V.has(PersonId, personId) // Filter by Key (PersonId) and Value (personId)
      .value(PersonName) // Select property PersonName
      .headOption() // First match
      .map(Person.apply(personId, _)) // Convert to our Person case class
  }

  private def findPersonsBy(personId: Long, link: String): List[Person] = {
    // Filter by PersonId, traverse the outgoing Edges matching link (either Following or FollowedBy) and then get the incoming vertex
    val friends = for {
      f <- g.V.has(PersonId, personId).outE(link).inV() 
    } yield Person(f.value2(PersonId), f.value2(PersonName)) 
    friends.toList() 
} 

def findFollowers(personId: PersonIdentifier): List[Person] = findPersonsBy(personId.id, FollowedBy) 

def findFollowing(personId: PersonIdentifier): List[Person] = findPersonsBy(personId.id, Following) 

// Validate if the person already exists and then create the person
def createPerson(person: Person): Option[Person] = findPerson(person.id) match { 
    case Some(v)  => None
    case None     =>
      g + (PersonLabel, PersonId -> person.id, PersonName -> person.name)
      g.tx().commit()
      Some(person)
  }

  // Validate if both persons exist and then create the relationship
  // TODO: Add validation for existing relationships
  def follow(from: Person, to: Person): Option[Friendship] =
    (findPerson(from.id), findPerson(to.id)) match {
      case (Some(f), Some(t)) =>
        val friendship = for {
          f <- g.V.has(PersonId, from.id)
          t <- g.V.has(PersonId, to.id)
        } yield {
          f --- Following --> t // "from" is now following "to"
          t --- FollowedBy --> f // By nature "to" is now followed by "from"
        }
        friendship.headOption() // Execute the query
        g.tx().commit() // Commit the transaction
        Some(Friendship(from, to))
      case _ => None
    }

I really like Gremlin’s DSL. In our example g is our graph, from which we can access all the vertices via g.V and the edges via g.E. Then we can filter using the many has(…) methods, add new vertices with g + (Label, Key -> Value) and add new relationships (edges) using the arrow-like connectors a --- Link --> b provided by the DSL.

What’s next?

Well, this was just an introductory post to this use case with Titan Db; however, in the second part we’re going to address more complex scenarios so we can answer the following questions:

  • Who are my followers aged between 20 and 25 years old?
  • How many people from Argentina are following me?
  • Who are my new followers of the week? And of the month?
  • Who are the first 10 followers in common among the people I’m following?
  • Can I reach the president of my country through common connections from my country? If so, show his information.

And maybe I can come up with more examples, but that’s basically what I have in mind at the moment for the next chapter of this post. But hey! If you feel challenged please go ahead and try to implement it yourself! 😉

Oh again, here’s the project as always on GitHub.

UPDATE: See the part 2 of this post!

Until next post!
Gabriel.

CRUD and error handling with Http4s

Hi everybody!

Last year I wrote an introductory post to Http4s and a few months later I started using it in the company I was working for. Results have been very good so far regarding readability in a purely functional style, usability, throughput and performance in production environments, so I would recommend using it.

In this post I want to show you how to create a simple CRUD (Create, Read, Update, Delete) demo, always needed when creating an API. And in these kinds of operations you will always need to handle expected errors from the backend and return the appropriate HTTP status code and response. So let’s start!

EDIT: This post was written in October 2016 using Http4s v0.14.6 for the examples and things have changed quite a lot since then. For a newer version using Http4s v0.18.x (Cats v1.0.0-RC1) please check out this Giter8 Template, this repo and this blog post about all the updates.

CRUD for users

In this demo we will create a CRUD for an User (username, email and age). We will support the following operations:

  1. Find all the users sorted by username
  2. Create a new user:
    • If the username already exists return a descriptive error.
    • Otherwise return the user with the designated id.
  3. Find user by id:
    • Return either the user or a descriptive message indicating user not found.
  4. Edit the age of a user by id:
    • Return either a successful code or a descriptive message indicating user not found.
  5. Delete user by id:
    • Return either a successful code or a descriptive message indicating user not found.

HTTP Status Code Responses

So far this is the definition of the most common use of a crud with some validations. Let’s see how we define the HTTP Endpoints for all these operations:

  1. GET /users | /users?sort=ASC | /users?sort=DESC
    • Returns a 200 http status code with a list of all the users.
  2. POST /users
    • If the username is already in use returns a 409 http status code indicating the duplication.
    • Otherwise creates the user and returns a 201 http status code and the user with id in the body.
  3. GET /users/{id}
    • If the user exists it returns a 200 http status code and the user itself.
    • Otherwise it returns a 400 http status code indicating user not found.
  4. PUT /users/{id}
    • If the user exists it returns a 202 http status code.
    • Otherwise it returns a 400 http status code indicating user not found.
  5. DELETE /users/{id}
    • If the user exists it returns a 204 http status code.
    • Otherwise it returns a 400 http status code indicating user not found.

OK, show me the code!

We will start by creating the main app running the server. As indicated by the official documentation, it’s recommended to extend the ServerApp trait, which will handle the shutdown and cleanup of resources automatically:

package com.gvolpe.http.server

import com.gvolpe.http.server.endpoint.UserHttpEndpoint
import org.http4s.server.{Server, ServerApp}
import org.http4s.server.blaze._
import scalaz.concurrent.Task

object HttpApi extends ServerApp {

  override def server(args: List[String]): Task[Server] = {
    BlazeBuilder
      .bindHttp(8080, "localhost")
      .mountService(UserHttpEndpoint.service)
      .start
  }

}

By default we use the Blaze server as it is the native backend supported by http4s, but it’s also possible to use different servers; however, that topic is outside the scope of this post.

Our UserService trait defines the CRUD operations always returning a scalaz.concurrent.Task[A] as shown below:

trait UserService {
  def save(user: User): Task[Unit]
  def findAll(sort: UserSorting): Task[List[User]]
  def find(id: Long): Task[User]
  def remove(id: Long): Task[Unit]
  def edit(id: Long, age: Int): Task[Unit]
}

We will not care about the implementation of this service, because what matters in this case are the return types and the possible expected errors that a Task might contain when it runs. However, you can take a look at our default implementation on GitHub (link at the end of the post).

In the code shown above the type UserSorting is just an ADT defining two possible values, either Asc or Desc.
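
The ADT itself is not shown here, but a minimal sketch of it (the actual implementation in the repo may differ slightly) could be:

sealed trait UserSorting
object UserSorting {
  case object Asc  extends UserSorting
  case object Desc extends UserSorting

  // Fall back to ascending order when the query parameter is missing or unrecognized
  def from(value: Option[String]): UserSorting = value.map(_.toUpperCase) match {
    case Some("DESC") => Desc
    case _            => Asc
  }
}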

Main User HTTP Endpoint

So this is how our UserHttpEndpoint object, referenced by the main server app, looks. First of all, we define the codecs available implicitly in the scope of the object. For this purpose we are using the amazing Circe library with automated codec derivation.

implicit def circeJsonDecoder[A](implicit decoder: Decoder[A]) = jsonOf[A]
implicit def circeJsonEncoder[A](implicit encoder: Encoder[A]) = jsonEncoderOf[A]

Then we define the main resources of the endpoint as an HttpService that represents a PartialFunction[Request, Task[Response]]:

val service = HttpService {
  case GET -> Root / "users" :? SortQueryParamMatcher(sort) =>
    Ok(UserService.findAll(UserSorting.from(sort)))
  case req @ POST -> Root / "users" =>
    req.decode[UserForm] { userForm =>
      val user = User(Random.nextInt(1000), userForm.username, userForm.email, userForm.age)
      UserService.save(user).flatMap(_ => Created(s"User with id: ${user.id}")).handleWith(errorHandler)
    }
  case GET -> Root / "users" / LongVar(id) =>
    Ok(UserService.find(id)).handleWith(errorHandler)    
  case DELETE -> Root / "users" / LongVar(id) =>
    UserService.remove(id).flatMap(_ => NoContent()).handleWith(errorHandler)
  case req @ PUT -> Root / "users" / LongVar(id) =>
    req.decode[UserAgeForm] { ageForm =>
      UserService.edit(id, ageForm.age).flatMap(_ => Accepted()).handleWith(errorHandler)
    }
}

In the code shown above we are using a SortQueryParamMatcher and an errorHandler variable that we haven’t seen yet, so this is how they are defined:

object SortQueryParamMatcher extends OptionalQueryParamDecoderMatcher[String]("sort")
  
private val errorHandler: PartialFunction[Throwable, Task[Response]] = {
  case UserNotFoundException(id)              => BadRequest(s"User with id: $id not found!")
  case DuplicatedUsernameException(username)  => Conflict(s"Username $username already in use!")
}

Adding a new validation is just a matter of adding a new case to the partial function. Very simple, isn’t it?
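
For instance, if the service later introduced a (hypothetical) InvalidUserAgeException, handling it would only take one more case:

private val errorHandler: PartialFunction[Throwable, Task[Response]] = {
  case UserNotFoundException(id)              => BadRequest(s"User with id: $id not found!")
  case DuplicatedUsernameException(username)  => Conflict(s"Username $username already in use!")
  case InvalidUserAgeException(age)           => BadRequest(s"Invalid age: $age!") // hypothetical new validation
}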

Final thoughts

As I said at the beginning of this post, I totally encourage the use of this powerful library if you like to write purely functional code and at the same time get great performance results. The documentation has improved a lot since last year, so please give it a try and let me know if you would like me to write about any other feature of this library not covered either here or in the official docs.

As always, a demo project for this post is available on GitHub, check it out here!

Until next post!
Gabriel.

PS: I’m currently on a world trip for an undetermined amount of time, sometimes with limited Internet connection, so the gaps between posts might be bigger, but I’ll try to keep on writing. I suppose I’ll be back in the industry next year at some point… Cheers from India!

Typed Actors

Hi everybody!

These days I had a break and was able to investigate something different from work, not related to streams, and I discovered the library Typed Actors by @knutwalker, which I liked so much that I decided to write a post about it. It’s just a short post because its documentation is already very good! For the impatient, here’s the demo project I’ve created 🙂

AKKA ACTORS

As you might know if you’ve ever used Akka actors, the type of its main method ‘receive’ is Any => Unit, exposing the lack of type safety. This is very dangerous and error prone, given the fact that you can send any message to an actor and it might not handle that specific type of message. But what if you could get errors at compile time to identify these cases? Well, this is what Typed Actors provides, and it’s very nice to work with.
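
To make the problem concrete, here is a minimal sketch using plain (untyped) Akka actors, where sending a message of the wrong type compiles just fine:

import akka.actor.{Actor, ActorSystem, Props}

class Counter extends Actor {
  // Receive is just an alias for PartialFunction[Any, Unit]
  override def receive: Receive = {
    case n: Int => println(s"counted $n")
  }
}

object UntypedProblem extends App {
  val system  = ActorSystem("untyped-demo")
  val counter = system.actorOf(Props[Counter], "counter")

  counter ! 1         // fine
  counter ! "oops!"   // compiles, but ends up as an unhandled message at runtime
}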

TYPED ACTORS

Taken directly from its documentation, Typed Actors has the following goals:

  • add a compile-time layer to existing ActorRefs with minimal runtime overhead
  • be compatible with all of the existing Akka modules, traits, and extensions in terms of composition and behavior

There are a few ways to define a TypedActor, but the one I like the most and recommend is to extend the TypedActor trait to add more exhaustiveness checking for our types. Let’s see a few examples:

import com.gvolpe.typed.examples.actor.StrictTypedActor._
import de.knutwalker.akka.typed._

object StrictTypedActor {
  sealed trait StrictMessage extends Product with Serializable
  case object StrictOne extends StrictMessage
  case object StrictTwo extends StrictMessage

  case object NotStrict

  // Same as Props[StrictMessage, StrictTypedActor] but using type derivation
  def props = PropsFor[StrictTypedActor]
}

class StrictTypedActor extends TypedActor.Of[StrictMessage] {
  override def typedReceive: TypedReceive = {
    case StrictOne => println(s"received StrictOne at $typedSelf")
    case StrictTwo => println(s"received StrictTwo at $typedSelf")
//    case NotStrict => println(s"this does not compile!")
  }
}

In the case above, the actor will only handle messages of type StrictMessage defined as an ADT. Let’s see the types that we will get when using this typed actor and what we can do with it:

  import akka.actor.ActorSystem
  import de.knutwalker.akka.typed._ 

  implicit val system = ActorSystem("typed-actors-demo")

  val actor: ActorRef[StrictMessage] = ActorOf(StrictTypedActor.props, name = "typed-actor")

  actor ! StrictOne
//actor ! "Hello world!" // This won't compile!!!

It’s very nice that if we try to send a message that is not handled by the actor we get a compilation error!

INTERACTION WITH EXISTING AKKA ACTORS

It’s also possible to work with an existing unsafe actor application without any problems. The library provides two main functions to switch between typed and untyped actors as much as we want. Let’s see an example:

  import akka.actor.ActorSystem
  import de.knutwalker.akka.typed._ 

  implicit val system = ActorSystem("typed-actors-demo")

  val actor: ActorRef[StrictMessage] = ActorOf(StrictTypedActor.props, name = "typed-actor")

  val unsafeActor: UntypedActorRef = actor.untyped // Alias for akka.actor.ActorRef

  val typedAgain: ActorRef[StrictMessage] = unsafeActor.typed[StrictMessage]

That’s it, not too much to add. Typed Actors makes it easy: by adding a small amount of extra code we gain all the benefits of type safety. Moreover, we decide how much type safety we want to put in our application, because it’s very easy to have both typed and untyped actors living together in the same context.

EXAMPLES

There are a lot of cases that I haven’t mentioned in this post, like Union Types, the Ask Pattern (or ReplyTo), Persistence Actors and the Typed Creator module among others, but almost everything is possible using this powerful library. Check out the official documentation, which is awesome! I’ve also created a demo project with a lot of examples, including the Akka example that calculates a Pi approximation converted to use Typed Actors. Check it out on GitHub!

Until next post!
Gabriel.

From Scalaz Streams to FS2

Hi everybody!

Last April I wrote this post about Scalaz Streams where I explained how we consume orders from Kafka, persist them to Cassandra, call a web service to update the prices and finally publish the result to RabbitMQ. And this was achieved by running two streams in parallel. Here’s the graphic to recap:

pricer_streams_parallel

Today I will introduce the same problem but solved with FS2: Functional Streams for Scala instead, the evolution of Scalaz Streams. If you are one of those impatient creatures here is the code 😉

MAIN DIFFERENCES

Our main flow was defined as follows:

def flow(consumer: ProcessT[Order],
           logger: Sink[Task, Order],
           storageEx: Exchange[Order, Order],
           pricer: Channel[Task, Order, Order],
           publisher: Sink[Task, Order])
          (implicit S: Strategy) = {
    merge.mergeN(
      Process(
        consumer          observe   logger    to  storageEx.write,
        storageEx.read    through   pricer    to  publisher
      )
    )
  }

And from this Scalaz Streams code we can name the changes in FS2:

  1. Process[F[_], +A] is now Stream[F[_], +A]
  2. Channel[F[_], +A, +B] is now Pipe[F[_], A, B], which is a type alias for Stream[F, A] => Stream[F, B]
  3. Sink[F[_], +A] is the same as a Pipe where B is Unit, i.e. Pipe[F[_], A, Unit]
  4. Exchange is not there anymore so we have to supply it using two streams.
  5. merge.mergeN is now fs2.concurrent.join

So this is how the pricer flow looks with FS2:

def flow(consumer: StreamT[Order],
           logger: SinkT[Order],
           storage: OrderStorage,
           pricer: PipeT[Order, Order],
           publisher: SinkT[Order])(implicit S: fs2.Strategy) = {
    fs2.concurrent.join(2)(
      Stream(
        consumer      observe logger to storage.write,
        storage.read  through pricer to publisher
      )
    )
}

Note: All the functions are now defined with a generic effect type, whereas before a lot of the functions were pretty tied to Scalaz’s Task. FS2 requires an implicit Async[F], where F is the effect type, in order to run the stream. By default, an Async instance for Task is provided implicitly.
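
For reference, the StreamT, SinkT and PipeT aliases used in the signature above come from the demo project; presumably they are just shorthands over the core FS2 types, something along these lines (here fixed to Task for simplicity):

import fs2.{Pipe, Sink, Stream, Task}

object types {
  type StreamT[A]  = Stream[Task, A]
  type SinkT[A]    = Sink[Task, A]
  type PipeT[A, B] = Pipe[Task, A, B]
}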

TIMER, SCHEDULER: time package

One of the things that I improved in this new version is the Order Generator. Before it was a naive implementation using Task and blocking the thread for 2 seconds. Now it’s been implemented using the functions provided by the fs2.time package. Here’s the code:

  implicit val scheduler = fs2.Scheduler.fromFixedDaemonPool(2, "generator-scheduler")
  implicit val S         = fs2.Strategy.fromFixedDaemonPool(2, "generator-timer")

  private val defaultOrderGen: PipeT[Int, Order] = { orderIds =>
    val tickInterrupter = time.sleep[Task](11.seconds) ++ Stream(true)
    val orderTick       = time.awakeEvery[Task](2.seconds).interruptWhen(tickInterrupter)
    (orderIds zip orderTick) flatMap { case (id, t) =>
      val itemId    = Random.nextInt(500).toLong
      val itemPrice = Random.nextInt(10000).toDouble
      val newOrder  = Order(id.toLong, List(Item(itemId, s"laptop-$id", itemPrice)))
      Stream.emit(newOrder)
    }
  }

The function time.awakeEvery[Task] generates a Stream[Task, FiniteDuration]. And any Stream has the function interruptWhen(haltWhenTrue: Stream[F, Boolean]) available to halt when the given stream of Booleans emits true. Based on this I used the function time.sleep[Task], which generates a Stream[Task, Nothing], and appended a Stream(true) to it. So after the given duration this combination will return what we need for the interrupter: a Stream[Task, Boolean].

QUEUES, SIGNALS AND TOPICS: async package

Another breaking change is the way queues, signals and topics, among others, are created. In Scalaz Stream they were tied to Task as the effect type, whereas in FS2 you have to provide it. The other difference you will notice is that on creation you used to get a Queue[A], whereas in FS2 you get an F[Queue[F, A]], as follows:

// Scalaz Streams
val oldQ: Queue[Int] = async.boundedQueue[Int](100) 
// FS2 
val newQ: Task[Queue[Task, Int]] = async.boundedQueue[Task, Int](100)
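
So any code that used to work with the queue directly now has to flatMap over its creation first, roughly like this:

val program: Task[Int] = for {
  q <- async.boundedQueue[Task, Int](100)
  _ <- q.enqueue1(1)
  n <- q.dequeue1
} yield n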

UTILS

The util functions channel.lift and sink.lift that I used in the first demo are no longer available, so I created an FS2Utils file to provide similar functionality.
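
Those helpers are essentially small lifting functions; a sketch of what they might look like (the actual file in the demo may differ):

import fs2.{Pipe, Sink}

object FS2Utils {
  // Lift an effectful function into a Pipe by evaluating it for every element
  def liftPipe[F[_], A, B](f: A => F[B]): Pipe[F, A, B] =
    _.evalMap(f)

  // A Sink is just a Pipe that discards its output
  def liftSink[F[_], A](f: A => F[Unit]): Sink[F, A] =
    liftPipe[F, A, Unit](f)
}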

And finally, one interesting function I’m using in the final combination of the streams is mergeHaltBoth, which works like merge but halts when either of the streams halts. In our case the Order Generator flow will halt after 11 seconds.

ADDITIONAL RESOURCES

I was inspired to write this post after watching the excellent series of videos published by Michael Pilquist, and because at work we are heavily using Scalaz Streams (maybe we’ll migrate to FS2 sometime in the near future, I don’t know yet). Also, the official guide and migration guide are really useful.

And again, the demo project for this post is available on GitHub. That’s all I have for today, I hope you enjoyed it.

Until next post!
Gabriel.