Cassandra Scala drivers

Hi there! After a long lazy stretch I'm back to write about what I've been doing during my absence, apart from my holidays. And by the way, happy new year!

In the last year I've been working quite a lot with Cassandra, always using the official Java Driver supported by DataStax, the company behind the database. I remember searching before for a Scala driver, ideally a reactive one, and I found only one, which didn't convince me at the time. The thing is, this week I was challenged to create a program that basically queries some of our Cassandra tables and exposes them through a REST API, so I started looking for a good Scala driver.

RESEARCH RESULTS

Starting by simply typing "scala cassandra driver" into Google, these are the first four results:
[Image: Google search results for "scala cassandra driver"]

First comes a reactive, type-safe driver; second, a Scala wrapper around the official Java driver; third, the driver list on the official web site; and last, a Stack Overflow topic about the subject.

WORKING WITH PHANTOM

So I decided to give Phantom a chance: it has high activity on GitHub, it is open source, and I think the DSL is pretty good. However, I ran into a few problems trying to get started; a basic thing like getting a connection to the database using username/password authentication is not supported by the DSL. And after researching for a while I found that a lot of the good features are only present in the commercial version.

That's why, after hitting the problem mentioned above, I kept searching for more Scala drivers. Summing up, this is the list:

  • scala-cassandra: just a wrapper around the official Java driver. Last activity on GitHub 2 years ago.
  • cascal: one of the drivers mentioned in the official DataStax driver list apart from Phantom. Last activity on GitHub 3 years ago.
  • cassie: developed by Twitter. Last activity on GitHub 3 years ago.
  • scqla: lack of design, no DSL. Last activity on GitHub 2 years ago.

As you can see, all of these projects are no longer maintained, so once again I came back to Phantom and tried to make it work.

PHANTOM SSL EXTENSION

After a few hours trying to get a connection using Phantom, including opening an issue (hopefully I'll get a response soon), I created a project to share my workaround, which I called phantom-ssl-extension, mainly intended to work with Java 8.

It consists of a CustomSessionProvider that mixes Phantom with the official Java driver, plus a few util functions. I invite you to check out the example to see how it works, and I hope you enjoyed this short research post.
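To give a rough idea of the approach, here is a sketch (not the exact code in phantom-ssl-extension; names are made up): the Session can be built with the plain Java driver, which does support credentials and SSL, and then handed over to Phantom.

import com.datastax.driver.core.{Cluster, Session}

// Sketch only: builds a Session with username/password and SSL using the Java driver.
// withSSL() uses the JVM's default SSLContext, which is why Java 8 matters here.
object CustomSessionBuilder {

  def build(host: String, keyspace: String, user: String, password: String): Session =
    Cluster.builder()
      .addContactPoint(host)
      .withCredentials(user, password)
      .withSSL()
      .build()
      .connect(keyspace)
}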

Until next time!
Gabriel.

Akka Stream 2.0-M1: Quick Update

Yesterday the first milestone of the second major version of Akka Stream and Http was announced. I wanted to give it a try, so I updated the previous project to work with this new version. You'll find a new branch called "akka-stream-2.0-M1".

UPDATED PROJECT

The partial and closed functions of FlowGraph no longer exist. They were replaced by the create function. As you can see, for instance in EventInputFlow, the change is very straightforward for the partial cases.

One of the most significant changes I had to make was in the FlowTestKit, where I was using FlowGraph.closed. As I pointed out above, it was replaced by the create function, but in this case it's mandatory to return a ClosedShape. On this new object it's not possible to invoke run() because, unlike before, it's not runnable by itself. To make it runnable you need RunnableGraph.fromGraph(closedShape), and then you can invoke run().
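A small sketch of what that looks like (written against the 2.0-M1 names as I describe them above, so take the details with a grain of salt):

import akka.stream.ClosedShape
import akka.stream.scaladsl.{FlowGraph, RunnableGraph, Sink, Source}

// FlowGraph.create replaces FlowGraph.closed and the body now returns a ClosedShape...
val graph = FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._
  Source.single("hello") ~> Sink.foreach(println)
  ClosedShape
}

// ...which is not runnable by itself: it has to be wrapped first.
// (An implicit ActorMaterializer needs to be in scope to call run().)
RunnableGraph.fromGraph(graph).run()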

All the changes introduced in the project can be seen in this comparison with the master branch.

MORE NEW CHANGES

Another big change, which I'm not using in the demo project, is the removal of the wrap() function. It was replaced by more descriptive functions depending on the case. If you want to create a Flow from a FlowShape you should use Flow.fromGraph, but if you need to create a Flow from a Sink and a Source you should now use Flow.fromSinkAndSource or Flow.fromSinkAndSourceMat.
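A quick sketch of both replacements (the materialized types are left out, since they depend on the exact version):

import akka.stream.scaladsl.{Flow, Sink, Source}

// A Flow built from any graph with a FlowShape (a plain Flow is already such a graph):
val doubler = Flow.fromGraph(Flow[Int].map(_ * 2))

// A Flow built from a Sink and a Source: input goes to the Sink, output comes from the Source.
val sinkAndSource = Flow.fromSinkAndSource(Sink.ignore, Source.single("done"))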

If you were using FlexiMerge and FlexiRoute for custom stream processing you'll notice that they have been replaced by GraphStage. Take a look at the migration guide from 1.0 to 2.x to see all the changes introduced in this first milestone, and give it a try if you can!

FINAL THOUGHTS

What is coming in this new Akka Stream 2 world is promising. The API is becoming stronger, more powerful and very handy. Talking about performance, it's worth mentioning that stream processing is faster now and will be much faster in upcoming versions: with the introduction of GraphStage it will be possible to execute multiple stream processing steps inside one actor, reducing the number of thread-hops. Other libraries such as Akka Http are getting improvements too, thanks to the joint efforts around Akka Stream.

Until next post!
Gabriel.

Akka Cluster 2.4

Over the last month I've been working with Akka Cluster 2.3, and now we have migrated to version 2.4. We chose Cassandra for persistence.

In this post I'll try to explain what I learned and the main features of version 2.4 that we use: shard rebalancing (already in 2.3) and remember entities.

DEFINITION

A cluster is just a group of nodes, a node being a logical member with its own Actor System. For instance, the following image shows a cluster of two nodes:

[Image: a cluster of two nodes]

DEMO PROJECT

I created a simple demo project that demonstrates the use of the remember entities feature, for me one of the most attractive features in version 2.4.

Note: please use Apache Cassandra 2.0.16, 2.1.6 or higher to avoid this bug. Otherwise it is not going to work properly.

In this project we have an Entity Actor, a persistent actor sharded across the cluster, and a Message Generator (a simple Actor) that generates two messages every minute (so we have time to shut down nodes and check the behavior). I named the two nodes Dc1 and Dc2. They are configured as seed nodes. The seed nodes are part of the cluster from the very beginning (the initial state of the cluster), and the oldest one becomes the Leader. Later on more nodes can join the cluster; they only need to know at least one of the seed nodes. Read more about joining nodes here.

There are two good features to test in this simple demo: shard rebalancing and remember entities. Both come into play when nodes become Up and/or Down. So let's start with the first case!

SHARD REBALANCING

First we have to start the Dc1Cluster App and wait for the first two messages. I added some 'println' calls in the preStart and postStop functions of the EntityActor to see when they are called. We will see that preStart is called after Dc1 starts successfully, and then it consumes the two messages. Now is the moment to start Dc2. Once it's started we will see that postStop is called on Dc1, but preStart is not called on Dc2 until a new message is received. If we then wait for two more messages, one will be received by Dc1 and the other by Dc2. This means that the rebalancing is working well; with the remember entities feature off, this is the same behavior as in version 2.3.

Now we can shut down Dc2 or Dc1. In the first case postStop will be called on Dc2 and preStart on Dc1, and later on Dc1 will receive the two messages. But it's perhaps more interesting to shut down Dc1 and see a similar behavior, with the addition that Dc2 will become the leader.

Take a look at the commented logs for DC1 and DC2 for this case of remember entities off.

REMEMBER ENTITIES

This feature is similar to the shard rebalancing case, the main difference being that the EntityActor is restarted automatically in a rebalancing scenario. Without this feature, preStart on the node will only be called when a new message arrives. The property is configured in application.conf as akka.cluster.sharding.remember-entities with possible values on/off. To test it, repeat the previous case with this property turned on and compare the behavior. You will see that preStart is called immediately after postStop is seen on the other node. Here are the docs.
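For reference, turning the feature on is a one-line change in application.conf:

akka.cluster.sharding.remember-entities = on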

SHUTTING DOWN A NODE

When I say shut down a node I mean kill the process. In this case the property akka.cluster.auto-down-unreachable-after plays its role. In the project it is configured to 10 seconds, meaning that after that time the leader will remove the downed node from the cluster and the shard rebalancing will happen.
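In the demo configuration that looks like this:

akka.cluster.auto-down-unreachable-after = 10s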

But in this new version of Akka we can perform a Graceful Shutdown. In the project I created an MBean that exposes a JMX resource that we can invoke to shut down the node gracefully. The easiest way to do it is to use JConsole and click the "leaveClusterAndShutdown" button. In this case the shard rebalancing will happen first, and then the leaving node will be removed from the cluster.
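Under the hood, a graceful leave boils down to asking the Cluster extension to leave. Here is a minimal sketch of what the operation exposed through JMX could delegate to (only the operation name comes from the project; the body is an assumption):

import akka.actor.ActorSystem
import akka.cluster.Cluster

class ClusterControl(system: ActorSystem) {

  private val cluster = Cluster(system)

  // Leaves the cluster gracefully: shards are rebalanced first, then the node is removed.
  // Shutting the ActorSystem down afterwards (e.g. on MemberRemoved) is omitted here.
  def leaveClusterAndShutdown(): Unit =
    cluster.leave(cluster.selfAddress)
}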

[Image: JConsole (Java Monitoring & Management Console) showing the leaveClusterAndShutdown operation]

See the logs for the case of a Graceful Shutdown combined with Remember Entities ON.

WORKING ACROSS TWO MACHINES

All the projects that I found on the web are simple examples starting all the nodes on the same machine. But I'm pretty sure you will want to try using different machines. For this purpose you need to configure akka.cluster.seed-nodes and akka.remote.netty.tcp properly. After that you can get the same basic example working across a real network. For instance, if you want to start Node 1 on Machine 1 with IP 192.168.7.1 and Node 2 on Machine 2 with IP 192.168.7.2 (see the image below), with both machines on the same network, this would be the configuration for Node 1:

[Image: two nodes running on two machines in the same network]

akka {
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "192.168.7.1"
      port = 2551
      bind-hostname = "192.168.7.1"
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://KlasterSystem@192.168.7.1:2551",
      "akka.tcp://KlasterSystem@192.168.7.2:2551"]
  }
}

And for Node 2 you only need to change the IP address in akka.remote.netty.tcp.

CONCLUSION

It's very interesting what you can achieve by using Akka actors sharded across the network. You can scale up and out easily. In the next few weeks we are going live with this new feature and I'm really excited!

Until next post!
Gabriel.

HTTP API on top of Scalaz Streams

In the last few days I've been playing with http4s, which is defined as a minimal, idiomatic Scala interface for HTTP.

It’s a powerful library, type safe, composable and asynchronous. And it supports different servers like Blaze, Jetty and Tomcat.

Although the project still has a lot of work ahead, it's always good to give it a try.

CREATING THE FIRST HTTP SERVICE

I started by creating a basic service that returns a simple string when accessing the root path (localhost:8080/). This is how it looks:

object HomeService {

  def apply(): HttpService = service

  private val service = HttpService {
    case GET -> Root =>
      Ok("Http4s API")
  }

}

And here is the main application that runs the Blaze server:

object Api extends App {

  BlazeBuilder.bindHttp(8080)
    .mountService(HomeService(), "/")
    .run
    .awaitShutdown()

}

That's enough to get a server running and serving a GET resource. Up to here we have the "hello world" example of an HTTP service. From here we can create more services and add them to the server by invoking the mountService function of the server builder.

What I did was create two similar services for Products and Users, serving just mock data. The main difference is that the ProductService serves Json data using Play Json, while the UserService exposes Json data using Circe.

PRODUCT SERVICE

This is the code for one of the GET resources for the ProductService:

case GET -> Root =>
  val products = List(Product(1, "Book"), Product(2, "Calc"), Product(3, "Guitar"))
  Ok(Json.toJson(products))

It's very handy. However, to get this code working you need some implicit values in scope:

  • The Writes for the Play Json library.
  • The EntityEncoder[T] for http4s.

To fulfil these requirements I created the following object, which is imported in the ProductService:

object PlayJsonImplicits {

  implicit val playJsonEncoder: EntityEncoder[JsValue] =
    EntityEncoder
      .stringEncoder(Charset.`UTF-8`)
      .contramap { json: JsValue => json.toString() }
      .withContentType(`Content-Type`(MediaType.`application/json`, Charset.`UTF-8`))

  implicit val productJsonFormat = Json.format[Product]

}

USER SERVICE

This is the code for one of the GET resources of the UserService:

case GET -> Root / id =>
  Ok(User(id.toLong, s"User$id", s"user$id@mail.com").asJson)

Something similar to the service above happens here. In this case we need to import the Circe implicit values and provide an EntityEncoder[T]. This is how it looks:

import io.circe.generic.auto._
import io.circe.syntax._
import io.circe.{Json => CirceJson} // assuming the Circe Json type is aliased as CirceJson

object CirceImplicits {

  implicit val circeJsonEncoder: EntityEncoder[CirceJson] =
    EntityEncoder
      .stringEncoder(Charset.`UTF-8`)
      .contramap { json: CirceJson => json.noSpaces }
      .withContentType(`Content-Type`(MediaType.`application/json`, Charset.`UTF-8`))

}

So far we have a few services serving Json data, but the most attractive feature of this library is streaming. So let's move on to some examples.

STREAMING DATA

Http4s is built on top of Scalaz Streams and every Request is transformed into an asynchronous Scalaz Task[Response]. This means that you can use any function that returns an async Task as an HTTP Response, using the helpers provided by http4s.

Here we have an example extracted from the StreamingService:

private val service = HttpService {
  case GET -> Root =>
    val streamingData = Process.emit(s"Starting stream intervals\n\n") ++ dataStream(10)
    Ok(streamingData).chunked
}

private def dataStream(n: Int): Process[Task, String] = {
  implicit def defaultScheduler = DefaultTimeoutScheduler
  val interval = 100.millis
  time.awakeEvery(interval)
    .map(_ => s"Current system time: ${System.currentTimeMillis()} ms\n")
    .take(n)
}

To send the chunked response we need to add the Transfer-Encoding header as shown below. What I did was create an implicit class with a "chunked" function to make things easier.

response.putHeaders(`Transfer-Encoding`(TransferCoding.chunked))
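A sketch of what that helper could look like (names assumed, not necessarily the project's exact code; assuming the http4s 0.x package layout):

import org.http4s.{Response, TransferCoding}
import org.http4s.headers.`Transfer-Encoding`
import scalaz.concurrent.Task

object ChunkedSyntax {

  // Adds the Transfer-Encoding: chunked header to a pending response.
  implicit class ChunkedOps(val response: Task[Response]) extends AnyVal {
    def chunked: Task[Response] =
      response.map(_.putHeaders(`Transfer-Encoding`(TransferCoding.chunked)))
  }
}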

And that's it! Find out more about what you can do with Scalaz Streams by taking a look at the examples.

Another cool feature of this library is the Web Sockets support, but I’m not covering this topic now. However you’ll find a very basic example of WS connection in the sample project linked below. And maybe you want to take a look at this demo too.

TEST COVERAGE

As always, this development stage is the most important one for me. That's why all the services are fully tested, with test coverage close to 100% (actually Coveralls has some bugs and it's showing only 92%, but take a look at the coverage report! If you run "sbt clean coverage test" the coverage report shows 97.78%).

This is how one of the unit tests for the ProductService looks:

"Get the list of products" in {
  val request = new Request()
  val response = service.run(request).run

  response.status should be (Status.Ok)
  val expected = """ [{"id":1,"name":"Book"},{"id":2,"name":"Guitar"}] """.trim
  response.body.asString should be (expected)
}

We are creating a Request and running the ProductService to get the response. Then we have assertions for the Status and the Body.

SAMPLE PROJECT

Check out the complete project on Github!

Important Note: Only runs under Java 8.

Find out more examples in the official http4s ExampleService.

CONCLUSION

At the moment the documentation is a bit sparse, but I hope to see better docs and many other improvements in the future. In any case, I know the guys are working really hard on this powerful library.

Well, this was just some quick research; you are always invited to go deeper and deeper!

Until next post!
Gabriel.

Akka Streams: Designing a full project

In the previous post I talked about how we use Akka Streams as a replacement for the Spring Integration behavior. Now I'll show you how we solved some problems and removed boilerplate code by extracting common design patterns. To get started, I created a demonstration project that you can find on Github.

The first thing we did was create a utils object extracting the code that builds all the common Partial Flow Graphs. We ended up with the object PartialFlowGraphUtils.

EXTRACTING COMMON FUNCTIONS

We said before that every time we have a Filter we need a Broadcast with two outputs (in our case, because we are replacing the behavior of the Spring Integration Filter, which has a Discard Channel), so we defined a method that creates the FanOutShape as a Partial Flow Graph:

def filterPartialFlowGraph(filterFunction: FlowMessage => Boolean) = FlowGraph.partial() { implicit b =>
  val bcast = b.add(Broadcast[FlowMessage](2))
  val filter = b.add(Flow[FlowMessage] filter (filterFunction(_)))
  val notFilter = b.add(Flow[FlowMessage] filter (!filterFunction(_)))

  bcast ~> filter
  bcast ~> notFilter

  UniformFanOutShape(bcast.in, filter.outlet, notFilter.outlet)
}

Also we defined a function to create a FlowShape:

def partialFlow(function: FlowMessage => FlowMessage) = Flow[FlowMessage] map (function(_))

And finally a FlowShape with a function that adds Headers to the FlowMessage:

def partialFlowWithHeader(header: MessageHeader) = partialFlow(fm => addHeader(fm, header))

def addHeader(message: FlowMessage, header: MessageHeader): FlowMessage = {
  val headers = message.headers + (header.key -> header.value)
  message.copy(headers, message.event)
}

DEFINING EVERY PARTIAL FLOW

Once we had the Utils object we created a trait for every partial flow. This is how the EventInputFlow looks:

trait EventInputFlow {

  this: EventTypeFilteredFlow =>

  lazy val eventInputFlow = FlowGraph.partial() { implicit b =>
    val headersProcess = b.add(partialFlowWithHeader(MessageHeader("starting", System.currentTimeMillis())))

    val eventTypeFilterFlow = b.add(filterPartialFlowGraph(_.event.`type` == "TENNIS"))
    val headersFilterFlow = b.add(filterPartialFlowGraph(_.headers.contains("MatchSession")))
    val eventTypeFiltered = b.add(eventTypeFilteredFlow)

    headersProcess ~> eventTypeFilterFlow
                      eventTypeFilterFlow.out(0) ~> headersFilterFlow
                      eventTypeFilterFlow.out(1) ~> eventTypeFiltered

    UniformFanOutShape(headersProcess.inlet, headersFilterFlow.out(0), headersFilterFlow.out(1), eventTypeFiltered.outlet)
  }.named("eventInputFlow")

}

Can you see the differences compared with the previous definition of this flow? Clearly the code is much better now.

The other flows have a similar design to the EventInputFlow (just take a look at the project). As you can see here, the EventInputFlow depends on EventTypeFilteredFlow (indicated by the self-type reference this: Type =>), so we need to provide it. It's kind of a Thin Cake Pattern. The one responsible for the dependency injection, putting all the pieces together, is the EventPipelineFlow:

trait EventPipelineFlow extends EventInputFlow
                        with HeadersValidationFlow
                        with EventTypeFilteredFlow
                        with EventProcessorFlow {

  lazy val eventPipelineFlow = FlowGraph.partial() { implicit b =>
    val pipeline = b.add(partialEventPipeline)
    pipeline.out(1) ~> Sink.ignore
    pipeline.out(2) ~> Sink.ignore

    FlowShape(pipeline.in, pipeline.out(0))
  }.named("eventPipelineFlow")

  lazy val partialEventPipeline = FlowGraph.partial() { implicit b =>
    val eventInput = b.add(eventInputFlow)
    val headersValidation = b.add(headersValidationFlow)
    val processorMerge = b.add(Merge[FlowMessage](2))
    val eventProcessor = b.add(eventProcessorFlow)

    eventInput.out(0) ~> processorMerge
    eventInput.out(1) ~> headersValidation ~> processorMerge
    processorMerge ~> eventProcessor

    UniformFanOutShape(eventInput.in, eventProcessor.out(0), eventInput.out(2), eventProcessor.out(1))
  }.named("partialEventPipeline")

}

MATERIALIZING AND STREAMING!

Now that we have defined all the flows, we need to connect a Source and a Sink to the Pipeline. This is done in the main class StreamsApp, where the blueprint of the stream is materialized. In our case we are using an ActorRef as a Source that only accepts messages of type FlowMessage, and a Sink that returns a future when the streaming finishes.

val source: Source[FlowMessage, ActorRef] = Source.actorRef[FlowMessage](1000, OverflowStrategy.fail)
val pipelineActor: ActorRef = source.via(eventPipelineFlow).to(Sink.ignore).run()

pipelineActor ! message
pipelineActor ! PoisonPill

ERROR HANDLING

The stream can be completed successfully by sending a Status.Success or PoisonPill message to the pipelineActor, and it can be completed with a failure by sending a Status.Failure message. But what happens if there's an exception during execution? Well, the actor will be stopped and the stream will finish with a failure. As you can read in the documentation, the actor is stopped when the stream is completed, failed or canceled from downstream.
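For example (Status comes from akka.actor):

import akka.actor.Status

pipelineActor ! Status.Success("done")                       // completes the stream successfully
pipelineActor ! Status.Failure(new RuntimeException("boom")) // completes the stream with a failure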

One solution could be to create a Guardian Actor that watches the Pipeline Actor and gets notified when an exception occurs. But even better, Akka Stream provides a Supervision Strategy that you can apply globally when the ActorMaterializer is created, by passing an ActorMaterializerSettings, or individually for each flow, source or sink. The error handling strategies are inspired by actor supervision strategies, so you'll recognize the pattern if you've been working with the Actor Model. We chose the restart strategy, which drops the message that caused the error and restarts the processing stage.

val decider = ActorAttributes.supervisionStrategy(Supervision.restartingDecider)
val pipelineActor = source.via(eventPipelineFlow.withAttributes(decider)).to(Sink.ignore).run()
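For completeness, this is roughly how the global variant looks, passing the decider through the ActorMaterializerSettings (a sketch, not code from the demo project):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}

implicit val system = ActorSystem("StreamsApp")

// Every stage materialized with this materializer uses the restarting decider.
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(Supervision.restartingDecider)
)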

You can see how we tested the resilience to failures of the streaming using supervision strategies.

TESTING

Now the project is more readable and maintainable, but we are missing a very important part: the testing phase. But don't worry, we designed our code from the start with testability in mind, and now I'm going to explain how we achieved this.

Our focus is to test every partial flow as a "black box" and finally test the whole streaming. Akka Stream provides a nice test kit and it's quite simple. At the moment the documentation doesn't have too many examples, but you can always take a deeper look into the code.

IMPORTANT NOTE:
As you can see in the first post's graphics and code, we were defining a Sink.ignore inside the EventInputFlow and the EventProcessorFlow. Well, this was a BIG MISTAKE. Even though it works, it's impossible to test it as a black box with that design. What we need is to expose the final output before connecting it to the Sink in order to be able to test it. That's what we did, and you may have already noticed the changes in the code.

We also figured out that it's a code smell when you need two or more inputs and two or more outputs at the same time. In that case, what you need to do is split the partial flow into two or more smaller partial flows. That was the case for the EventProcessorFlow. Now it has one input and two outputs because we removed the input merge; the merge now happens within the EventPipelineFlow.

Now let's start by testing the input and the three outputs of the EventInputFlow (that's what I mean by "black box testing"). First of all we created a base class that every Spec will extend. It looks like this:

class StreamFlowSpec extends TestKit(ActorSystem("StreamFlowSpec"))
                     with WordSpecLike
                     with Matchers
                     with BeforeAndAfterAll {

  implicit val materializer = ActorMaterializer()

  def collector = genericCollector[FlowMessage]
  private def genericCollector[T]: Sink[T, Future[T]] = Flow[T].toMat(Sink.head)(Keep.right)

  //... COMMON TEST CODE ...

}

Then we have an EventInputFlowSpec that extends the base class and defines the proper unit tests:

class EventInputFlowSpec extends StreamFlowSpec {

  object EventInputMock extends EventInputFlow with EventTypeFilteredFlow

  private def flowGraph(message: FlowMessage) = FlowGraph.closed(collector, collector, collector)((_, _, _)) { implicit b => (out0, out1, out2) =>
    val eif = b.add(EventInputMock.eventInputFlow)
    Source.single(message) ~> eif
                              eif.out(0) ~> out0
                              eif.out(1) ~> out1
                              eif.out(2) ~> out2
  }.run()

  "Event Input Flow" should {

    val sessionHeaders = Map("MatchSession" -> 5426)

    "Have messages in the filter output" in withMessage(sessionHeaders) { message =>

      val (filterOut, notFilterOut, suppressedOut) = flowGraph(message)

      val result = Await.result(filterOut, 1000.millis)

      result.headers should contain key ("starting")

      // Should be an Empty stream
      intercept[NoSuchElementException] {
        Await.result(notFilterOut, 1000.millis)
      }

      intercept[NoSuchElementException] {
        Await.result(suppressedOut, 1000.millis)
      }
    }
  }
}

In the first line we are defining an object that mixes in every trait that we need to test. Then we are creating a Runnable Graph (using FlowGraph.closed) that returns one future for every output. That's why we are passing the collector function defined in the base class as a parameter, which is actually a Sink.head[T]. And finally we defined the assertions. When an output doesn't receive any message, a NoSuchElementException("Empty stream") is thrown if we are waiting for a value. In this case we are intercepting the exception to prove it.

The tests for the other flows are similar to this one, so I'm sure you'll understand them after this explanation. Just take a look at the project!

Finally, after testing every flow, the last thing to do is to test the complete streaming. In this case we have the EventPipelineFlowSpec:

class EventPipelineFlowSpec extends StreamFlowSpec {

  object EventPipelineMock extends EventPipelineFlow

  private def flowGraph(message: FlowMessage) = FlowGraph.closed(collector, collector, collector)((_, _, _)) { implicit b => (out0, out1, out2) =>
    val epf = b.add(EventPipelineMock.partialEventPipeline)
    Source.single(message) ~> epf
                              epf.out(0) ~> out0
                              epf.out(1) ~> out1
                              epf.out(2) ~> out2
  }.run()

  "Event Pipeline Flow" should {

    val sessionHeaders = Map("MatchSession" -> 5426)

    "Have messages in the successful output" in withMessage(sessionHeaders) { message =>

      val (successfulOut, eventTypeSuppressed, eventDeletedLogger) = flowGraph(message)

      val result = Await.result(successfulOut, 1000.millis)

      result.headers should contain key ("starting")

      // Should be an Empty stream
      intercept[NoSuchElementException] {
        Await.result(eventTypeSuppressed, 1000.millis)
      }

      intercept[NoSuchElementException] {
        Await.result(eventDeletedLogger, 1000.millis)
      }

    }
  }
}

CONCLUSION

This is a demonstration project with the current design that we are using. However, we're still working hard, learning from the community and from our mistakes, and improving our code every day to make it more reliable. And the most important thing: we're having a lot of fun here! Hope you find this post useful.

Until next one!
Gabriel.

Akka Streams: A Real World Case

With the arrival of the first stable release of the still experimental Akka Streams, a world of possibilities came to my mind (and I'm pretty sure many other people felt the same). I started playing around with some milestone versions a few months ago and it was really promising; that's why I'm currently working on a system using this stable release, which reached the production environment last week.

Today I want to share with all of you the things I learnt in these weeks working with Akka Streams v1.0. We now have extensive documentation, so I won't explain the basics; I'm going directly to the interesting part: a real world case.

INTRODUCTION

We have many projects. Around 60% of them are coded in Java, and we had a problem with one of them that was using Spring Integration, a framework inspired by the Enterprise Integration Patterns. I don't want to go too far explaining this library, but I need to introduce it for you to understand why Akka Streams was a great replacement.

Basically, Spring Integration is a chain of messages where the messages can be filtered, transformed and/or processed. I found that this chain of messages is like a stream of data. You can see the same if you pay attention to the following analogy:

SPRING INTEGRATION     AKKA STREAMS
Message                Type of the streaming data
Message Channel        Just a Flow
Message Endpoint       A single stage of the stream
Transformer            A Flow using map
Filter                 A Flow using filter
Router                 Connections between the components
Splitter               A Fan Out component (Broadcast, Balance)
Aggregator             A Fan In component (Merge, Concat)
Service Activator      A Flow using map
Channel Adapter        A Source or a Sink

This is what the configuration looks like:

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>  
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/> 
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

DRAWING THE FLOW GRAPHICS

Do you remember those ugly XML files? I felt the same, so I started redefining the flow using Akka Streams. The first step was to draw a graphic of the streaming for a better understanding. We got something like this:

[Diagram: Event Input Flow]

[Diagram: Event Type Filtered Flow]  [Diagram: Headers Validation Flow]

[Diagram: Event Processor Flow]

Putting all the pieces together we got this final flow:

[Diagram: Event Pipeline Flow]

The streaming flows from left to right. The circles represent inputs and outputs. So, for instance, the Event Input Flow has 1 input and 2 outputs and the Event Processor Flow has 2 inputs and 1 output.

START CODING

Translating these graphics into code is the most fun part. Akka Streams makes this task easy and fully expressive. Take a look at the following code for the Event Input Flow; but first of all, let's see what our data types look like:

case class Event(id: Long, `type`: String, origin: String, destination: String)
case class FlowMessage(headers: Map[String, AnyRef], event: Event)

  lazy val eventInputFlow = FlowGraph.partial() { implicit builder =>
    val headersProcess = builder.add(Flow[FlowMessage] map (addHeaders(_)))
    val b1, b2 = builder.add(Broadcast[FlowMessage](2))
    val eventTypeFilter = builder.add(Flow[FlowMessage] filter (m => m.event.`type` == "TENNIS"))
    val eventTypeNotFilter = builder.add(Flow[FlowMessage] filter (m => m.event.`type` != "TENNIS"))
    val eventTypeFiltered = builder.add(eventTypeFilteredFlow)
    val sinkIgnore = builder.add(Sink.ignore)
    val headersFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.contains("MatchSession")))
    val headersNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.contains("MatchSession")))

    headersProcess ~> b1 ~> eventTypeNotFilter ~> eventTypeFiltered ~> sinkIgnore
                      b1 ~> eventTypeFilter ~> b2 ~> headersFilter
                                               b2 ~> headersNotFilter

    UniformFanOutShape(headersProcess.inlet, headersFilter.outlet, headersNotFilter.outlet)
  }.named("eventInputFlow")

Can you see how the connections in the code are similar to the graphics? This is possible thanks to the very expressive DSL provided by Akka. Here is the code representing the Event Type Filtered Flow:

  lazy val eventTypeFilteredFlow = FlowGraph.partial() { implicit builder =>
    val logger = builder.add(Flow[FlowMessage] map { m => println(s"Event Filtered: $m"); m })
    val sender = builder.add(Flow[FlowMessage] map (sendToExternalService(_)))

    FlowShape(logger.inlet, sender.outlet)
  }.named("eventTypeFilteredFlow")

Headers Validation Flow:

  lazy val headersValidationFlow = FlowGraph.partial() { implicit builder =>
    val logger = builder.add(Flow[FlowMessage] map { m => println(s"Headers Validation: $m"); m })
    val sender = builder.add(Flow[FlowMessage] map (completeHeaders(_)))

    FlowShape(logger.inlet, sender.outlet)
  }.named("headersValidationFlow")

Event Processor Flow:

  lazy val eventProcessorFlow = FlowGraph.partial() { implicit builder =>
    val inputMerge, filteredMerge = builder.add(Merge[FlowMessage](2))
    val b1, b2 = builder.add(Broadcast[FlowMessage](2))
    val originFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.get("Origin").contains("providerName")))
    val originNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.get("Origin").contains("providerName")))
    val destinationFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.get("Destination").contains("destName")))
    val destinationNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.get("Destination").contains("destName")))
    val eventProcessor = builder.add(Flow[FlowMessage] map (process(_)))
    val eventProcessorSender = builder.add(Flow[FlowMessage] map (sendProcessedEvent(_)))
    val originLogger = builder.add(Flow[FlowMessage] map { m => println(s"Origin Filtered: $m"); m })
    val destinationLogger = builder.add(Flow[FlowMessage] map { m => println(s"Destination Filtered: $m"); m })
    val eventLogger = builder.add(Flow[FlowMessage] map { m => println(s"Event Filtered: $m"); m })
    val sinkIgnore = builder.add(Sink.ignore)

    inputMerge ~> b1 ~> originNotFilter ~> originLogger ~> filteredMerge
                  b1 ~> originFilter ~> b2 ~> destinationNotFilter ~> destinationLogger ~> filteredMerge
                                        b2 ~> destinationFilter ~> eventProcessor ~> eventProcessorSender
    filteredMerge ~> eventLogger ~> sinkIgnore

    UniformFanInShape(eventProcessorSender.outlet, inputMerge.in(0), inputMerge.in(1))
  }.named("eventProcessorFlow")

And finally the flow that connects every partial flow, the Event Pipeline Flow:

  lazy val eventPipelineFlow = FlowGraph.partial() { implicit builder =>
    val eventInput: UniformFanOutShape[FlowMessage, FlowMessage] = builder.add(eventInputFlow)
    val headersValidation: FlowShape[FlowMessage, FlowMessage] = builder.add(headersValidationFlow)
    val eventProcessor: UniformFanInShape[FlowMessage, FlowMessage] = builder.add(eventProcessorFlow)

    eventInput.out(0) ~> eventProcessor
    eventInput.out(1) ~> headersValidation ~> eventProcessor

    FlowShape(eventInput.in, eventProcessor.out)
  }.named("eventPipelineFlow")

MATERIALIZING THE STREAM

Now we are ready to connect a Source and a Sink to the Event Pipeline Flow and Materialize the stream. This is one way to do it using an ActorRef as a Source:

val source: Source[FlowMessage, ActorRef] = Source.actorRef[FlowMessage](1000, OverflowStrategy.fail)
val actorRef: ActorRef = Flow.wrap(eventPipelineFlow).to(Sink.ignore).runWith(source)
  
actorRef ! message

If you need to know when the streaming finishes you can get a Future. Let's see how to do it:

val sinkFold = Sink.fold[FlowMessage, FlowMessage](null){ (_, message) =>
  println(message)
  message
}

val future: Future[FlowMessage] = Source.single(message)
    .via(Flow.wrap(eventPipelineFlow))
    .runWith(sinkFold)

And that's it! Depending on what you need to do, you will find a way. It's all in the official documentation.

CONCLUSION

In these flows you can find many things to improve. You can see repeated code; for example, every time you have a filter and a filterNot you need a Broadcast with two outputs before them. There are many common patterns to extract in order to produce better code, and we already did it.

This example is similar to our case but it is not the real one. However, it represents exactly what we are working on.

In the next posts I'm going to explain some advanced designs and patterns that we are using to make better use of this wonderful library.

Until soon!
Gabriel.

Light Weight REST API Using Play! Framework 2.4.x

Hi all!

I'm starting this blog by writing about a simple project that I created a few weeks ago. Play! 2.4 introduced a new way to define a Router in addition to the old static Router: using dependency injection with Guice.
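As a small illustration of the idea (this sketch is not taken from the project; the controller and route are made up), a router can now be a plain class that receives its controllers through Guice and is wired up via the play.http.router setting:

import javax.inject.Inject
import play.api.mvc.{Action, Controller}
import play.api.routing.{Router, SimpleRouter}
import play.api.routing.sird._

class HelloController extends Controller {
  def hello = Action { Ok("Hello, Play 2.4!") }
}

// Guice injects the controller; the String Interpolating Routing DSL (sird) matches the requests.
class ApiRouter @Inject() (helloController: HelloController) extends SimpleRouter {
  override def routes: Router.Routes = {
    case GET(p"/hello") => helloController.hello
  }
}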

Taking advantage of this feature I created a simple but complete project, fundamentally focused on clean coding and test coverage. It's currently using Codeship for Continuous Integration and Continuous Deployment, Coveralls for the test coverage reports and Codacy for automated code review.

Take a look at the project!

In addition to this quite new project I want to share with you another project that I created a few months ago: Social Graph API. It's a fully tested project, but it uses Play! Framework 2.3.8. SGA is a REST API using Silhouette with JWT (Json Web Token) and Redis for authentication, and Neo4j as a Graph DB with the Scala client AnormCypher.

Cheers,
Gabriel.