Akka Streams: A Real World Case

With the arrival of the first stable release of the still-experimental Akka Streams, a world of possibilities came to my mind (and I'm pretty sure many other people felt the same). I started playing around with some milestone versions a few months ago and it was really promising; that's why I'm currently working on a system using this stable release, which reached the production environment last week.

Today I want to share with all of you the things I learnt in these weeks of working with Akka Streams v1.0. We now have extensive documentation, so I won't explain the basics; I'm going directly to the interesting part: a real world case.

INTRODUCTION

We have many projects. Around 60% of them are coded in Java, and we had a problem with one of them that was using Spring Integration, a framework inspired by the Enterprise Integration Patterns. I don't want to go too deep into this library, but I need to introduce it for you to understand why Akka Streams was a great replacement.

Basically, Spring Integration is a chain of messages, where the messages can be filtered, transformed and/or processed. I found that this chain of messages is just like a stream of data. You can see the same if you pay attention to the following analogy:

SPRING INTEGRATION    AKKA STREAMS
------------------    ------------
Message               The type of the streaming data
Message Channel       Just a Flow
Message Endpoint      A single stage of the stream
Transformer           A Flow using map
Filter                A Flow using filter
Router                Connections between the components
Splitter              A fan-out component (Broadcast, Balance)
Aggregator            A fan-in component (Merge, Concat)
Service Activator     A Flow using map
Channel Adapter       A Source or a Sink
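
To make the analogy concrete, here is a minimal sketch of the first few rows in plain Akka Streams code (the Message type is just a placeholder I made up for the example):

import akka.stream.scaladsl.Flow

case class Message(headers: Map[String, String], payload: String)

// A Transformer is just a Flow using map...
val transformer = Flow[Message] map (m => m.copy(headers = m.headers + ("name" -> "Many")))

// ...a Filter is just a Flow using filter...
val filter = Flow[Message] filter (m => m.headers.contains("name"))

// ...and endpoints chain together like messages flowing through a channel.
val chain = transformer via filter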

This is how the Spring Integration configuration looks:

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>  
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/> 
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>
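
For comparison, this is roughly how those chains could be expressed as composed Flows, ignoring the request/reply semantics of the gateway (sampleService and withName are hypothetical stand-ins for the XML components, and Message is the placeholder type from the sketch above):

import akka.stream.scaladsl.Flow

def sampleService(m: Message): Message = m // stands in for org.foo.SampleService

def withName(name: String) =
  Flow[Message] map (m => m.copy(headers = m.headers + ("name" -> name)))

val service = Flow[Message] map (sampleService(_))

// A gateway to another channel is just composition with another Flow:
val nestedChainB = withName("Jack") via service
val nestedChainA = withName("Moe") via nestedChainB via service
val mainChain    = withName("Many") via service via nestedChainA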

DRAWING THE FLOW DIAGRAMS

Do you remember those ugly XML files? I felt the same way, so I started redefining the flow using Akka Streams. The first step was to draw a diagram of the stream for a better understanding. We got something like this:

[Diagram: 01 Event Input Flow]

[Diagram: 02 Event Type Filtered Flow]  [Diagram: 03 Headers Validation Flow]

[Diagram: 04 Event Processor Flow]

Putting all the pieces together we got this final flow:

[Diagram: 00 Event Pipeline Flow]

The stream flows from left to right. The circles represent inputs and outputs. So, for instance, the Event Input Flow has 1 input and 2 outputs, and the Event Processor Flow has 2 inputs and 1 output.
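
In code, these input/output counts map directly onto the shape of each partial graph. A sketch of the correspondence, using made-up aliases for the v1.0 shape types:

import akka.stream.{FlowShape, UniformFanInShape, UniformFanOutShape}

// 1 input, 2 outputs: the Event Input Flow
type OneInTwoOut[T] = UniformFanOutShape[T, T]
// 1 input, 1 output: the Event Type Filtered and Headers Validation Flows
type OneInOneOut[T] = FlowShape[T, T]
// 2 inputs, 1 output: the Event Processor Flow
type TwoInOneOut[T] = UniformFanInShape[T, T]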

START CODING

Translating these diagrams into code is the most fun part. Akka Streams makes this task easy and fully expressive. Take a look at the following code for the Event Input Flow; but first of all, let's see what our data type looks like:

case class Event(id: Long, `type`: String, origin: String, destination: String)
case class FlowMessage(headers: Map[String, AnyRef], event: Event)

And here is the Event Input Flow itself:

import akka.stream.{FlowShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl._

lazy val eventInputFlow = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  // addHeaders is one of our domain helpers (omitted here)
  val headersProcess = builder.add(Flow[FlowMessage] map (addHeaders(_)))
  val b1, b2 = builder.add(Broadcast[FlowMessage](2))
  val eventTypeFilter = builder.add(Flow[FlowMessage] filter (m => m.event.`type` == "TENNIS"))
  val eventTypeNotFilter = builder.add(Flow[FlowMessage] filter (m => m.event.`type` != "TENNIS"))
  val eventTypeFiltered = builder.add(eventTypeFilteredFlow)
  val sinkIgnore = builder.add(Sink.ignore)
  val headersFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.contains("MatchSession")))
  val headersNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.contains("MatchSession")))

  headersProcess ~> b1 ~> eventTypeNotFilter ~> eventTypeFiltered ~> sinkIgnore
                    b1 ~> eventTypeFilter ~> b2 ~> headersFilter
                                             b2 ~> headersNotFilter

  UniformFanOutShape(headersProcess.inlet, headersFilter.outlet, headersNotFilter.outlet)
}.named("eventInputFlow")

Can you see how the connections in the code mirror the diagram? This is possible thanks to the very expressive DSL provided by Akka. Here is the code for the Event Type Filtered Flow:

lazy val eventTypeFilteredFlow = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  val logger = builder.add(Flow[FlowMessage] map { m => println(s"Event Filtered: $m"); m })
  val sender = builder.add(Flow[FlowMessage] map (sendToExternalService(_)))

  logger ~> sender // the stages must be wired before exposing the open ports

  FlowShape(logger.inlet, sender.outlet)
}.named("eventTypeFilteredFlow")

Headers Validation Flow:

lazy val headersValidationFlow = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  val logger = builder.add(Flow[FlowMessage] map { m => println(s"Headers Validation: $m"); m })
  val sender = builder.add(Flow[FlowMessage] map (completeHeaders(_)))

  logger ~> sender

  FlowShape(logger.inlet, sender.outlet)
}.named("headersValidationFlow")

Event Processor Flow:

lazy val eventProcessorFlow = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  val inputMerge, filteredMerge = builder.add(Merge[FlowMessage](2))
  val b1, b2 = builder.add(Broadcast[FlowMessage](2))
  // headers.get returns an Option, so we check with contains instead of ==
  val originFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.get("Origin").contains("providerName")))
  val originNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.get("Origin").contains("providerName")))
  val destinationFilter = builder.add(Flow[FlowMessage] filter (m => m.headers.get("Destination").contains("destName")))
  val destinationNotFilter = builder.add(Flow[FlowMessage] filter (m => !m.headers.get("Destination").contains("destName")))
  val eventProcessor = builder.add(Flow[FlowMessage] map (process(_)))
  val eventProcessorSender = builder.add(Flow[FlowMessage] map (sendProcessedEvent(_)))
  val originLogger = builder.add(Flow[FlowMessage] map { m => println(s"Origin Filtered: $m"); m })
  val destinationLogger = builder.add(Flow[FlowMessage] map { m => println(s"Destination Filtered: $m"); m })
  val eventLogger = builder.add(Flow[FlowMessage] map { m => println(s"Event Filtered: $m"); m })
  val sinkIgnore = builder.add(Sink.ignore)

  inputMerge ~> b1 ~> originNotFilter ~> originLogger ~> filteredMerge
                b1 ~> originFilter ~> b2 ~> destinationNotFilter ~> destinationLogger ~> filteredMerge
                                      b2 ~> destinationFilter ~> eventProcessor ~> eventProcessorSender
  filteredMerge ~> eventLogger ~> sinkIgnore

  UniformFanInShape(eventProcessorSender.outlet, inputMerge.in(0), inputMerge.in(1))
}.named("eventProcessorFlow")

And finally, the flow that connects all the partial flows, the Event Pipeline Flow:

lazy val eventPipelineFlow = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  val eventInput: UniformFanOutShape[FlowMessage, FlowMessage] = builder.add(eventInputFlow)
  val headersValidation: FlowShape[FlowMessage, FlowMessage] = builder.add(headersValidationFlow)
  val eventProcessor: UniformFanInShape[FlowMessage, FlowMessage] = builder.add(eventProcessorFlow)

  eventInput.out(0) ~> eventProcessor
  eventInput.out(1) ~> headersValidation ~> eventProcessor

  FlowShape(eventInput.in, eventProcessor.out)
}.named("eventPipelineFlow")

MATERIALIZING THE STREAM

Now we are ready to connect a Source and a Sink to the Event Pipeline Flow and materialize the stream. This is one way to do it, using an ActorRef as a Source:

import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("event-pipeline")
implicit val materializer = ActorMaterializer()

val source: Source[FlowMessage, ActorRef] = Source.actorRef[FlowMessage](1000, OverflowStrategy.fail)
val actorRef: ActorRef = Flow.wrap(eventPipelineFlow).to(Sink.ignore).runWith(source)

actorRef ! message // message is a FlowMessage built elsewhere
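
One thing to keep in mind: with this kind of Source the stream stays alive until you complete it from the outside. If I recall the v1.0 contract correctly, Source.actorRef completes the stream when the actor receives akka.actor.PoisonPill or akka.actor.Status.Success:

// Completing the stream from the outside:
actorRef ! akka.actor.Status.Success("done")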

If you need to know when the stream finishes, you can materialize a Future. Let's see how to do it:

val sinkFold = Sink.fold[FlowMessage, FlowMessage](null) { (_, message) =>
  // ignore the accumulator; we just want the last message when the stream ends
  println(message)
  message
}

val future: Future[FlowMessage] = Source.single(message)
    .via(Flow.wrap(eventPipelineFlow))
    .runWith(sinkFold)
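
If you only care about the completion signal and not about the last message, a simpler variant (a sketch, assuming the same implicit materializer as above) is Sink.foreach, which materializes a Future[Unit] that completes when the stream does:

import scala.concurrent.Future

val done: Future[Unit] = Source.single(message)
    .via(Flow.wrap(eventPipelineFlow))
    .runWith(Sink.foreach(m => println(m)))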

And that's it! Depending on what you need to do, you will find a way. It's all in the official documentation.

CONCLUSION

In these flows there are many things to improve. For example, there is repeated code: every time you have a filter and its negation, you need a two-output Broadcast in front of them, as the sketch below shows. There are many common patterns like this to extract in order to produce better code, and we have already done so.
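
As an illustration, here is a minimal sketch (not our production code; partitionBy is a name I made up) of how that filter / filterNot / Broadcast triple can be extracted into a reusable partial graph:

def partitionBy(p: FlowMessage => Boolean) = FlowGraph.partial() { implicit builder =>
  import FlowGraph.Implicits._

  // One Broadcast feeding a predicate filter and its negation
  val bcast = builder.add(Broadcast[FlowMessage](2))
  val matches = builder.add(Flow[FlowMessage] filter p)
  val rest = builder.add(Flow[FlowMessage] filter (m => !p(m)))

  bcast ~> matches
  bcast ~> rest

  UniformFanOutShape(bcast.in, matches.outlet, rest.outlet)
}

// e.g. the origin split in the Event Processor Flow becomes:
// builder.add(partitionBy(_.headers.get("Origin").contains("providerName")))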

This example is similar to our case, but it is not the real one. However, it represents exactly what we are working on.

In the next posts I'm going to explain some advanced designs and patterns that we are using to make better use of this wonderful library.

See you soon!
Gabriel.
