From Scalaz Streams to FS2

Hi everybody!

Last April I wrote this post about Scalaz Streams where I explained how we consume orders from Kafka, persist them to Cassandra, call a web service to update the prices and finally publish the result to RabbitMQ. And this was achieved by running two streams in parallel. Here’s the graphic to recap:

pricer_streams_parallel

Today I will revisit the same problem, this time solved with FS2: Functional Streams for Scala, the evolution of Scalaz Streams. If you are one of those impatient creatures, here is the code 😉

MAIN DIFFERENCES

Our main flow was defined as follows:

def flow(consumer: ProcessT[Order],
           logger: Sink[Task, Order],
           storageEx: Exchange[Order, Order],
           pricer: Channel[Task, Order, Order],
           publisher: Sink[Task, Order])
          (implicit S: Strategy)= {
    merge.mergeN(
      Process(
        consumer          observe   logger    to  storageEx.write,
        storageEx.read    through   pricer    to  publisher
      )
    )
  }

Starting from this Scalaz Streams code, we can name the main changes in FS2:

  1. Process[F[_], +A] is now Stream[F[_], +A]
  2. Channel[F[_], +A, +B] is now Pipe[F[_], A, B], which is a type alias for Stream[F, A] => Stream[F, B]
  3. Sink[F[_], +A] still exists but is now defined as a Pipe whose output type B is Unit: Pipe[F[_], A, Unit]
  4. Exchange no longer exists, so we have to replace it with two streams (a read side and a write side).
  5. merge.mergeN is now fs2.concurrent.join
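
The flow shown below also uses a few shorthand type aliases from the demo project (StreamT, SinkT, PipeT). A minimal sketch of how they might be defined, assuming the FS2 Task as the effect type:

import fs2.{Pipe, Sink, Stream, Task}

// Hypothetical aliases matching the names used in the flow below
type StreamT[A]  = Stream[Task, A]
type SinkT[A]    = Sink[Task, A]     // i.e. Pipe[Task, A, Unit]
type PipeT[A, B] = Pipe[Task, A, B]  // i.e. Stream[Task, A] => Stream[Task, B]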

So this is how the pricer flow looks with FS2:

def flow(consumer: StreamT[Order],
           logger: SinkT[Order],
           storage: OrderStorage,
           pricer: PipeT[Order, Order],
           publisher: SinkT[Order])(implicit S: fs2.Strategy) = {
    fs2.concurrent.join(2)(
      Stream(
        consumer      observe logger to storage.write,
        storage.read  through pricer to publisher
      )
    )
  }

Note: all the functions are now defined with a generic effect type, whereas before many of them were tied to the Scalaz Task. FS2 requires an implicit Async[F], where F is the effect type, in order to run the stream. An Async implementation for Task is provided implicitly by default.

TIMER, SCHEDULER: time package

One of the things that I improved in this new version is the Order Generator. Previously it was a naive implementation using Task that blocked the thread for 2 seconds. Now it’s implemented using the functions provided by the fs2.time package. Here’s the code:

  implicit val scheduler = fs2.Scheduler.fromFixedDaemonPool(2, "generator-scheduler")
  implicit val S         = fs2.Strategy.fromFixedDaemonPool(2, "generator-timer")

  private val defaultOrderGen: PipeT[Int, Order] = { orderIds =>
    val tickInterrupter = time.sleep[Task](11.seconds) ++ Stream(true)
    val orderTick       = time.awakeEvery[Task](2.seconds).interruptWhen(tickInterrupter)
    (orderIds zip orderTick) flatMap { case (id, t) =>
      val itemId    = Random.nextInt(500).toLong
      val itemPrice = Random.nextInt(10000).toDouble
      val newOrder  = Order(id.toLong, List(Item(itemId, s"laptop-$id", itemPrice)))
      Stream.emit(newOrder)
    }
  }

The function time.awakeEvery[Task] generates a Stream[Task, FiniteDuration]. Any Stream has the function interruptWhen(haltWhenTrue: Stream[F, Boolean]) available to halt when the given stream of Boolean emits true. Based on this, I used the function time.sleep[Task], which generates a Stream[Task, Nothing], and appended a Stream(true) to it. After the given duration this combination yields exactly what the interrupter needs: a Stream[Task, Boolean].

QUEUES, SIGNALS AND TOPICS: async package

Another breaking change is the way queues, signals and topics, among others, are created. In Scalaz Stream they were tied to Task as the effect type, whereas in FS2 you have to provide it. The other difference you will notice is that creation used to give you a Queue[A] directly, whereas in FS2 you get an F[Queue[F, A]], as follows:

// Scalaz Streams
val oldQ: Queue[Int] = async.boundedQueue[Int](100) 
// FS2 
val newQ: Task[Queue[Task, Int]] = async.boundedQueue[Task, Int](100)
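
Since creation is now an effect, you typically bring the queue into scope with Stream.eval. A small sketch (names and sizes are illustrative), assuming the FS2 Task and an implicit Strategy:

import fs2.{Stream, Task, async}

implicit val S = fs2.Strategy.fromFixedDaemonPool(4, "queue-demo")

// Create the queue inside the stream, enqueue one element and read it back
val program: Stream[Task, Int] =
  Stream.eval(async.boundedQueue[Task, Int](100)).flatMap { q =>
    Stream.eval(q.enqueue1(1)).drain ++ q.dequeue.take(1)
  }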

UTILS

The util functions channel.lift and sink.lift that I used in the first demo are no longer available, so I created an FS2Utils file to provide similar functionality.
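
As a rough sketch of what such helpers could look like (my own names, not part of the FS2 API), lifting an effectful function into a Pipe or a Sink is essentially evalMap:

import fs2.{Pipe, Sink, Task}

// Hypothetical lift helpers, similar in spirit to the old channel.lift / sink.lift
def liftPipe[A, B](f: A => Task[B]): Pipe[Task, A, B] = _.evalMap(f)

def liftSink[A](f: A => Task[Unit]): Sink[Task, A] = _.evalMap(f)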

And finally, one interesting function I’m using in the final combination of the streams is mergeHaltBoth, which works like merge but halts as soon as either of the streams halts. In our case the Order Generator flow will halt after 11 seconds.
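
A rough sketch of the idea (the stream names are illustrative):

// Hypothetical: the combined stream halts as soon as either side halts
val combined = pricerFlow mergeHaltBoth orderGeneratorFlow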

ADDITIONAL RESOURCES

I was inspired to write this post after watching the excellent series of videos published by Michael Pilquist, and because at work we are using Scalaz Streams heavily (maybe we will migrate to FS2 sometime in the near future, I don’t know yet). The official guide and migration guide are also really useful.

And again, the demo project for this post is available on GitHub. That’s all I have for today, I hope you enjoyed it.

Until next post!
Gabriel.

Functional Chain of Responsibility pattern

Hi everyone!

If you are familiar with any OOP language you have probably used the Chain of Responsibility pattern at least once. Today’s topic is about solving the same problem but in a functional way. Let’s see the alternatives we have.

OOP Style

Because it’s possible to write OOP and imperative code in Scala, one can rewrite this pattern the same way you would do it in Java. This is an example I found on Gist. Obviously, you shouldn’t do it this way!

Partial Functions

In one of the posts of the amazing series The Neophyte’s Guide to Scala, Daniel Westheide mentions an alternative to the Chain of Responsibility pattern: chaining partial functions using the orElse method. Following this alternative, here’s a good example I found in the blog post Design Patterns in Scala by Pavel Fatin.

This is the simplest way to solve it. However, one of the cons is that you always need to specify a default partial function in case none of the functions can be applied.

Let’s say that we want to apply only one of several rules: when we find the right one, we apply it and finish the flow by returning a value. This is how we would do it:

case class DemoState(number: Int)

type PFRule = PartialFunction[DemoState, Option[String]]

private def numberRule(f: Int => Boolean, result: String): PFRule = {
  case DemoState(n) if f(n) => Option(result)
}

val GreaterThanFiveRule = numberRule(_ > 5, "Greater than five")
val LessThanFiveRule    = numberRule(_ < 5, "Less than five")
val EqualsFiveRule      = numberRule(_ == 5, "Equals five")

val NumberRules = GreaterThanFiveRule orElse LessThanFiveRule orElse EqualsFiveRule

NumberRules(DemoState(5)) // It will return Some("Equals five")
NumberRules(DemoState(1)) // It will return Some("Less than five")
NumberRules(DemoState(7)) // It will return Some("Greater than five")

We can combine any PartialFunctions of the same type using orElse. The order is very important: if you have rules that can be applied to the same input, the order defines the priority.
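
If none of the chained partial functions is defined for the input, applying the chain throws a MatchError. A small sketch of two ways around that, reusing the example above:

// Option 1: lift the chain into a total function returning Option
val safeRules: DemoState => Option[Option[String]] = NumberRules.lift

safeRules(DemoState(5)) // Some(Some("Equals five"))

// Option 2: append a hypothetical catch-all default rule at the end
val DefaultRule: PFRule = { case _ => None }
val NumberRulesWithDefault = NumberRules orElse DefaultRule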

Tail Recursive technique

I found an interesting solution in this post, using a tail-recursive approach with the rules defined as Either. Let’s say that we have defined the same number rules in this way using the Scalaz disjunction:

import scalaz._, Scalaz._

trait Rule[A, B] {
  def handle(request: B): A \/ B
}

type FiveRule = Rule[String, DemoState]

case object GreaterThanFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number > 5) "Greater than five".left
    else state.right
}

case object LessThanFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number < 5) "Less than five".left
    else state.right
}

case object EqualsFiveRule extends FiveRule {
  override def handle(state: DemoState): String \/ DemoState =
    if (state.number == 5) "Equals five".left
    else state.right
}

val NumberRules = List(GreaterThanFiveRule, LessThanFiveRule, EqualsFiveRule) 

And the tail-recursive technique to apply only one of the rules:

implicit class RuleListHandler[A, B](list: List[Rule[A, B]]) {
  def applyOnlyOneTailRec(request: B): A \/ B = {
    @tailrec
    def loop(req: B, rules: List[Rule[A, B]]): A \/ B = {
      if (rules.isEmpty) req.right
      else rules.head.handle(req) match {
        case -\/(value)   => value.left
        case \/-(handler) => loop(req, rules.tail)
      }
    }
    loop(request, list)
  }
}

Then we can apply only one rule by calling this function:

// This will return -\/(Equals five)
NumberRules applyOnlyOneTailRec DemoState(5)

Applicative Functor Traverse

If your code base is mostly Scalaz, another option to apply the same logic is to use the traverse method from the Traverse type class, which is defined this way:

trait Traverse[F[_]] extends Functor[F] with Foldable[F] { self =>
  def traverseImpl[G[_]:Applicative,A,B](fa: F[A])(f: A => G[B]): G[F[B]]
}

If you’ve ever used Scala Future you might have used both sequence and/or traverse methods:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val list: List[Future[Int]] = List(Future(1), Future(2))
val x: Future[List[Int]] = Future.sequence(list) map (l => l map (n => n + 1))
val y: Future[List[Int]] = Future.traverse(list)(l => l map (n => n + 1))

traverse is equivalent to mapping over the structure and then applying sequence, but with better performance, because it doesn’t need an additional intermediate collection to copy the values into before mapping. In the case of Future it is essentially sequence plus map. In Scalaz anything can be traversed if there is evidence that it is an applicative, foldable functor, as described in The Essence of the Iterator Pattern paper. You can read more about the Scalaz implementation here.

The tail-recursive technique is a somewhat naive solution that we can replace by calling traverse to apply only one of the rules, since the disjunction makes it fail fast:

// This will return -\/(Equals five) as in the tail-recursive solution
NumberRules traverseU (_.handle(DemoState(5)))

The catch when working with the monadic fail-fast technique and disjunction is that we end up with the value we actually want on the left side. One thing we can do is invoke swap once we get the value, to flip the left and right sides. I defined an implicit class to do this for any given List of Rules:

implicit class RuleListHandler[A, B](list: List[Rule[A, B]]) {
  def applyOnlyOneTraverse(request: B): List[B] \/ A = {
    list traverseU (_.handle(request)) swap
  }
}

// This will return \/-(Greater than five)
NumberRules applyOnlyOneTraverse DemoState(8)

In Scalaz you will find both traverse and traverseU (and likewise sequence and sequenceU). The difference is that traverseU can derive the type of the Applicative by using Unapply, whereas for traverse you need to specify the type yourself.
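
A small sketch of that difference, reusing the rules above (assuming Scalaz 7; the type lambda spells out the disjunction applicative that traverseU derives for us):

// With traverse the applicative type has to be given explicitly
val explicit: String \/ List[DemoState] =
  NumberRules.traverse[({ type L[x] = String \/ x })#L, DemoState](_.handle(DemoState(8)))

// traverseU derives the same applicative through Unapply
val derived: String \/ List[DemoState] =
  NumberRules.traverseU(_.handle(DemoState(8)))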

As always, an example project showing different combinations can be found on GitHub.

Until the next post!
Gabriel.

Scalaz Streams: Parallel computing and better testing

Hi there,

I finally made some time to write this post after being back home for the holidays and switching teams at my current job. I’ve been working on this new project since February, where we are making heavy use of Scalaz Streams, which has given me the chance to learn a lot (thanks Fred!).

Last year I wrote about designing a full project using Akka Streams, being able to test every piece of the stream as a partial flow graph. Now I’ll talk about doing something similar using Scalaz Streams, which has the advantage of always keeping you in control of the concurrency and parallelism. If you want to see a comparison between the two, check out Adam Warski’s good post about it. So let’s start by defining a case!

Definition of the case

We need to consume messages of Orders of Items from Kafka, persist every order to Cassandra, update the item prices by applying a profit margin and publish the updated order downstream to Rabbit MQ.

That’s the real case, but for the sake of simplicity the following example does not use Kafka, Cassandra and Rabbit MQ but an in-memory representation that simulates the same behavior.

So I started by drawing the streaming flow and how the different components are connected:

pricer_streams

Looks pretty simple, doesn’t it? Well, in this design every order has to be consumed from Kafka, logged, persisted to Cassandra, updated by the pricer service and finally published to Rabbit MQ. In general, a processing overhead occurs whenever I/O operations are involved, in this case the persisting phase. Hence parallelising the streaming between the writing and the reading sides is recommended. This is how the graphic looks after the flow has been split in two:

pricer_streams_parallel

With this design we can continuously consume and persist orders, and in parallel read, update and publish the orders downstream.

Okay, show me the code…

Fair enough! In Scalaz Streams a stream is represented as a Process[+F[_], +A], where F is the effect side and A the value side. The most general use case has a Task in place of F[_]. Then we have different type aliases of Process that are more meaningful, like Sink (used for side-effecting processing), Channel (an effectful process that can send values and get back responses) and Writer (a Process[+F[_], +A] where A is a disjunction), among others. We also have an Exchange, which represents the interconnection between two systems: a read side and a write side. Knowing about these components, we can model the graphic in this way:

Note: if you are not so familiar with all these concepts, I recommend reading these user notes written by Devon Miller.

Kafka      - consumer: Process[Task, Order]
Logger     - logger: Sink[Task, Order]
Cassandra  - storageEx: Exchange[Order, Order]
Pricer     - pricer: Channel[Task, Order, Order]
Rabbit MQ  - publisher: Sink[Task, Order]
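
To give a hint of the in-memory simulation (a sketch, not the exact demo code), the storage Exchange can be built on top of a scalaz-stream queue, with the write side enqueueing orders and the read side dequeueing them on the other branch of the flow:

import scalaz.stream.{Exchange, async}

// Hypothetical in-memory storage backed by a queue
val storageQ  = async.boundedQueue[Order](100)
val storageEx: Exchange[Order, Order] = Exchange(storageQ.dequeue, storageQ.enqueue)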

Then we can connect every component as in the graphic and specify the parallel processing using scalaz.stream.merge.mergeN:

merge.mergeN(
  Process(
    consumer          observe   logger    to  storageEx.write,
    storageEx.read    through   pricer    to  publisher
  )
)

The code above deserves a bit of explanation. I’m using scalaz.stream.merge.mergeN, which can merge N streams (Process[+F[_], +A]) in a nondeterministic way: given stream1 and stream2, it takes an element from whichever publisher is faster, which gives us nondeterministic parallelization. It works like the merge function on Wye does for two streams, but is more optimized (it uses nondeterminism.njoin under the hood).

This code is part of the PricerStream object. I’ve also created another process to generate random Orders to be consumed by the main process. All the source code for this demo is available on GitHub, check it out here!

Testing

Having the definition of the streaming flow isolated makes it easy to test the behavior in isolation. The flow method in the PricerStream object only defines how the components are connected and accepts each one of them as a parameter. All we have to do is create an in-memory representation of these components and invoke the flow. Then we can assert whether an order has been published or not.

For example, this is how we test the main flow defined above:

val (consumer, logger, storageEx, pricer, publisherEx, kafkaQ) = createStreams
val result = for {
  _           <-  eval(kafkaQ.enqueueOne(testOrder))
  _           <-  PricerStream.flow(consumer, logger, storageEx, pricer, publisherEx.write)
  update      <-  publisherEx.read.take(1)
} yield {
  update.items foreach { item =>
    item.price should be (OrderTestGenerator.FixedUpdatedPrice)
  }
}
result.take(1).runLast.timed(3.seconds).run.get

Take a look at the source code to discover what’s going on here! The createStreams method returns all the mocks we need to simulate Kafka, Cassandra and RabbitMQ. Then we create a stream in for-comprehension style by publishing testOrder to the Kafka topic, invoking the PricerStream flow and finally asserting on the Order published downstream. This is quite simple once you understand how it works, and quite powerful!
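
To give an idea (a rough sketch, not the exact createStreams code), the stand-ins are just queues and lifted functions:

import scalaz.concurrent.Task
import scalaz.stream.{async, sink}

// Hypothetical in-memory stand-ins
val kafkaQ   = async.boundedQueue[Order](100)
val consumer = kafkaQ.dequeue                                        // "Kafka" consumer
val logger   = sink.lift[Task, Order](o => Task.delay(println(o)))  // logging sink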

Okay, that’s pretty much all I have today! There are a few points I would like to dig into more deeply, like benchmarks and performance, which are beyond the scope of this post, but hopefully I can write about them later on.

Recommended Resources

There are a lot of very useful resources out there. I’d like to point you specifically to the classic talk by Daniel Spiewak and to the excellent talk given by Derek Chen-Becker at LambdaConf 2015. I’ve created a playground project with all the stuff I learned from these talks, check it out! And of course it’s always good to check the Additional Resources wiki page of FS2.

Until next post!

Gabriel.

Cassandra Scala drivers

Hi there! After a long time of laziness I came back to write something about what I’ve done during this time of absence apart from my holidays. And BTW happy new year!

In the last year I’ve been working quite a lot with Cassandra, always using the official Java driver supported by DataStax, the company behind the database. I remember having searched before for a Scala driver, reactive if possible, and finding only one that didn’t convince me at the time. The thing is, this week I was challenged to create a program to query some of our Cassandra tables and expose them through a REST API, so I started looking for a good Scala driver.

RESEARCH RESULTS

Simply typing "scala cassandra driver" into Google, these are the first four results:
scala-driver-search

First a reactive type-safe driver, second a Scala wrapper of the official Java driver, third the official web site driver list and last a Stack Overflow topic about it.

WORKING WITH PHANTOM

So I decided to give Phantom a chance because it has high activity on GitHub, it is an open source driver and I think the DSL is pretty good. However, I found a few problems when trying to start working with it; something as basic as getting a connection to the database using username/password authentication is not supported by the DSL. And after researching for a while I found that a lot of the good features are only present in the commercial version.

That’s why, after hitting the problem mentioned above, I kept searching for more Scala drivers. Summing up, this is the list:

  • scala-cassandra: just a wrapper around the official Java driver. Last activity on GitHub 2 years ago.
  • cascal: one of the drivers mentioned in the official DataStax driver list apart from Phantom. Last activity on GitHub 3 years ago.
  • cassie: developed by Twitter. Last activity on GitHub 3 years ago.
  • scqla: lack of design, no DSL. Last activity on GitHub 2 years ago.

As you can see, all of these projects are no longer maintained, so once again I came back to Phantom and tried to make it work.

PHANTOM SSL EXTENSION

After a few hours trying to get a connection using Phantom, including opening an issue (hopefully I’ll get a response soon), I created a project to share my workaround solution, which I called phantom-ssl-extension, mainly intended to work with Java 8.

It consists of a CustomSessionProvider that mixes Phantom with the official Java driver, plus a few util functions. I invite you to check out the example to see how it works, and I hope you enjoyed this short research post.

Until next time!
Gabriel.

Akka Stream 2.0-M1: Quick Update

Yesterday the first milestone of the second version of Akka Stream and Http was announced. I wanted to give it a try, so I updated the previous project to work with this new version. You’ll find a new branch called "akka-stream-2.0-M1".

UPDATED PROJECT

The partial and closed functions of FlowGraph no longer exist. They were replaced by the create function. As you can see, for instance in EventInputFlow, the change is very straightforward for the partial cases.

One of the most significant changes I had to make was in the FlowTestKit, where I was using FlowGraph.closed. As I pointed out above, it was replaced by the create function, but in this case it’s mandatory to return a ClosedShape. On this new object it’s not possible to invoke the run() function directly because, unlike before, it’s not runnable. To make it runnable you need to use RunnableGraph.fromGraph(closedShape), and then you can invoke the run() function.
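
As a rough sketch of the new shape (assuming the 2.0-M1 API described here, with illustrative stages):

import akka.stream.ClosedShape
import akka.stream.scaladsl.{FlowGraph, RunnableGraph, Sink, Source}

// FlowGraph.create now yields a graph with a ClosedShape; it has to be
// wrapped with RunnableGraph.fromGraph before run() can be invoked
val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
  import FlowGraph.Implicits._
  Source(1 to 3) ~> Sink.foreach(println)
  ClosedShape
})

graph.run() // needs an implicit ActorMaterializer in scope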

All the changes introduced in the project can be seen in this comparison with the master branch.

MORE NEW CHANGES

Another big change, which I’m not using in the demo project, is the wrap() function. It was replaced by more descriptive functions depending on the case: if you want to create a Flow from a FlowShape you should use Flow.fromGraph, but if you need to create a Flow from a Sink and a Source you should now use Flow.fromSinkAndSource or Flow.fromSinkAndSourceMat.

If you were using FlexiMerge and FlexiRoute for custom stream processing, you’ll notice that they have been replaced by GraphStage. Take a look at the migration guide from 1.0 to 2.x to see all the changes introduced in this first milestone and, if you can, give it a try!

FINAL THOUGHTS

What is coming in this new Akka Stream 2 world is promising. The API is becoming stronger, more powerful and very handy. Talking about performance, it’s worth mentioning that stream processing is faster now and will be much faster in upcoming versions: with the introduction of GraphStage it will be possible to execute multiple stream processing steps inside one actor, reducing the number of thread-hops. Other libraries such as Akka Http are getting improvements too, thanks to the joint efforts with Akka Stream.

Until next post!
Gabriel.

Akka Cluster 2.4

Over the last month I’ve been working with Akka Cluster 2.3, and we have now migrated to version 2.4. We chose Cassandra for persistence.

In this post I’ll try to explain what I learned and the main features we use: shard rebalancing (already present in 2.3) and remember entities.

DEFINITION

A cluster is just a group of nodes, a node being just a logical member with its own Actor System. So, for instance, the following image shows a cluster of two nodes:

cluster

DEMO PROJECT

I created a simple demo project that demonstrates the use of the remember entities feature, which for me is one of the most attractive features in version 2.4.

Note: please use Apache Cassandra 2.0.16 or 2.1.6 or higher to avoid this bug. Otherwise it is not going to work properly.

In this project we have an Entity Actor, a persistent actor sharded across the cluster, and a Message Generator (a simple Actor) that generates two messages every minute (so we have time to shut down nodes and check the behavior). I named the two nodes Dc1 and Dc2, and they are configured as seed nodes. The seed nodes are part of the cluster from the very beginning (the initial state of the cluster), and the oldest one becomes the Leader. Later on more nodes can join the cluster; they only need to know at least one of the seed nodes. Read more about joining nodes here.

There are two good features to test in this simple demo: shard rebalancing and remember entities. Both come into play when nodes become Up and/or Down. So let’s start with the first one!

SHARD REBALANCING

First we have to start the Dc1Cluster app and wait for the first two messages. I added some println calls in the preStart and postStop functions of the EntityActor to see when they are called. We will see that preStart is called after Dc1 starts successfully, and it then consumes the two messages. Now is the moment to start Dc2. Once it’s started we will see that postStop is called on Dc1, but preStart is not called on Dc2 until a new message is received. If we then wait for two more messages, we will see that one is received by Dc1 and the other by Dc2. This means the rebalancing is working well while the remember entities feature is not activated, just as in version 2.3.

Now we can shut down Dc2 or Dc1. In the first case postStop will be called on Dc2 and preStart on Dc1, and later Dc1 will receive the two messages. But it is perhaps more interesting to shut down Dc1 and observe a similar behavior, with the addition that Dc2 will become the leader.

Take a look at the commented logs for DC1 and DC2 for this case of remember entities off.

REMEMBER ENTITIES

This feature is similar to the shard rebalancing case, the main difference being that the EntityActor is restarted automatically in a rebalancing scenario. Without it, preStart on the new node is only called when a new message arrives. The property is configured in application.conf as akka.cluster.sharding.remember-entities, with possible values on/off. To test it, follow the previous case with this property turned on and compare the behavior: you will see that preStart is called immediately after postStop appears on the other node. Here are the docs.
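
In the demo project that is just one line in application.conf:

akka.cluster.sharding.remember-entities = on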

SHUTTING DOWN A NODE

When I say to shut down a node I mean to kill the process. In this case the property akka.cluster.auto-down-unreachable-after plays its role. In the project it is configured to 10 seconds, meaning that after that time the leader will remove the downed node from the cluster and the shard rebalancing will happen.
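
The corresponding setting in the project’s application.conf looks like this:

akka.cluster.auto-down-unreachable-after = 10s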

But in this new version of Akka we can perform a graceful shutdown. In the project I created an MBean that exposes a JMX resource we can call to shut down the node gracefully. The easiest way to do it is to use JConsole and click the "leaveClusterAndShutdown" button. In this case the shard rebalancing happens first and then the leaving node is removed from the cluster.
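
The operation behind that button essentially asks the node to leave the cluster and then terminates the actor system. A rough sketch of what it might look like (not the exact MBean code):

import akka.actor.ActorSystem
import akka.cluster.Cluster

// Hypothetical shape of the graceful shutdown operation
def leaveClusterAndShutdown(system: ActorSystem): Unit = {
  val cluster = Cluster(system)
  cluster.leave(cluster.selfAddress)                   // hand over shards and leave
  cluster.registerOnMemberRemoved(system.terminate())  // shut down once removed
}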

Java Monitoring & Management Console_024

See the logs for the case of a Graceful Shutdown combined with Remember Entities ON.

WORKING ACROSS TWO MACHINES

All the projects I found on the web are simple examples that start all the nodes on the same machine, but I’m pretty sure you will want to try using different machines. For this purpose you will need to configure akka.cluster.seed-nodes and akka.remote.netty.tcp properly; after that you can get the same basic example working across a real network. For instance, if you want to start Node 1 on Machine 1 with IP 192.168.7.1 and Node 2 on Machine 2 with IP 192.168.7.2 (see the image below), with both machines on the same network, this would be the configuration for Node 1:

cluster-machines

akka {
  remote = {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "192.168.7.1"
      port = 2551
      bind-hostname = "192.168.7.1"
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://KlasterSystem@192.168.7.1:2551",
      "akka.tcp://KlasterSystem@192.168.7.2:2551"]
  }
}

And for Node 2 you only need to change the IP address in akka.remote.netty.tcp.

CONCLUSION

It’s very interesting what you can achieve by using Akka actors sharded across the network. You can scale up and out easily. In the coming weeks we are going live with this new feature and I’m really excited!

Until next post!
Gabriel.

HTTP API on top of Scalaz Streams

Over the last few days I’ve been playing with http4s, which is defined as a minimal, idiomatic Scala interface for HTTP.

It’s a powerful library, type safe, composable and asynchronous. And it supports different servers like Blaze, Jetty and Tomcat.

Although the project still has a lot of work ahead, it’s always good to give it a try.

CREATING THE FIRST HTTP SERVICE

I started by creating a basic service that returns a simple string when accessing the root level (localhost:8080/). This is how it looks:

object HomeService {

  def apply(): HttpService = service

  private val service = HttpService {
    case GET -> Root =>
      Ok("Http4s API")
  }

}

And here is the main application that runs the Blaze server:

object Api extends App {

  BlazeBuilder.bindHttp(8080)
    .mountService(HomeService(), "/")
    .run
    .awaitShutdown()

}

That’s enough to get a server running and serving a GET resource. Up to here we have the "hello world" example of an HTTP service. After this we can create more services and add them to the server by invoking the mountService function of the server builder.
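
For instance, mounting the other services could look like this (the path prefixes are just illustrative, assuming ProductService and UserService follow the same apply() pattern as HomeService):

BlazeBuilder.bindHttp(8080)
  .mountService(HomeService(), "/")
  .mountService(ProductService(), "/products")
  .mountService(UserService(), "/users")
  .run
  .awaitShutdown()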

What I did was create two similar services for Products and Users serving just mock data. The main difference is that the ProductService serves Json data using Play Json whereas the UserService exposes Json data using Circe.

PRODUCT SERVICE

This is the code for one of the GET resources for the ProductService:

case GET -> Root =>
  val products = List(Product(1, "Book"), Product(2, "Calc"), Product(3, "Guitar"))
  Ok(Json.toJson(products))

It’s very handy. However, to get this code working you need some implicit values in scope:

  • The Writes for the Play Json library (here provided by Json.format[Product]).
  • The EntityEncoder[T] for http4s.

To fulfil these requirements I created the following object, which is imported in the ProductService:

object PlayJsonImplicits {

  implicit val playJsonEncoder: EntityEncoder[JsValue] =
    EntityEncoder
      .stringEncoder(Charset.`UTF-8`)
      .contramap { json: JsValue => json.toString() }
      .withContentType(`Content-Type`(MediaType.`application/json`, Charset.`UTF-8`))

  implicit val productJsonFormat = Json.format[Product]

}

USER SERVICE

This is the code for one of the GET resources of the UserService:

case GET -> Root / id =>
  Ok(User(id.toLong, s"User$id", s"user$id@mail.com").asJson)

Here something similar to the service above happens: in this case we need to import the Circe implicit values and provide an EntityEncoder[T]. This is how it looks:

import io.circe.generic.auto._
import io.circe.syntax._
object CirceImplicits {

  implicit val circeJsonEncoder: EntityEncoder[CirceJson] =
    EntityEncoder
      .stringEncoder(Charset.`UTF-8`)
      .contramap { json: CirceJson => json.noSpaces }
      .withContentType(`Content-Type`(MediaType.`application/json`, Charset.`UTF-8`))

}

So far we have a few services serving Json data, but the most attractive feature of this library is the streaming one. So let’s move on to some examples.

STREAMING DATA

Http4s was built on top of Scalaz Streams, and every Request is transformed into an asynchronous Scalaz Task[Response]. This means that you can use any function that returns an async Task as an HTTP Response using the helpers provided by http4s.

Here we have an example extracted from the StreamingService:

private val service = HttpService {
  case GET -> Root =>
    val streamingData = Process.emit(s"Starting stream intervals\n\n") ++ dataStream(10)
    Ok(streamingData).chunked
}

private def dataStream(n: Int): Process[Task, String] = {
  implicit def defaultScheduler = DefaultTimeoutScheduler
  val interval = 100.millis
  time.awakeEvery(interval)
    .map(_ => s"Current system time: ${System.currentTimeMillis()} ms\n")
    .take(n)
}

To send the chunked response we need to add the Transfer-Encoding header as below. What I did was create an implicit class with a "chunked" function to make things easier.

response.putHeaders(`Transfer-Encoding`(TransferCoding.chunked))
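
The implicit class is roughly this (a sketch of my helper, not part of the http4s API):

import org.http4s.{Response, TransferCoding}
import org.http4s.headers.`Transfer-Encoding`
import scalaz.concurrent.Task

// Hypothetical helper: adds the chunked Transfer-Encoding header to a response
implicit class ChunkedResponseOps(response: Task[Response]) {
  def chunked: Task[Response] =
    response.map(_.putHeaders(`Transfer-Encoding`(TransferCoding.chunked)))
}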

And that’s it! Find out more and what is possible to do with Scalaz Streams by taking a look at the examples.

Another cool feature of this library is the Web Sockets support, but I’m not covering this topic now. However you’ll find a very basic example of WS connection in the sample project linked below. And maybe you want to take a look at this demo too.

TEST COVERAGE

As always, this development stage is for me the most important. That’s why all the services are fully tested, with test coverage close to 100% (actually Coveralls has some bugs and is showing only 92%, but take a look at the coverage report: if you run "sbt clean coverage test" it shows 97.78%).

This is how one of the unit tests for the ProductService looks:

"Get the list of products" in {
  val request = new Request()
  val response = service.run(request).run

  response.status should be (Status.Ok)
  val expected = """ [{"id":1,"name":"Book"},{"id":2,"name":"Guitar"}] """.trim
  response.body.asString should be (expected)
}

We are creating a Request and running the ProductService to get the response. Then we have assertions for the Status and the Body.

SAMPLE PROJECT

Check out the complete project on GitHub!

Important Note: Only runs under Java 8.

Find out more examples in the official http4s ExampleService.

CONCLUSION

At the moment the documentation is a bit sparse, but I hope to see better docs in the future, along with many other improvements. Nevertheless, I know the guys are working really hard on this powerful library.

Well, this was just a quick bit of research; you are always invited to go deeper and deeper!

Until next post!
Gabriel.