

Unfolding streams like a boss (part 1)

How we utilize Akka streams in CM-Well

Unfolding is not a new concept, and this blog post isn’t about explaining a needlessly obfuscated ivory-tower FP term. This post is about how a simple concept, building a sequence from an initial state and a function that generates a tuple of the next state and an element, is used in CM-Well.

How our story begins

Initially, CM-Well didn’t have streaming APIs. The closest thing our users could get was the iterator API. There’s not much CM-Well logic in this API; we just wrap the Elasticsearch scroll API, with or without enriching the results by fetching data from Cassandra. We also use a little trick: instead of handing back the scrollId received from Elasticsearch, we store it in a short-lived actor and return the actor’s address, thereby ensuring the iterator id token isn’t “long”. We did this to let users always use GET with a query parameter, which isn’t possible with Elasticsearch’s original scrollId, since it may not fit in a URL (> 4K).
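To make the trick concrete, here is a minimal sketch of the idea (the actor name, message, and timeout are made up for illustration; this is not the actual CM-Well code):

import akka.actor.{Actor, Props, ReceiveTimeout}
import scala.concurrent.duration._

// Holds a scrollId so clients can be handed a short token (derived from the actor's
// address) instead of the long Elasticsearch scrollId that may not fit in a URL.
class ScrollIdKeeper(scrollId: String) extends Actor {
  context.setReceiveTimeout(1.minute) // short-lived: stop if the iteration is abandoned

  def receive: Receive = {
    case ScrollIdKeeper.Get => sender() ! scrollId
    case ReceiveTimeout     => context.stop(self)
  }
}

object ScrollIdKeeper {
  case object Get
  def props(scrollId: String): Props = Props(new ScrollIdKeeper(scrollId))
}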

Well, of course, users who used it just wanted to get all the data. There really isn’t much point in having users issue a request for every chunk; we wanted to simply provide a streaming chunked response with all the data.

The story unfolds

The concept of unfold isn’t just about streams; as can be seen in CM-Well’s util, we also implemented it for regular collections1. But using it to unfold streams of data is a natural fit, and the Elasticsearch scroll API is a great example of how simple unfolding a stream of data becomes with unfold: we have the state (the scrollId) and the element (the scroll results). So we experimented with RX, Akka streams (which was very young and experimental back then), and play’s iteratees, trying to see which would unfold our data stream best.
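To make the core idea concrete, a minimal unfold over plain collections can look like this (a sketch, not CM-Well’s actual util implementation):

// Build a List from an initial state and a function that either stops (None)
// or yields the next state together with one more element (Some(state -> elem)).
def unfold[S, E](initial: S)(f: S => Option[(S, E)]): List[E] = {
  @scala.annotation.tailrec
  def loop(state: S, acc: List[E]): List[E] = f(state) match {
    case None                 => acc.reverse
    case Some((nextState, e)) => loop(nextState, e :: acc)
  }
  loop(initial, Nil)
}

// e.g. a countdown: unfold(5)(n => if (n == 0) None else Some((n - 1) -> n))
// yields List(5, 4, 3, 2, 1)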

We ended up with a fairly reasonable implementation with play’s iteratees. After all, the webservice was built with play, and at the time (play 2.3) we had to convert to iteratees anyway. Personally, I liked the concept, and hated the DSL2. But I do think one of the biggest drawbacks of iteratees is that it is perceived as “too complicated” by newbies, and it does have a steep learning curve. Anyway, it served us pretty well, but as akka streams kept evolving, I began to squint back in its direction.

The akka-stream chapter

My initial go at unfolding a stream with akka streams was a low-level implementation of a publisher actor (I even have an old gist of that implementation). It worked, but it felt not very “akka-streamy”, and too low level. So I decided to try and submit my first PR to akka, adding unfold to the official API. That initial PR even shows you can unfold an infinite stream with what akka had back then (unfoldInf was later removed, as it really didn’t add much value…), but unfolding finite and async streams had to be taken care of in a custom GraphStage. Basically, I shamelessly copied play’s iteratees API and implemented it in akka streams. I even named the methods unfold & unfoldM for the async version at first, following the iteratees convention (as can be seen in the original issue). Back in CM-Well, we used unfoldAsync to create a new API. The resulting stream API is pretty useful, and provides a simple way for users to download a complete dataset with a single curl/wget command.
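To illustrate (not the actual CM-Well code), unfolding a scroll with unfoldAsync boils down to something like the following, where scroll is a hypothetical function standing in for the Elasticsearch call:

import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

// Given a scrollId, `scroll` returns the next scrollId plus the hits of that chunk;
// an empty chunk means there is no more data and the stream should complete.
def scrollSource(firstScrollId: String,
                 scroll: String => Future[(String, Seq[String])])
                (implicit ec: ExecutionContext): Source[Seq[String], NotUsed] =
  Source.unfoldAsync(firstScrollId) { scrollId =>
    scroll(scrollId).map {
      case (_, hits) if hits.isEmpty => None                       // done: complete the stream
      case (nextScrollId, hits)      => Some(nextScrollId -> hits) // emit chunk, continue
    }
  }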

Time went by, CM-Well kept evolving, and we started using akka-stream more and more. Many parts of CM-Well act as clients of the webservice, e.g. cmwell-dc, which is responsible (among other things) for inter-data-center synchronization, or cmwell-data-tools, which provides ways to download data from CM-Well, ingest data into CM-Well, or process it with SPARQL queries. These modules use an API that is tailored for akka-stream/akka-http consumption: the consume API. In this API, which is meant for stable, resumable, time-based data consumption, and is based on consecutive searches with range filters rather than Elasticsearch scroll, we decided to take a slightly different approach than we did with the iterator API. Influenced greatly by the way akka-http handles responses, we decided to provide the token in the response headers. After all, an HttpResponse in akka-http has its response headers available directly, whereas the response body has to be consumed from a Source[ByteString,Any]. The resulting code looked roughly like:

val mkRequest: String => Future[HttpResponse] = { token: String =>
  Http().singleRequest(HttpRequest( ... ))
}

Source.unfoldAsync(initialTokenRetrievedFromCreateConsumerAPI) { currentToken =>
  val res: Future[HttpResponse] = mkRequest(currentToken)
  res.map {
    case HttpResponse(s, h, e, _) if s.isSuccess() => {
      h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
        val next = nextToken.value()
        if(currentToken == next) None
        else {
          val data = e.dataBytes
          Some(next -> data)
        }
      }
    }
  }
}

But if we take this one step further: we know that akka-http uses connection pool flows under the hood, so why not use the flow directly within our nice streamy code?

That’s exactly what I thought when I opened yet another pull request to akka (this time, to the stream-contrib module), adding unfoldFlow. The snippet above could be rewritten with unfoldFlow to something like:

val mkRequest: String => HttpRequest = (token: String) => ...
val connFlow = Http().superPool[String]()

SourceGen.unfoldFlow(initialTokenRetrievedFromCreateConsumerAPI)(
  Flow.fromFunction((token: String) => mkRequest(token) -> token) // superPool expects (HttpRequest, T) pairs
      .via(connFlow)
      .map {
        case (Success(HttpResponse(s, h, e, _)),currentToken) if s.isSuccess() => {
          h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
            val next = nextToken.value()
            if(currentToken == next) None
            else {
              val data = e.dataBytes
              Some(next -> data)
            }
          }
        }
      }
)

There’s a lot not shown in this snippet, e.g. handling failures, customizing the connection pool, etc. But it demonstrates how we built an API that fits akka-stream perfectly, and how we utilize it efficiently.

To be continued…

The next post will be a bit more technical, and will show how users can unfold their own streams with akka-stream & the CM-Well API more efficiently. So stay tuned for “parallelizing resumable bulk consumes with CM-Well & akka-stream”!


  1. Frankly, I’m surprised it’s not a built-in part of scala collections.↩︎

  2. I kept being asked to explain all the fish in the code…↩︎

A tale of bad framework choices

and how it led to interesting code

One of the most interesting pieces of code in CM-Well, IMO, is the http client util code. In short, it defines an API for an http client, and wraps akka-http, which serves as the “http client engine”. Why not just use akka’s API, you might ask…? Well, we’ll get there, but first, a bit of history. (TL;DR: a story of why we suffered from tightly coupled code, and how we solved it elegantly, plus code samples)

A long time ago

(In a galaxy far far away) CM-Well was written mostly in java1 (oh boy…); only the web-service logic was written in scala. The scala eco-system looked very different compared to what it is today. The most popular scala web framework at the time was lift, which was much more relevant than play! Back then play was very new, maybe 1.0 or 1.1, and was still written in java. So it was decided to write the CM-Well web service with lift. But not only the web service… You see, lift classes were already on the classpath, especially lift’s HttpSupport from the testkit, which had a reasonable API. So all of CM-Well’s integration tests were written using it. It served its purpose, and tests kept piling up, until we decided to upgrade scala from 2.9 to 2.10. lift was holding us back, and play started to look more and more attractive.

We replaced lift with play! but decided to rewrite the integration tests without using play’s testkit or http client. After all, what if down the road we would want to replace it with something else? We didn’t want to be tied too much to the web framework, so we minimized our dependency on play as much as possible. For the integration tests, we decided to go with a popular scala http client instead: the infamous Dispatch (don’t judge… back then it was popular). For those who are not familiar with dispatch, it’s a scala http client library which makes heavy use of symbolic operators (check out the Periodic Table). Other than making the code less readable to new-comers with all those fancy operators, it served its purpose pretty well. That is, until… Yup. It was time to upgrade scala again, and move from version 2.10 to 2.11. And again, our integration tests (and other code that made use of an http client) were written with a library that didn’t move fast enough. It held us back, causing a “jar hell” of unwanted & outdated dependencies… and we grew tired of all the weird operators. But no one wanted to rewrite all the tests again… that’s a lot of dirty work. We hacked our way through, and managed to get by with using it only in tests, so at least we didn’t have the jar hell on our main artifacts’ classpath, just in tests. Other code in the main artifacts that needed an http client used whatever was on the classpath directly; be it play’s WS or spray’s client, it didn’t really matter.

But time went by, tests kept piling up, and a clean-up of the code was long overdue. Being a bit wiser from the bad experience, we decided to make sure that test code would never again hold us back from changing libraries and frameworks. We decided to write a thin wrapper with a sane & simple asynchronous http client API. A wrapper for what? Well… it doesn’t matter. That was the whole point: if the wrapping layer is small enough, we can always change the underlying library easily, and won’t have to patch up thousands of lines of testing code if we ever switch to another library. Anyway, we needed to pick something, and at the time we were really excited about the recent developments in akka. It was 2015, and the akka-stream & akka-http experimental modules had just come out. We decided to check them out, but the experimental modules were too risky for production code, which made them a perfect candidate to serve in our tests as a dispatch replacement, without affecting production code. This was two birds with one stone: evaluating an interesting technology in its early stages, with real code, without risking anything crucial, while using our thin wrapper to decouple the tests’ logic from the http client library.

P.S. to be on the safe side, and for the sport of it, we started to implement the same thin API on top of ning’s AsyncHttpClient, but never really continued with it, since akka-http got the job done perfectly. But some remnants stayed commented out in the sources, waiting for the day that will never come.

We ❤ Akka-http

Choosing akka was challenging. It introduced this new concept of streams, which, at least in tests, we wanted to abstract away for simplicity. But then again, consuming everything eagerly, completely hiding the reactive nature of the API, and returning a future of the response when everything is done, is asking for performance trouble. We needed a simple default, with an easy way of exploiting the asynchrony & reactive capabilities of akka-http. For that, we made heavy use of type classes, in a slightly adapted version of what is known as the magnet pattern.

The gory details

Let’s see some code, shall we? Starting with the API itself:

object SimpleHttpClient {

  // web sockets API is more complex, and out of the scope for this post,
  // but is shown here for completeness, as it is part of the API.
  // You are more than welcome to check out the source code.
  def ws[T : SimpleMessageHandler](uri: String,
         initiationMessage: T,
         subprotocol: Option[String] = None,
         queryParams: Seq[(String,String)] = Nil,
         headers: Seq[(String,String)] = Nil)(react: T => Option[T])
        (implicit ec: ExecutionContext,
         as: ActorSystem = this.sys,
         mat: Materializer = this.mat) = ...

  def get[T : SimpleResponseHandler](uri: String,
          queryParams: Seq[(String,String)] = Nil,
          headers: Seq[(String,String)] = Nil)
         (implicit ec: ExecutionContext,
          as: ActorSystem = this.sys,
          mat: Materializer = this.mat) = ...

  def put[T : SimpleResponseHandler](uri: String,
          body: Body,
          contentType: Option[String] = None,
          queryParams: Seq[(String,String)] = Nil,
          headers: Seq[(String,String)] = Nil)
         (implicit ec: ExecutionContext,
          as: ActorSystem = this.sys,
          mat: Materializer = this.mat) = ...

  def post[T : SimpleResponseHandler](uri: String,
           body: Body,
           contentType: Option[String] = None,
           queryParams: Seq[(String,String)] = Nil,
           headers: Seq[(String,String)] = Nil)
          (implicit ec: ExecutionContext,
           as: ActorSystem = this.sys,
           mat: Materializer = this.mat) = ...

  def delete[T : SimpleResponseHandler](uri: String,
             queryParams: Seq[(String,String)] = Nil,
             headers: Seq[(String,String)] = Nil)
            (implicit ec: ExecutionContext,
             as: ActorSystem = this.sys,
             mat: Materializer = this.mat) = ...

}

Overall, this looks like a pretty straightforward http client API. Let’s try to clear the fog from the unclear parts: each of the methods returns a Future[SimpleResponse[T]]. I know what you might be thinking… I said simple API, and here I am, showing some fancy code with weird classes, right…? I’ll list what might be interesting here:

  • implicit as: ActorSystem = this.sys & mat: Materializer = this.mat
  • Body
  • SimpleResponse[T] & SimpleResponseHandler

ActorSystem & Materializer

In akka-http, in order to handle http requests & responses, you’ll need to get hold of an HttpExt, which takes an ActorSystem in Http’s factory method. You also need a connection flow to build the request graph around. To keep things simple, we use superPool, which returns a flow that routes requests through cached (per-host) connection pools managed by akka; it needs a Materializer. We also need a Materializer for running a simple graph per request, something like:

Source.single(request -> context).via(connectionPool).runWith(Sink.head)

Which performs the request and returns a Future[Try[HttpResponse]]. Lastly, we also need the Materializer to handle akka’s HttpResponse, which returns the payload in the form of a Source[ByteString,_]. Remember, we wanted to abstract away anything that binds us to the library, so we can’t leak akka’s classes (unless we want to, more on that later), neither Source nor ByteString; we need to convert it to something else. Anyway, as you can see, the Materializer is needed. But if you pay attention, you’ll see it has default values. This lets us provide reasonable defaults, which can be configured freely using standard Typesafe config. The provided reference.conf only defines the bare minimum:

cmwell.util.http {
  akka {
    actor {
      provider = "akka.actor.LocalActorRefProvider"
    }
    http {
      host-connection-pool {
        max-open-requests = 1024
      }
    }
  }
}

And as you might have already guessed, the provided actor system is configured using:

ActorSystem("SimpleHttpClient",ConfigFactory.load().getConfig("cmwell.util.http"))

Also, the provided Materializer & ActorSystem are LAZY (as in lazy val), so they won’t even get instantiated if this code runs within production code that makes sure to supply a fully configured ActorSystem & Materializer. But, you might ask: doesn’t this bind us to akka? Well, technically, yes. In practice, the materializer and actor system are passed implicitly, so it isn’t written out in code (aside from some very rare cases). E.g. in the tests you don’t see any reference to a materializer or an actor system, and we are still loosely coupled, thanks to scala being such a flexible language when it comes to defaults & implicits.
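To make that concrete, a call site in production-style code might look roughly like this (a sketch; the system name and URL are made up):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext.Implicits.global

// The caller's own, fully configured ActorSystem & Materializer are picked up
// implicitly, so the lazy defaults inside SimpleHttpClient are never instantiated.
implicit val system: ActorSystem = ActorSystem("my-production-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val response = SimpleHttpClient.get("http://localhost:9000/some/path")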

Body

The post & put methods also take a mysterious body: Body. So what is it? As the name suggests, it’s the request body. But, you might ask: should a user be troubled with creating such objects? The answer is no. The Body companion object hosts some pretty useful implicits:

sealed trait Body {
  def entity(contentType: Option[String]): RequestEntity
  def contentType(ct: String): akka.http.scaladsl.model.ContentType = ...
}

object Body {
  import scala.language.implicitConversions

  implicit def apply(body: String): Body = new BodyFromString(body)
  implicit def apply(body: Array[Byte]): Body = new BodyFromBytes(body)
  implicit def apply(body: ByteString): Body = new BodyFromByteString(body)

  private class BodyFromString(body: String) extends Body {  ...  }
  private class BodyFromBytes(body: Array[Byte]) extends Body {  ...  }
  private class BodyFromByteString(body: ByteString) extends Body {  ...  }
}

This means that you may pass the body argument as whatever you want, be it a String, an Array[Byte], or even akka’s ByteString. And if we ever need something else, it’s very easy to add more automatically accepted types: we can just add another implicit conversion in Body’s companion object. Or, if it’s a special case, just instantiate a new Body locally, or write your own implicit conversion.
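For example, all of the following compile thanks to those implicit conversions (a sketch against the post signature shown above; the URL and payloads are made up):

import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global

val r1 = SimpleHttpClient.post("http://localhost:9000/example", "some string payload", contentType = Some("text/plain"))
val r2 = SimpleHttpClient.post("http://localhost:9000/example", Array[Byte](1, 2, 3))
val r3 = SimpleHttpClient.post("http://localhost:9000/example", ByteString("hello"), contentType = Some("application/octet-stream"))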

SimpleResponse[T] & SimpleResponseHandler

SimpleResponse is the response we get back from executing the request; it’s a pretty simple case class:

object SimpleResponse {

  type ContentType = String
  type ResponseBody[T] = (ContentType, T)

  ...
}

case class SimpleResponse[T : SimpleResponseHandler](status: Int,
                                                     headers: Seq[(String,String)],
                                                     body: ResponseBody[T]) {
  def contentType = body._1
  def payload = body._2

  override def toString() = ...
}

It has an Int for the status, a Seq[(String,String)] for the headers, and a ResponseBody[T], which is just a tuple of the mimetype (a String) and the body, which can be anything that has a SimpleResponseHandler. All methods in the exposed API have a type parameter T that is context-bound to SimpleResponseHandler, the type class responsible for generating the appropriate response for us from the response returned by the underlying library, i.e. akka. This means we need an implicit SimpleResponseHandler[T] in scope. Now, look carefully at the method signatures: none of the parameters has type T. You might think this means the compiler cannot infer the type, and the user always has to write it down explicitly? The answer is no. Let’s try it out in the REPL:

scala> val res = SimpleHttpClient.get("http://google.com")
res: scala.concurrent.Future[cmwell.util.http.SimpleResponse[Array[Byte]]] = ...

What happens here is that there is a single implicit available in the SimpleResponseHandler companion, and thus it is picked by the compiler (if only one implicit instance of SimpleResponseHandler for some T can be found, then that is what gets picked up, regardless of what T is). This one implicit has the most general type for a response body: simply an Array[Byte]. So, if the response can fit in memory, it’ll be returned as an Array[Byte]:

trait SimpleResponseHandler[T] {
  def mkStringRepr(t: T): String
  def mkResponseOf(status: Int,
                   headers: Seq[(String,String)],
                   contentType: String,
                   dataBytes: Source[ByteString,Any])
                  (implicit ec: ExecutionContext): Future[SimpleResponse[T]]
}

object SimpleResponseHandler {

  implicit object ByteArrayHandler extends SimpleResponseHandler[Array[Byte]] {
    ...
  }
}
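For intuition, here is a minimal sketch of the essential work such a handler has to do in mkResponseOf: draining the dataBytes source into memory (the sketch assumes a Materializer is in scope, which is not exactly how the real code obtains it):

import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Drain the payload Source into a single ByteString and expose it as an Array[Byte].
def drainToByteArray(dataBytes: Source[ByteString, Any])
                    (implicit ec: ExecutionContext, mat: Materializer): Future[Array[Byte]] =
  dataBytes
    .runFold(ByteString.empty)(_ ++ _) // concatenate all streamed chunks
    .map(_.toArray)                    // ByteString => Array[Byte]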

But if you want something else, all you have to do is import the appropriate implicit (more sensible implicit instances of SimpleResponseHandler can be found in SimpleResponse.Implicits):

object SimpleResponse {

  ...

  // if you want a SimpleResponse[T] for T != Array[Byte],
  // import a SimpleResponseHandler[T] from here (or implement your own)
  object Implicits {

    implicit object InputStreamHandler extends SimpleResponseHandler[InputStream] {
      ...
    }

    implicit object UTF8StringHandler extends SimpleResponseHandler[String] {
      ...
    }
  }
}

So if the response body cannot fit in memory, for instance, simply import the appropriate implicit handler:

scala> import cmwell.util.http.SimpleResponse.Implicits.InputStreamHandler
import cmwell.util.http.SimpleResponse.Implicits.InputStreamHandler

scala> val res = SimpleHttpClient.get("http://google.com")
res: scala.concurrent.Future[cmwell.util.http.SimpleResponse[java.io.InputStream]] = ..

It just works because imported implicits take precedence over implicits defined in the type class’ companion. Of course, if you import more than one handler, you’ll have to mention the type explicitly, or you’ll get a compiler error for “ambiguous implicit values”.
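For example (a sketch), with more than one handler imported you pin T explicitly:

import cmwell.util.http.SimpleHttpClient
import cmwell.util.http.SimpleResponse.Implicits._ // both UTF8StringHandler & InputStreamHandler in scope
import scala.concurrent.ExecutionContext.Implicits.global

// Specifying the type parameter picks the handler and avoids "ambiguous implicit values":
val asString = SimpleHttpClient.get[String]("http://google.com")
val asStream = SimpleHttpClient.get[java.io.InputStream]("http://google.com")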

Nice, so… it seems pretty solid. What else?

The implementation we have in CM-Well’s code base is far from complete:

  • Not all http methods are defined (e.g: HEAD)
  • One can think of way more sensible generic response handlers to add to the Implicits object (e.g: one that simply returns the Source[ByteString,_] from akka’s response directly)
  • Classes can be arranged better, in separate files.

Basically, this API is being extended lazily, when we need to add something new, and is not a complete solution.

OK, let’s wrap it up

This post is already getting longer than I thought, and we haven’t covered the web sockets API, or how we convert akka’s classes to simple types. So let’s leave those as an exercise for the reader 😉. The bottom line of this post is that we ended up with a nice API, which is very flexible, extensible, and implemented in not too many lines of code. The road was a bit bumpy, but the lesson learned was worth it.

P.S.

If you want to get your hands dirty, we would love to get some PRs! (regarding the code shown here, or any other part of CM-Well).


  1. In fact, CM-Well started out as a POC with python & django, and only later was implemented in java & scala.↩︎

CM-Well is OSS!

Ummm… what is this CM-Well thing?

CM-Well is the project I’ve worked on at Thomson Reuters for the past few years. I’m not gonna write here about the project itself (there’s more than enough information in the project docs); instead, I’ll write about my personal experience working on it.

So, what now?

Well, this is just the first post in a series I plan to publish dealing with CM-Well development. You can expect follow-up posts such as “A tale of bad framework choices (and how it led to interesting code)”, “CM-Well & OSS before it became OSS”, and more…

Meanwhile…

Go ahead and check it out:
github.com/thomsonreuters/CM-Well