Social Network using Titan Db: Part 2

Following the first part of this post, here we’re going to answer more complex questions. The questions described previously have changed a little:

  1. Who are my followers with ages between 20 and 25 years old?
  2. How many people from Argentina are following me? And who are they?
  3. Who are my followers since a given date?
  4. Who is the first follower of the person I started following first?
  5. Can I reach the president of my country through common connections from my country? If so, show his information and the path.


Changes to the model are necessary since we need more information about the people in our network and also about their relationships, so this is what the connections look like now:


Persons (represented as vertices) now have a name, age, country and profession (plus an Id that is not shown in the graphic). The “following” / “followedBy” relationships (represented as edges) now have a timestamp, so we can find out since when someone has been following you.

So for example, Gabi started following Damian on July 1st, 2015 and Damian started following him back on August 14th, 2015.

Answers by example

Based on the information in the diagram shown above, we can choose someone and try to answer the questions just by looking at it carefully, so we can say:

  1. Who are Gabi’s followers with ages between 20 and 25 years old? Ivan (21) and Guille (24).
  2. How many people from Argentina are following Gabi? And who are they? 3 people: Damian, Ivan and Guille.
  3. Who are Gabi’s followers since January 2017? Ivan (March 2017) and Guille (February 2017).
  4. Who is the first follower of the person Mike started following first? Mike first started following Guille (July 2010), and Guille’s first follower is Gabi (January 2007).
  5. Can I reach the president of Argentina through Gabi’s common connections from his country? If so, show his information and the path. Yes, Capusotto. Path: Gabi –> Damian –> Gustavo –> Capusotto.


Now, that was easy because there are only 9 people in our network, but as soon as it starts growing it’s going to become really difficult to tell. That’s not a problem at all though: we can solve it using Titan Db.

This is how we create a person now that we have more information:

def createPerson(person: Person): Option[Person] = findPerson(person.id) match {
  case Some(v)  => None
  case None     =>
    g + (PersonLabel, PersonId          -> person.id,
                      PersonName        -> person.name,
                      PersonAge         -> person.age,
                      PersonCountry     -> person.country,
                      PersonProfession  -> person.profession)
    g.tx().commit()
    Some(person)
}

In order to filter our results we will need to make use of the org.apache.tinkerpop.gremlin.process.traversal.P class, which represents a Predicate. This is how we can find the followers with ages between 20 and 25 years old using P:

def findFollowersWithAgeRange(personId: PersonIdentifier, from: Int, to: Int): List[Person] = {
    g.V.has(PersonId, personId.value)
      .outE(FollowedBy).inV()
      .has(PersonAge, P.gte(from)).has(PersonAge, P.lte(to))
      .toList()
      .map(_.mapPerson)
}

Basically, what we are doing above is applying the filters just where we need them. First we look through all the vertices for a Person with a particular Id; once we have it, we continue traversing the graph over all the outgoing edges labelled “FollowedBy”, followed by an incoming vertex where the property “PersonAge” has a value between 20 and 25. Finally we map all the fields to our Person case class using a custom implicit conversion; see the full source code (link at the end of the post).

Now, finding the followers since a given date is quite similar, except this time we apply the predicate on the outgoing edges:

def findFollowersSince(personId: PersonIdentifier, since: Instant): List[Person] = {
   g.V.has(PersonId, personId.value)
     .outE(FollowedBy)
     .has(TimestampKey, P.gte(since.toEpochMilli))
     .inV()
     .toList()
     .map(_.mapPerson)
}

And in order to find the first follower of the person we first started following, we don’t need filters; instead we need data sorted by timestamp (using orderBy) and iteration (a for comprehension):

def findFirstFollowerOfTopOne(personId: PersonIdentifier): Option[(Person, Person)] = {
 val result = for {
   f <- g.V.has(PersonId, personId.value).outE(Following).orderBy(TimestampKey.value).inV()
   g <- g.V(f).hasLabel(PersonLabel).outE(FollowedBy).orderBy(TimestampKey.value).inV()
 } yield (f.mapPerson, g.mapPerson)
 result.headOption()
}

Note that in the second statement, when we get the variable “g”, we are not traversing the entire graph again; we start from a particular vertex “f”, using the g.V.apply(…) method.


Finally, we have to find a path to our president by following common connections from our country. For this purpose we’ll use loops to traverse the graph in different directions given a predicate (repeat / until):

private def findPresident(personId: Long, country: String) = {
 g.V.has(PersonId, personId)
   .repeat(_.outE(Following).inV.has(PersonCountry, P.eq(country)).simplePath)
   .until(_.has(PersonProfession, "President"))
}

def findPresidentByCommonConnections(personId: PersonIdentifier, country: Country): Option[Person] = {
 findPresident(personId.value, country.value).map(_.mapPerson).headOption()
}

Now to get the path to our president we need to do as shown below:

def findPathToPresident(personId: PersonIdentifier, country: Country): List[Person] = {
 findPresident(personId.value, country.value)
   .path()
   .headOption()
   .map(_.persons)
   .getOrElse(List.empty)
}

implicit class PathOps(path: Path) {
 import scala.collection.JavaConversions._

 def persons: List[Person] = {
   path.objects.toList.collect {
     case v: Vertex => v.mapPerson
   }
 }
}
We get the first possible path to the president (headOption) and then we map every vertex in the path as a Person. This is done by a custom implicit conversion shown above in the implicit class PathOps. Note that we have to use JavaConversions to use the collect method.

This technique is well described in Michael Pollmeier’s blog post.

Running the examples

There’s no main application in our project, only unit tests, so the only way to verify how the app works is by running them. My favorite way is to execute ‘sbt test’ in a terminal.

Alternatively, I created a gist with a main app where our flow chart is translated into code.

Final thoughts

There will be no more code following this post, but something I would like to do in the future is benchmark the app to look at things like performance, reliability and latency, among others. Since we can integrate Titan Db with Cassandra and Spark, for example, it would be really interesting to see how far we can go. But that would probably be another post; I’m not sure yet.

As always, the full source code of the given example can be found on GitHub.

I hope you find this post interesting. Feel free to comment about your experience with Titan Db; I’d love to hear from people using it in production!

Greetings from Cambodia! 🙂

Social Network using Titan Db: Part 1

Hi everybody!

After a long time gone I’m slowly getting back into business but with a lot of energy 🙂


These past weeks I’ve been solving a couple of coding challenges, and for one of them I thought (and still think) that the best solution would be achieved using a Graph DB (I can’t post details for confidentiality reasons). I had a bit of experience using Neo4j in the past, though I’m no expert. So I did some research and found Titan Db, which caught my attention as they claim it is a scalable distributed graph database supporting thousands of concurrent users executing complex graph traversals in real time: just what I needed.

So this post will be about creating a small Twitter-like social network (following / followed by) using Titan Db. For the impatient creatures, here’s the code.

Basic Example


In this basic example we have the following relationships:

  • Gabi is following Damian and John, and is followed by Damian and Mike.
  • Damian is following Gabi and John, and is followed by Gabi.
  • John is following Chris, and is followed by Gabi and Damian.
  • Mike is following Gabi.
  • Chris is followed by John.

Pretty basic but enough to demonstrate how we can start creating these relationships in Titan Db using the Gremlin Scala DSL.

Introduction to Graph Databases

NOTE: If you’re already familiar with this concept feel free to skip this part.

As described in the Apache TinkerPop website: “A graph is a structure composed of vertices and edges. Both vertices and edges can have an arbitrary number of key/value-pairs called properties. Vertices denote discrete objects such as a person, a place, or an event. Edges denote relationships between vertices. For instance, a person may know another person, have been involved in an event, and/or was recently at a particular place. Properties express non-relational information about the vertices and edges. Example properties include a vertex having a name, an age and an edge having a timestamp and/or a weight. Together, the aforementioned graph is known as a property graph and it is the foundational data structure of Apache TinkerPop”.

So for our example every person will be a Vertex, and both relationships “following” and “followedBy” will be Edges. Every person has an Id and a Name, which will be Properties of each Vertex.
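As a minimal, self-contained sketch of this model using the Gremlin Scala DSL (an in-memory TinkerGraph here just for illustration; the key name is my own choice):

```scala
import gremlin.scala._
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph

val Name = Key[String]("name")

// an in-memory graph, fine for a sketch; the real project uses Titan
val graph: ScalaGraph = TinkerGraph.open.asScala

// two person vertices and a "following" edge between them
val gabi   = graph + ("person", Name -> "Gabi")
val damian = graph + ("person", Name -> "Damian")
gabi --- "following" --> damian

// traverse: who is Gabi following?
val followed = gabi.out("following").value(Name).toList()
```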

Relationships in Scala

The following code is part of our SocialNetworkService, with some explanation of what’s happening:

private def findPerson(personId: Long): Option[Person] = {
    g.V.has(PersonId, personId) // Filter by Key (PersonId) and Value (personId)
      .value(PersonName) // Select property PersonName
      .headOption() // First match
      .map(Person.apply(personId, _)) // Convert to our Person case class
}

  private def findPersonsBy(personId: Long, link: String): List[Person] = {
    // Filter by PersonId, follow the outgoing edges matching link (either Following or FollowedBy) and then get the incoming vertex
    val friends = for {
      f <- g.V.has(PersonId, personId).outE(link).inV()
    } yield Person(f.value2(PersonId), f.value2(PersonName))
    friends.toList()
  }

def findFollowers(personId: PersonIdentifier): List[Person] = findPersonsBy(personId.value, FollowedBy)

def findFollowing(personId: PersonIdentifier): List[Person] = findPersonsBy(personId.value, Following)

// Validate whether the person already exists and, if not, create it
def createPerson(person: Person): Option[Person] = findPerson(person.id) match {
    case Some(v)  => None
    case None     =>
      g + (PersonLabel, PersonId -> person.id, PersonName -> person.name)
      g.tx().commit()
      Some(person)
}

  // Validate whether both persons exist and then create the relationship
  // TODO: Add validation for existing relationships
  def follow(from: Person, to: Person): Option[Friendship] =
    (findPerson(from.id), findPerson(to.id)) match {
      case (Some(f), Some(t)) =>
        val friendship = for {
          f <- g.V.has(PersonId, from.id)
          t <- g.V.has(PersonId, to.id)
        } yield {
          f --- Following --> t  // "from" is now following "to"
          t --- FollowedBy --> f // By nature "to" is now followed by "from"
        }
        friendship.headOption() // Execute the query
        g.tx().commit() // Commit the transaction
        Some(Friendship(from, to))
      case _ => None
    }

I really like Gremlin’s DSL. In our example, g is our graph, from which we can access all the vertices via g.V and all the edges via g.E. We can then filter with the many has(…) methods, add new vertices with g + (Label, Key -> Value), and add new relationships (edges) using the arrow-like connectors a --- Link --> b provided by the DSL.

What’s next?

Well, this was just an introductory post to this use case with Titan Db; in the second part we’re going to address more complex scenarios so we can answer the following questions:

  • Who are my followers with ages between 20 and 25 years old?
  • How many people from Argentina are following me?
  • Who are my new followers this week? And this month?
  • Who are the first 10 followers in common among the people I’m following?
  • Can I reach the president of my country through common connections from my country? If so, show his information.

And maybe I’ll come up with more examples, but that’s basically what I have in mind at the moment for the next chapter of this post. But hey! If you feel challenged, please go ahead and try to implement it yourself! 😉

Oh again, here’s the project as always on GitHub.

UPDATE: See part 2 of this post!

Until next post!

CRUD and error handling with Http4s

Hi everybody!

Last year I wrote an introductory post to Http4s, and a few months later I started using it at the company I was working for. Results have been very good so far regarding readability in a pure functional style, usability, throughput and performance in production environments, so I would recommend using it.

In this post I want to show you how to create a simple CRUD (Create, Read, Update, Delete) demo, something you’ll always need when creating an API. For these kinds of operations you will always need to handle expected errors from the backend and return the appropriate HTTP status code and response. So let’s start!

CRUD for users

In this demo we will create a CRUD for a User (username, email and age). We will support the following operations:

  1. Find all the users sorted by username
  2. Create a new user:
    • If the username already exists return a descriptive error.
    • Otherwise return the user with the designated id.
  3. Find user by id:
    • Return either the user or a descriptive message indicating user not found.
  4. Edit the age of a user by id:
    • Return either a successful code or a descriptive message indicating user not found.
  5. Delete user by id:
    • Return either a successful code or a descriptive message indicating user not found.

HTTP Status Code Responses

So far this is the definition of the most common use of a CRUD with some validations. Let’s see how we define the HTTP endpoints for all these operations:

  1. GET /users | /users?sort=ASC | /users?sort=DESC
    • Returns a 200 http status code with a list of all the users.
  2. POST /users
    • If the username is already in use returns a 409 http status code indicating the duplication.
    • Otherwise creates the user and returns a 201 http status code and the user with id in the body.
  3. GET /users/{id}
    • If the user exists it returns a 200 http status code and the user itself.
    • Otherwise it returns a 400 http status code indicating user not found.
  4. PUT /users/{id}
    • If the user exists it returns a 202 http status code.
    • Otherwise it returns a 400 http status code indicating user not found.
  5. DELETE /users/{id}
    • If the user exists it returns a 204 http status code.
    • Otherwise it returns a 400 http status code indicating user not found.

OK, show me the code!

We will start by creating the main app running the server. As indicated by the official documentation, it’s recommended to extend the ServerApp trait, which will handle the shutdown and clean-up of resources automatically:

package com.gvolpe.http.server

import com.gvolpe.http.server.endpoint.UserHttpEndpoint
import org.http4s.server.{Server, ServerApp}
import org.http4s.server.blaze._
import scalaz.concurrent.Task

object HttpApi extends ServerApp {

  override def server(args: List[String]): Task[Server] = {
    BlazeBuilder
      .bindHttp(8080, "localhost")
      .mountService(UserHttpEndpoint.service)
      .start
  }
}

By default we use the Blaze server, as it is the native backend supported by http4s, but it’s also possible to use different servers; however, that topic is beyond the scope of this post.

Our UserService trait defines the CRUD operations, always returning a scalaz.concurrent.Task[A], as shown below:

trait UserService {
  def save(user: User): Task[Unit]
  def findAll(sort: UserSorting): Task[List[User]]
  def find(id: Long): Task[User]
  def remove(id: Long): Task[Unit]
  def edit(id: Long, age: Int): Task[Unit]
}

We will not care about the implementation of this service, because what matters in this case is the return types and the possible expected errors a Task might contain when it runs. However, you can take a look at our default implementation on GitHub (link at the end of the post).

In the code shown above the type UserSorting is just an ADT defining two possible values, either Asc or Desc.
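The post doesn’t show UserSorting itself, but based on the description it is presumably something along these lines (the `from` helper mapping the optional query parameter to the ADT is my own sketch, not the project’s code):

```scala
sealed trait UserSorting extends Product with Serializable

object UserSorting {
  case object Asc  extends UserSorting
  case object Desc extends UserSorting

  // Hypothetical helper: map the optional ?sort=... query parameter,
  // defaulting to ascending order when absent or unrecognised.
  def from(raw: Option[String]): UserSorting =
    raw.map(_.toUpperCase) match {
      case Some("DESC") => Desc
      case _            => Asc
    }
}
```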

Main User HTTP Endpoint

So this is what our UserHttpEndpoint object, referenced by the main server app, looks like. First of all, we define the codecs available implicitly in the scope of the object. For this purpose we are using the amazing Circe library with automatic codec derivation.

implicit def circeJsonDecoder[A](implicit decoder: Decoder[A]) = jsonOf[A]
implicit def circeJsonEncoder[A](implicit encoder: Encoder[A]) = jsonEncoderOf[A]

Then we define the main resources of the endpoint as an HttpService that represents a PartialFunction[Request, Task[Response]]:

val service = HttpService {
  case GET -> Root / "users" :? SortQueryParamMatcher(sort) =>
    // UserSorting.from is a hypothetical helper turning the optional query param into the ADT
    UserService.findAll(UserSorting.from(sort)).flatMap(users => Ok(users))
  case req @ POST -> Root / "users" =>
    req.decode[UserForm] { userForm =>
      val user = User(Random.nextInt(1000), userForm.username, userForm.email, userForm.age)
      UserService.save(user).flatMap(_ => Created(s"User with id: ${user.id}")).handleWith(errorHandler)
    }
  case GET -> Root / "users" / LongVar(id) =>
    UserService.find(id).flatMap(user => Ok(user)).handleWith(errorHandler)
  case DELETE -> Root / "users" / LongVar(id) =>
    UserService.remove(id).flatMap(_ => NoContent()).handleWith(errorHandler)
  case req @ PUT -> Root / "users" / LongVar(id) =>
    req.decode[UserAgeForm] { ageForm =>
      UserService.edit(id, ageForm.age).flatMap(_ => Accepted()).handleWith(errorHandler)
    }
}

In the code shown above we are using a SortQueryParamMatcher and an errorHandler that we haven’t seen yet; this is how they are defined:

object SortQueryParamMatcher extends OptionalQueryParamDecoderMatcher[String]("sort")

private val errorHandler: PartialFunction[Throwable, Task[Response]] = {
  case UserNotFoundException(id)              => BadRequest(s"User with id: $id not found!")
  case DuplicatedUsernameException(username)  => Conflict(s"Username $username already in use!")
}

Adding a new validation is just a matter of adding a new case to the partial function. Very simple, isn’t it?

Final thoughts

As I said at the beginning of this post, I totally encourage the use of this powerful library if you like writing pure functional code while getting great performance at the same time. The documentation has improved a lot since last year, so please give it a try and let me know if you would like me to write about any other feature of this library not covered either here or in the official docs.

As always, a demo project for this post is available on GitHub, check it out here!

Until next post!

PS: I’m currently on a world trip for an undetermined amount of time, sometimes with a limited Internet connection, so the gaps between posts might be bigger, but I’ll try to keep writing. I suppose I’ll be back in the industry next year at some point… Cheers from India!

Typed Actors

Hi everybody!

These days I had a break and was able to investigate something different from work, not related to streams, and I discovered the library Typed Actors by @knutwalker, which I liked so much that I decided to write a post about it. It will be just a short post, because its documentation is already very good! For the impatient, here’s the demo project I’ve created 🙂


As you might know if you have ever used Akka actors, the type of its main method ‘receive’ is Any => Unit, exposing the lack of type safety. This is very dangerous and error prone, given that you can send any message to an Actor B even if it does not handle that specific type of message. But what if you could get compile-time errors to identify these cases? Well, that is what Typed Actors provides, and it’s very nice to work with.
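To recap the problem with a tiny sketch of plain (untyped) Akka: the compiler accepts any message, and a wrong one is only discovered at runtime, if at all (actor and message names here are just for illustration):

```scala
import akka.actor.{Actor, ActorSystem, Props}

class Greeter extends Actor {
  // Receive is PartialFunction[Any, Unit]: nothing restricts the message type
  def receive: Receive = {
    case name: String => println(s"Hello, $name!")
  }
}

val system  = ActorSystem("untyped-demo")
val greeter = system.actorOf(Props[Greeter], "greeter")

greeter ! "Gabi" // handled
greeter ! 42     // compiles fine, but is silently unhandled at runtime
```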


Taken directly from its documentation, Typed Actors has the following goals:

  • add a compile-time layer to existing ActorRefs with minimal runtime overhead
  • be compatible with all of the existing Akka modules, traits, and extensions in terms of composition and behavior

There are a few ways to define a TypedActor, but the one I like most and recommend is to extend the TypedActor trait, which adds more exhaustiveness checking for our types. Let’s see a few examples:

import de.knutwalker.akka.typed._

object StrictTypedActor {
  sealed trait StrictMessage extends Product with Serializable
  case object StrictOne extends StrictMessage
  case object StrictTwo extends StrictMessage

  case object NotStrict

  // Same as Props[StrictMessage, StrictTypedActor] but using type derivation
  def props = PropsFor[StrictTypedActor]
}

class StrictTypedActor extends TypedActor.Of[StrictMessage] {
  override def typedReceive: TypedReceive = {
    case StrictOne => println(s"received StrictOne at $typedSelf")
    case StrictTwo => println(s"received StrictTwo at $typedSelf")
//  case NotStrict => println(s"this does not compile!")
  }
}

In the case above, the actor will only handle messages of type StrictMessage, defined as an ADT. Let’s see the types we get when using this typed actor and what we can do with it:

  import de.knutwalker.akka.typed._ 

  implicit val system = ActorSystem("typed-actors-demo")

  val actor: ActorRef[StrictMessage] = ActorOf(StrictTypedActor.props, name = "typed-actor")

  actor ! StrictOne
//actor ! "Hello world!" // This won't compile!!!

It’s very nice that if we try to send a message that is not handled by the actor we get a compilation error!


It’s also possible to work with an existing unsafe actor application without any problems. The library provides two main functions to switch between typed and untyped actors as much as we want. Let’s see an example:

  import de.knutwalker.akka.typed._ 

  implicit val system = ActorSystem("typed-actors-demo")

  val actor: ActorRef[StrictMessage] = ActorOf(StrictTypedActor.props, name = "typed-actor")

  val unsafeActor: UntypedActorRef = actor.untyped // back to a plain akka.actor.ActorRef

  val typedAgain: ActorRef[StrictMessage] = unsafeActor.typed[StrictMessage]

That’s it, not too much to add. Typed Actors makes it easy: by adding only a small extra amount of code we gain all the benefits of type safety. Moreover, we decide how much type safety we want in our application, since it’s very easy to have typed and untyped actors living together in the same context.


There are a lot of cases that I haven’t mentioned in this post, like Union Types, the Ask Pattern (or ReplyTo), Persistent Actors and the Typed Creator module, among others, but almost everything is possible using this powerful library. Check out the official documentation, which is awesome! I’ve also created a demo project with a lot of examples, including the Akka example that calculates a Pi approximation converted to use Typed Actors. Check it out on GitHub!

Until next post!

From Scalaz Streams to FS2

Hi everybody!

Last April I wrote this post about Scalaz Streams, where I explained how we consume orders from Kafka, persist them to Cassandra, call a web service to update the prices and finally publish the result to RabbitMQ. This was achieved by running two streams in parallel. Here’s the graphic to recap:


Today I will introduce the same problem, but solved with FS2: Functional Streams for Scala, the evolution of Scalaz Streams. If you are one of those impatient creatures, here is the code 😉


Our main flow was defined as follows:

def flow(consumer: ProcessT[Order],
           logger: Sink[Task, Order],
           storageEx: Exchange[Order, Order],
           pricer: Channel[Task, Order, Order],
           publisher: Sink[Task, Order])
          (implicit S: Strategy) = {
  merge.mergeN(Process(
    consumer       observe logger to storageEx.write,
    storageEx.read through pricer to publisher
  ))
}

And starting from this Scalaz Streams code we can name the changes in FS2:

  1. Process[F[_], +A] is now Stream[F[_], +A]
  2. Channel[F[_], +A, +B] is now Pipe[F[_], A, B] which is an alias type for Stream[F, A] => Stream[F, B]
  3. Sink[F[_], +A] is now Pipe[F[_], A, Unit], i.e. a Pipe whose output type B is fixed to Unit.
  4. Exchange is not there anymore so we have to supply it using two streams.
  5. merge.mergeN is now fs2.concurrent.join
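For reference, the StreamT, SinkT and PipeT aliases used in the FS2 version are presumably defined along these lines (a sketch; the real definitions live in the project):

```scala
import fs2.{Pipe, Sink, Stream, Task}

// shorthands fixing the effect type to Task
type StreamT[A]  = Stream[Task, A]
type SinkT[A]    = Sink[Task, A]
type PipeT[A, B] = Pipe[Task, A, B]
```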

So this is what the pricer flow looks like with FS2:

def flow(consumer: StreamT[Order],
           logger: SinkT[Order],
           storage: OrderStorage,
           pricer: PipeT[Order, Order],
           publisher: SinkT[Order])(implicit S: fs2.Strategy) = {
  fs2.concurrent.join(2)(Stream(
    consumer     observe logger to storage.write,
    storage.read through pricer to publisher
  ))
}

Note: all the functions are now defined with a generic effect type, whereas before many of the functions were tied to the Scalaz Task. FS2 requires an implicit Async[F], where F is the effect type, in order to run the stream. By default, an Async implementation for Task is provided implicitly.

TIMER, SCHEDULER: time package

One of the things I improved in this new version is the Order Generator. Before, it was a naive implementation using Task and blocking the thread for 2 seconds. Now it’s implemented using the functions provided by the fs2.time package. Here’s the code:

  implicit val scheduler = fs2.Scheduler.fromFixedDaemonPool(2, "generator-scheduler")
  implicit val S         = fs2.Strategy.fromFixedDaemonPool(2, "generator-timer")

  private val defaultOrderGen: PipeT[Int, Order] = { orderIds =>
    val tickInterrupter = time.sleep[Task](11.seconds) ++ Stream(true)
    val orderTick       = time.awakeEvery[Task](2.seconds).interruptWhen(tickInterrupter)
    (orderIds zip orderTick) flatMap { case (id, t) =>
      val itemId    = Random.nextInt(500).toLong
      val itemPrice = Random.nextInt(10000).toDouble
      val newOrder  = Order(id.toLong, List(Item(itemId, s"laptop-$id", itemPrice)))
      Stream.emit(newOrder)
    }
  }
The function time.awakeEvery[Task] generates a Stream[Task, FiniteDuration], and any Stream has the function interruptWhen(haltWhenTrue: Stream[F[_], Boolean]) available to halt when the given stream of Booleans emits true. Based on this, I used the function time.sleep[Task], which generates a Stream[Task, Nothing], and appended a Stream(true) to it. So after the given duration this combination returns just what we need for the interrupter: a Stream[Task, Boolean].


Another breaking change is the way queues, signals and topics, among others, are created. In Scalaz Stream they were tied to Task as the effect type, whereas in FS2 you have to provide it. The other difference you will notice is that before, on creation, you would get a Queue[A], whereas in FS2 you get an F[Queue[F, A]], as follows:

// Scalaz Streams
val oldQ: Queue[Int] = async.boundedQueue[Int](100) 
// FS2 
val newQ: Task[Queue[Task, Int]] = async.boundedQueue[Task, Int](100)
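Because creation is now effectful, you have to flatMap over the returned Task to actually use the queue. A small sketch (for FS2 0.9-era APIs; the Strategy is needed for the implicit Async[Task]):

```scala
import fs2.{Strategy, Task}
import fs2.async

implicit val S = Strategy.fromFixedDaemonPool(2, "queue-demo")

// nothing runs until the Task is executed; the queue only exists inside it
val program: Task[Int] = for {
  q <- async.boundedQueue[Task, Int](100)
  _ <- q.enqueue1(42)
  n <- q.dequeue1
} yield n
```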


The utility functions channel.lift and sink.lift that I used in the first demo are no longer available, so I created an FS2Utils file providing similar functionality.

And finally, one interesting function I’m using in the final combination of the streams is mergeHaltBoth, which works like merge but halts when either of the streams halts. In our case the Order Generator flow will halt after 11 seconds.


I was inspired to write this post after watching the excellent series of videos published by Michael Pilquist, and because at work we are heavily using Scalaz Streams (maybe we’ll migrate to FS2 sometime in the near future, I don’t know yet). The official guide and migration guide are also really useful.

And again, the demo project for this post is available on GitHub. That’s all I have for today, hope you enjoyed it.

Until next post!

Functional Chain of Responsibility pattern

Hi everyone!

If you are familiar with any OOP language, you have probably used the Chain of Responsibility pattern at least once. Today’s topic is solving the same problem, but in a functional way. Let’s look at the alternatives we have.

OOP Style

Because it’s possible to write OOP and imperative code in Scala, one can implement this pattern the same way you would in Java. This is an example I found on Gist. Obviously, you shouldn’t do it this way!

Partial Functions

In one of the posts of the amazing series The Neophyte’s Guide to Scala, Daniel Westheide mentions an alternative to the Chain of Responsibility pattern: chaining partial functions using the orElse method. Following this alternative, here’s a good example I found in the blog post Design Patterns in Scala by Pavel Fatin.

This is the simplest way to solve it. However, one of the cons is that you always need to specify a default partial function in case none of the functions can be applied.

Let’s say that we want to apply only one of several rules, and when we find the right one, we apply it and finish the flow by returning a value. This is how we would do it:

case class DemoState(number: Int)

type PFRule = PartialFunction[DemoState, Option[String]]

private def numberRule(f: Int => Boolean, result: String): PFRule = {
  case DemoState(n) if f(n) => Option(result)
}

val GreaterThanFiveRule = numberRule(_ > 5, "Greater than five")
val LessThanFiveRule    = numberRule(_ < 5, "Less than five")
val EqualsFiveRule      = numberRule(_ == 5, "Equals five")

val NumberRules = GreaterThanFiveRule orElse LessThanFiveRule orElse EqualsFiveRule

NumberRules(DemoState(5)) // It will return Some("Equals five")
NumberRules(DemoState(1)) // It will return Some("Less than five")
NumberRules(DemoState(7)) // It will return Some("Greater than five")

We can combine any PartialFunctions of the same type using orElse. The order is very important: if you have rules that can be applied in multiple cases, the order defines the priority.
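One more thing worth noting: in general, applying a chained PartialFunction to an input none of the rules match throws a MatchError. If you don’t want to add a default case, you can lift the chain into a total function instead (a small sketch based on the rules above):

```scala
// NumberRules.lift: DemoState => Option[Option[String]]
// The outer Option is None when no rule is defined for the input.
val safeRules = NumberRules.lift

safeRules(DemoState(5)) // Some(Some("Equals five"))
```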

Tail Recursive technique

I found an interesting solution in this post, using a tail-recursive approach with the rules defined as Either. Let’s say we have defined the same number rules this way, using the Scalaz disjunction:

import scalaz._, Scalaz._

trait Rule[A, B] {
  def handle(request: B): A \/ B
}

type FiveRule = Rule[String, DemoState]

case object GreaterThanFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number > 5) "Greater than five".left
    else state.right
}

case object LessThanFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number < 5) "Less than five".left
    else state.right
}

case object EqualsFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number == 5) "Equals five".left
    else state.right
}

val NumberRules = List(GreaterThanFiveRule, LessThanFiveRule, EqualsFiveRule) 

And the tail-recursive technique to apply only one of the rules:

implicit class RuleListHandler[A, B](list: List[Rule[A, B]]) {
  def applyOnlyOneTailRec(request: B): A \/ B = {
    def loop(req: B, rules: List[Rule[A, B]]): A \/ B = {
      if (rules.isEmpty) req.right
      else rules.head.handle(req) match {
        case -\/(value)   => value.left
        case \/-(handler) => loop(req, rules.tail)
      }
    }
    loop(request, list)
  }
}

Then we can apply only one rule by calling this function:

// This will return -\/(Equals five)
NumberRules applyOnlyOneTailRec DemoState(5)

Applicative Functor Traverse

If your code base is mostly Scalaz, another option to apply the same logic is to use the Applicative method traverse, which is defined this way:

trait Traverse[F[_]] extends Functor[F] with Foldable[F] { self =>
  def traverseImpl[G[_]: Applicative, A, B](fa: F[A])(f: A => G[B]): G[F[B]]
}

If you’ve ever used Scala’s Future you might have used the sequence and/or traverse methods:

val list: List[Future[Int]] = List(Future(1), Future(2))
val x: Future[List[Int]] = Future.sequence(list) map (l => l map (n => n+1))
val y: Future[List[Int]] = Future.traverse(list)(l => l map (n => n+1))

Traverse is the same as mapping over an Applicative and then applying sequence, but with better performance, because it doesn’t need the additional list to copy the values before applying map. In the case of Future, it is just sequence and then map. In Scalaz anything can be traversable if there’s evidence that it’s an Applicative Functor and Foldable, as described in The Essence of the Iterator Pattern paper. More about the Scalaz implementation here.
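The fail-fast behaviour over the disjunction is easy to see in isolation (a small sketch; the check function is my own example, not the post’s code):

```scala
import scalaz._, Scalaz._

def check(n: Int): String \/ Int =
  if (n < 5) n.right else s"$n is too big".left

List(1, 2, 3).traverseU(check) // \/-(List(1, 2, 3))
List(1, 7, 3).traverseU(check) // -\/("7 is too big"): stops at the first left
```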

The tail-recursive technique is a naive solution that we can replace with a call to traverse to apply only one of the rules, because it’s going to fail fast:

// This will return -\/(Equals five) as in the tail-recursive solution
NumberRules traverseU (_.handle(DemoState(5)))

The problem with the monadic fail-fast technique and disjunction is that the expected value ends up on the left side. One thing we can do is invoke swap once we get the value, to flip the left and right sides. I defined an implicit function to do this for any given List of Rules:

implicit class RuleListHandler[A, B](list: List[Rule[A, B]]) {
  def applyOnlyOneTraverse(request: B): List[B] \/ A =
    (list traverseU (_.handle(request))).swap
}

// This will return \/-(Greater than five)
NumberRules applyOnlyOneTraverse DemoState(8)

In Scalaz you will find both traverse and traverseU (and likewise sequence and sequenceU). The difference is that traverseU can derive the type of the Applicative by using Unapply, whereas with traverse you need to specify the type.

As always, an example project showing different combinations can be found on GitHub.

Until the next post!

Scalaz Streams: Parallel computing and better testing

Hi there,

I finally made some time to write this post after being back home for the holidays and switching teams at my current job. I’ve been working on this new project since February; we are using Scalaz Streams extensively, which has given me the chance to learn a lot (thanks Fred!).

Last year I wrote about designing a full project using Akka Streams, testing every piece of the stream as a partial flow graph. Now I’ll talk about doing something similar using Scalaz Streams, which has the advantage of keeping you always in control of concurrency and parallelism. If you want to see a comparison between the two, check out Adam Warski’s excellent post about it. So let’s start by defining a case!

Definition of the case

We need to consume messages of Orders of Items from Kafka, persist every order to Cassandra, update the item prices by applying a profit margin, and publish the updated order downstream to RabbitMQ.

That’s the real case, but for the sake of simplicity the following example doesn’t use Kafka, Cassandra or RabbitMQ, but rather an in-memory representation that simulates the same behavior.

So I started by drawing the streaming flow and how the different components are connected:


Looks pretty simple, doesn’t it? Well, in this design every order has to be consumed from Kafka, logged, persisted to Cassandra, updated by the pricer service and finally published to RabbitMQ. A processing overhead generally occurs when I/O operations are involved, in this case the persisting phase, so parallelising the streaming between writing and reading is recommended. This is how the graphic looks after the flow has been split in two:

With this design we can continuously consume and persist orders while, in parallel, we read, update and publish the orders downstream.

Okay, show me the code….

Fair enough! In Scalaz Streams a stream is represented as a Process[+F[_], +A], where F is the effect side and A the value side. In the most general use case, F[_] is a Task. There are also more meaningful type aliases of Process, such as Sink (used for side-effecting processing), Channel (an effectful process that can receive values and send back responses) and Writer (a Process[+F[_], +A] where A is a disjunction), among others. We also have Exchange, which represents the interconnection between two systems: a read side and a write side. Knowing these components, we can model the graphic this way:

Note: If you are not so familiar with all these concepts, I recommend reading these user notes written by Devon Miller.

Kafka      - consumer: Process[Task, Order]
Logger     - logger: Sink[Task, Order]
Cassandra  - storageEx: Exchange[Order, Order]
Pricer     - pricer: Channel[Task, Order, Order]
Rabbit MQ  - publisher: Sink[Task, Order]
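As a rough intuition for what these aliases mean, their roles can be sketched as plain effectful functions (a stdlib-only simplification assumed for illustration, not the real scalaz-stream definitions):

```scala
// Conceptual stand-ins: a Task is a suspended effect, a Sink consumes values
// for their side effects, a Channel transforms them effectfully.
type Task[A]       = () => A
type Sink[A]       = A => Task[Unit]   // ~ Sink[Task, A]
type Channel[A, B] = A => Task[B]      // ~ Channel[Task, A, B]

case class Order(id: Int, price: BigDecimal)

val logged = scala.collection.mutable.ListBuffer.empty[Int]
val logger: Sink[Order]           = o => () => { logged += o.id; () }
val pricer: Channel[Order, Order] = o => () => o.copy(price = o.price * 2)

val updated = pricer(Order(1, BigDecimal(10)))()  // run the pricing effect
logger(updated)()                                 // run the logging effect
// updated.price == BigDecimal(20), logged == ListBuffer(1)
```

The real types additionally know how to emit streams of values, handle errors and manage resources, but the consume/transform roles are the same.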

Then we can connect every component as in the graphic and specify the parallel processing:

    merge.mergeN(Process(
      consumer observe logger to storageEx.write,
      storageEx.read through pricer to publisher
    ))

The code above deserves a bit of explanation. I’m using merge.mergeN, which can merge N streams (Process[+F[_], +A]) in a nondeterministic way. This means that, given stream1 and stream2, it is going to take an element from the faster publisher: nondeterministic parallelization. It behaves like the merge function for Wye with two streams, but is more optimized (using nondeterminism.njoin under the hood).
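The “take from whichever side is ready first” idea can be illustrated with plain Futures (just the nondeterminism, not mergeN itself):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Two "publishers": one emits immediately, the other never does.
val fast: Future[String] = Future.successful("fast")
val slow: Future[String] = Promise[String]().future  // never completes

// One nondeterministic "merge" step: take whichever element arrives first.
val first = Await.result(Future.firstCompletedOf(Seq(fast, slow)), 2.seconds)
// first == "fast"
```

mergeN does this continuously over N streams, so neither branch of the split flow ever blocks the other.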

This code is part of the PricerStream object. I’ve also created another process to generate random Orders to be consumed by the main process. All the source code for this demo is available on GitHub, check it out here!


Having the definition of the streaming flow isolated makes it easy to test its behavior. The flow method in the PricerStream object only defines how the components are connected and accepts each one of them as a parameter. All we have to do is create an in-memory representation of these components and invoke the flow. Then we can assert whether an order has been published or not.

For example, this is how we test the main flow defined above:

val (consumer, logger, storageEx, pricer, publisherEx, kafkaQ) = createStreams
val result = for {
  _      <- eval(kafkaQ.enqueueOne(testOrder))
  _      <- PricerStream.flow(consumer, logger, storageEx, pricer, publisherEx.write)
  update <- publisherEx.read
} yield {
  update.items foreach { item =>
    item.price should be (OrderTestGenerator.FixedUpdatedPrice)
  }
}

Take a look at the source code to discover what’s going on here! The createStreams method returns all the mocks we need to simulate Kafka, Cassandra and RabbitMQ. Then we create a stream in a for-comprehension style: publishing testOrder to the Kafka topic, invoking the PricerStream flow and finally asserting on the Order published downstream. This is quite simple once you understand how it works, and quite powerful!

Okay, that’s pretty much all I have today! There are a few points I would like to dig into more deeply, like benchmarks and performance, which are beyond the topic of this post, but hopefully I can write about them later on.

Recommended Resources

There are a lot of very useful resources out there. I’d like to point you specifically to Daniel Spiewak’s classic talk and to Derek Chen-Becker’s excellent talk given at LambdaConf 2015. I’ve created a playground project with all the stuff I learned from these talks, check it out! And of course it’s always good to check the Additional Resources wiki page of FS2.

Until next post!