Showing posts with label akka-stream.

Unfolding streams like a boss (part 2)

Parallelizing resumable bulk consumes with CM-Well & akka-stream

In the previous post we introduced CM-Well’s consume API, and showed how it is tailored to be used with akka-http & akka-stream. This post will get into the gory details of how to squeeze the most out of CM-Well, using the bulk-consume API with akka-http & akka-stream.

The consume API in depth

There are two types of “consumes”: consume & bulk-consume. We will focus on the latter, but a few words on consume so as not to leave you hanging: consume just wraps a regular search, with a few extra filters. In terms of CM-Well’s qp, it translates to the following:

Given filters qp=$QP and a timestamp index-time=${ITIME:-0}, CM-Well generates the following (loosely) equivalent search parameters:

# NOW is the current time in milliseconds, minus 30 seconds
NOW=$(( $(date +%s%N | cut -b1-13) - 30000 ))

"?op=search
 &qp=system.indexTime>$ITIME,system.indexTime<$NOW,[$QP]
 &sort-by=system.indexTime
 &length=100"

It then fetches those (up to) 100 (by default) sorted results, and:

  • If all results have the same index time $SOME_ITIME, it replaces the previous op=search with op=stream, and the previous qp=system.indexTime>$ITIME,system.indexTime<$NOW,[$QP] with qp=system.indexTime:$SOME_ITIME,[$QP].
  • Otherwise, there are multiple index-time values, all sorted. It drops all the trailing results whose index time equals $MAX_ITIME, returns the rest, and sets the next $ITIME in the returned token to $MAX_ITIME - 1.
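
To make that token-advancement rule concrete, here is a minimal sketch of the post-processing step in plain Scala. This is an illustrative model, not CM-Well’s actual code; Result and advance are made-up names:

```scala
// Illustrative model of one consume step's post-processing (not CM-Well's real code).
final case class Result(path: String, indexTime: Long)

// Given one page of results sorted by index time, decide what to emit
// and what the next $ITIME in the token should be.
def advance(page: List[Result]): (List[Result], Option[Long]) =
  page match {
    case Nil => (Nil, None)
    case results =>
      val maxITime = results.last.indexTime
      if (results.head.indexTime == maxITime)
        // All results share one index time: this is where CM-Well would switch
        // to op=stream with qp=system.indexTime:$SOME_ITIME instead.
        (results, Some(maxITime))
      else
        // Drop the trailing results at $MAX_ITIME (that group may be incomplete)
        // and resume from $MAX_ITIME - 1, so they are picked up next time.
        (results.takeWhile(_.indexTime < maxITime), Some(maxITime - 1))
  }
```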

These are the basics; there are a few more concerns to take into consideration, and if that interests you, go ahead and check the source code.

Understanding bulk-consume

In contrast to the consume API, bulk-consume tries to be more efficient, and retrieves a lot more infotons per request, à la stream style. Under the hood it uses Elasticsearch’s scroll API, in a similar way to how we described the stream API in the previous post. The problem is that you can’t get sorted results with scroll from Elasticsearch. So, instead of advancing the timeline using sorted search, we filter results in advance.

This means there’s a pre-processing phase where we try to find from and to timestamps that are not “too far” apart in terms of number of results, but far enough apart to stream efficiently. CM-Well does this using a simple binary search, and it tries to return a chunked response with O(1M) results (by default). Many edge cases are covered, like an early cutoff if the binary search doesn’t converge fast enough, dealing with near-current-time results, etc.
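
As a rough illustration of that pre-processing phase, here is a hedged sketch of such a binary search in plain Scala. count stands in for a cheap count query against the index, and all names, bounds, and the “close enough” factors are made up:

```scala
// Hypothetical sketch: find a `to` timestamp such that the number of results
// in (from, to] is close to `target`, by binary search over the timeline.
// `count(from, to)` models a cheap count query against the index.
def findTo(from: Long,
           maxTo: Long,
           target: Long,
           count: (Long, Long) => Long,
           maxSteps: Int = 32): Long = {
  @annotation.tailrec
  def loop(lo: Long, hi: Long, steps: Int): Long = {
    val mid = lo + (hi - lo) / 2
    val c = count(from, mid)
    if (steps == 0 || hi - lo <= 1) mid               // early cutoff: not converging
    else if (c < target / 2) loop(mid, hi, steps - 1) // too few results: widen the range
    else if (c > target * 2) loop(lo, mid, steps - 1) // too many: narrow it
    else mid                                          // "close enough" to the target
  }
  loop(from, maxTo, maxSteps)
}
```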

Like consume, bulk-consume returns a position token in the headers. In fact, the tokens are interchangeable between the two APIs, but those returned from a bulk-consume request might contain some extra attributes. It turns out that many pre-processing phases can be avoided if the previous request stored an optional next “to” timestamp it may have encountered during the binary search. So, what’s so great about the bulk-consume API? Pipelined parallelization! You see, the next position token is given eagerly in the response headers, and a user can use it right away to fire up the next request. Since it will probably take some time to fetch all those O(1M) results, you could end up with as many parallel streams of data as you can handle.

But, you might ask: “What about failures? Retrying?” The answer is that bulk-consume also lets you set the upper bound timestamp explicitly. If your token was an optimized one, you can reuse it safely. If not, a new binary search might yield a different time range, and you could end up with duplicate results, or worse, data gaps. To overcome this, you should supply a timestamp explicitly when retrying. But what should you supply? Well, there’s another header for that. Other than the X-CM-WELL-POSITION header, you also get an X-CM-WELL-TO header, whose value is the upper bound timestamp found in the binary search. You should supply this timestamp using the to-hint query parameter, and retry the bulk-consume request with it. Note that if the position token is optimized, to-hint will be ignored.
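
The retry rule boils down to: copy X-CM-WELL-TO into to-hint. A tiny, hypothetical helper makes it explicit (retryParams and its shape are illustrative, not a real CM-Well client API; the header and parameter names are as described above):

```scala
// Illustrative helper: given the headers of a failed bulk-consume response,
// build the query parameters for a safe retry. Supplying to-hint pins the
// upper bound timestamp, so a fresh binary search can't shift the time range.
def retryParams(position: String, headers: Map[String, String]): Map[String, String] = {
  val base = Map("op" -> "bulk-consume", "position" -> position)
  headers.get("X-CM-WELL-TO") match {
    case Some(to) => base + ("to-hint" -> to) // pin the upper bound found before
    case None     => base                     // nothing to pin; retry as-is
  }
}
```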

OK, got it. Let’s write some code

As implied, we will show how to build an akka-stream Source of data from CM-Well, using unfoldFlow, Retry, and other cool constructs you can find in the akka-stream & akka-stream-contrib libs.

The easy part (motivation)

Assuming we can somehow get:

type PositionToken = String
val initialPosition: PositionToken = ???
val consume: Flow[PositionToken,(PositionToken,ByteString),_] = ???

The work left is ridiculously easy thanks to unfoldFlow:

SourceGen.unfoldFlow(initialPosition)(consume)

And we’re done! OK… not really… It’s too simplified. unfoldFlow can’t unfold the next element until it gets the previously generated state. This means that all our fancy talk about pipelined parallelization isn’t being taken into account here. So let’s try and improve that. How ’bout:

val consume: Flow[PositionToken,(PositionToken,Source[ByteString,_]),_] = ???
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapConcat(identity)

This is already much better. Each bulk-consume Source is being queried eagerly. But we still have a downside here… Bulks are not evenly sized, and size is counted as the number of infotons in the bulk, not their actual byte size… Moreover, we mentioned that retries are supported using to-hint with the X-CM-WELL-TO header’s value. So, if we are going to retry some streams, we need to buffer an entire chunk, and only emit once we know it is complete, so we don’t get duplicate results from retries. This implies a single bulk can get us “stuck” waiting for it. The two major problems are:

  • No matter how high we set our parallelization factor, we could still end up back-pressuring our slow source (by slow, I mean that whatever the use-case, we must assume a fast consumer, e.g. flushing to disk, which is much faster than our network calls).
  • Having $parallelization-factor × O(1M) results all buffered in memory makes our GC inefficient, due to objects being kept in memory for a long time. We also starve our downstream until we get the current bulk “unstuck”.

So, since bulks are not sorted along the timeline anyway, there’s no reason not to use merge instead of concat:

val parallelism = 10 // for example
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(parallelism, identity)

Also, we will try to optimize even further. Our queries to bulk-consume are our “bottleneck”. So, it is better not to pull in all the data with the bulk. Let’s use a thin format, like tsv, which won’t return the data itself, only a tuple consisting of the infoton’s path, date, uuid, and indexTime. This way, we can, at a later stage, pull in the data of small batches of infotons we got from the bulk-consume. So our final higher-level stream should look like either:
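
For illustration, one such tsv line could be parsed like this (InfotonRef and parseTsvLine are hypothetical names, not part of any CM-Well client; the field order follows the tuple described above):

```scala
import scala.util.Try

// Hypothetical sketch of parsing one thin tsv line: path, date, uuid, indexTime.
final case class InfotonRef(path: String, date: String, uuid: String, indexTime: Long)

def parseTsvLine(line: String): Option[InfotonRef] =
  line.split('\t') match {
    case Array(path, date, uuid, indexTime) =>
      // indexTime is the only numeric field; guard against malformed input
      Try(indexTime.toLong).toOption.map(InfotonRef(path, date, uuid, _))
    case _ => None
  }
```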

val consume: Flow[PositionToken,(PositionToken,Source[List[ByteString],_]),_] = ???
val addData: Flow[List[ByteString],ByteString,_] = ???
val parallelism = 10 // for example
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(parallelism, identity)
         .via(addData)

where the addData flow utilizes Balance to fan out and parallelize the data-fetching job, and then fans back in to construct a Flow shape which takes care of parallelization internally. Another option is to use a simpler mapAsync(parallelizationFactor)(...) as an easier way to parallelize the data-fetching job. Or, it can look like:

val consume: Flow[PositionToken,(PositionToken,Source[List[ByteString],_]),_] = ???
val addData: Flow[List[ByteString],ByteString,_] = ???
val parallelism = 10 // for example
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(parallelism, _.via(addData))
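
The simpler mapAsync option can be modeled with plain Scala Futures, without a running stream. Everything below is an illustrative sketch: fetchData stands in for the per-batch HTTP fetch, and mapAsyncModel only mimics what mapAsync(parallelism) gives us (bounded concurrency, order preserved):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

// Stand-in for the per-batch fetch that addData would perform over HTTP.
// Here it just joins the batch; in reality it would fetch each infoton's data.
def fetchData(batch: List[String]): Future[String] =
  Future(batch.mkString("[", ",", "]"))

// A simplified model of mapAsync(parallelism)(fetchData): run up to
// `parallelism` fetches concurrently, while keeping the original order.
def mapAsyncModel(parallelism: Int, batches: List[List[String]]): List[String] =
  batches
    .grouped(parallelism)
    .flatMap { window =>
      // all futures in a window run concurrently; Future.sequence keeps order
      Await.result(Future.sequence(window.map(fetchData)), 10.seconds)
    }
    .toList
```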

OK, armed with our goal in mind, let’s implement the consume flow:

The detailed part

Let’s try to break down & visualize the stream:

  • Retry:
    • bulk-consume ~> fold results into a single element
    • if #lines != X-CM-WELL-N header or any other failure:
      • retry with to-hint=${X-CM-WELL-TO}
    • else
      • emit results
  • Enrich with data:
    • batch paths
    • Balance vs. mapAsync parallelization of fetching (we’ll introduce both approaches)
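
The retry step in the breakdown above can be sketched with plain Scala Futures (the real flow would likely use the Retry construct from akka-stream-contrib). bulkConsume, BulkResponse, and consumeWithRetry are illustrative assumptions, not CM-Well client code; the header names follow the description above:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Illustrative response model: the folded results of one bulk-consume request.
final case class BulkResponse(lines: Vector[String], headers: Map[String, String])

// Retry rule from the breakdown above: if the number of lines received doesn't
// match the X-CM-WELL-N header (or the request failed mid-stream), retry with
// to-hint pinned to the X-CM-WELL-TO value; otherwise emit the results.
def consumeWithRetry(
    bulkConsume: (String, Option[String]) => Future[BulkResponse], // (position, to-hint)
    position: String,
    maxRetries: Int = 3
)(implicit ec: ExecutionContext): Future[BulkResponse] = {

  def complete(r: BulkResponse): Boolean =
    r.headers.get("X-CM-WELL-N").forall(_.toInt == r.lines.size)

  def loop(toHint: Option[String], retriesLeft: Int): Future[BulkResponse] =
    bulkConsume(position, toHint).flatMap { r =>
      if (complete(r) || retriesLeft == 0) Future.successful(r)
      else loop(r.headers.get("X-CM-WELL-TO"), retriesLeft - 1) // pin the range
    }

  loop(None, maxRetries)
}
```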

Wrapping up

This might be covered in a future post. This draft has been sitting untouched for too long, and I’m “flushing” it. For now, implementation details are left as an exercise for the reader ;)

Unfolding streams like a boss (part 1)

How we utilize Akka streams in CM-Well

Unfolding is not a new concept, and this blog post isn’t about explaining a needlessly obfuscated ivory-tower FP term. This post is about how a simple concept, of building a sequence from an initial state and a function that generates a tuple of the next state and an element, is used in CM-Well.

How our story begins

Initially, CM-Well didn’t have streaming APIs. The closest thing our users could get was the iterator API. There isn’t much CM-Well logic in this API; we just wrap the Elasticsearch scroll API, with or without enriching the results by fetching data from Cassandra. We also use a little trick of storing the scrollId received from Elasticsearch in a short-lived actor, and returning the actor address instead, thus ensuring that the iterator id token isn’t “long”. We did this to allow users to always use GET with a query parameter, which isn’t possible with Elasticsearch’s original scrollId, since it may not fit in a URL (> 4K).

Well, of course, users who used it just wanted to get all the data. There really isn’t much point in having users make a request for every chunk. We wanted to just provide a streaming chunked response with all the data.

The story unfolds

The concept of unfold isn’t just about streams; as can be seen in CM-Well’s util, we also implemented it for regular collections1. But using it to unfold streams of data is a natural fit. The Elasticsearch scroll API makes a great example of how unfolding streams of data becomes simple using unfold: we have the state (scrollId) and the element (scroll results). So, we experimented with RX, Akka streams (which was very young and experimental back then), and play’s iteratees, trying to see which would unfold our data stream best.
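
For reference, such an unfold for regular collections fits in a few lines (a generic sketch in the spirit of that util, not a copy of it):

```scala
// Build a List from a seed state and a function that either stops (None)
// or produces the next state together with one element (Some(state -> elem)).
def unfold[S, E](seed: S)(f: S => Option[(S, E)]): List[E] = {
  @annotation.tailrec
  def loop(s: S, acc: List[E]): List[E] =
    f(s) match {
      case Some((next, elem)) => loop(next, elem :: acc)
      case None               => acc.reverse
    }
  loop(seed, Nil)
}
```

For example, unfold(1)(s => if (s > 5) None else Some((s + 1, s * s))) produces List(1, 4, 9, 16, 25).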

We ended up with a fairly reasonable implementation with play’s iteratees. After all, the webservice was built with play, and at the time (play 2.3), we had to convert to iteratees anyway. Personally, I liked the concept, and hated the DSL2. But I do think one of the biggest drawbacks of iteratees is that it is perceived as “too complicated” for newbies. And it does have a steep learning curve. Anyway, it served us pretty well, but as akka streams kept evolving, I began to squint back in its direction.

The akka-stream chapter

My initial go at unfolding a stream with akka streams was a low-level implementation of a publisher actor (I even have an old gist of that implementation). It worked, but didn’t feel very “akka-streamy”, and was too low level. So I decided to try and submit my first PR to akka, adding unfold to the official API. That initial PR even showed you can unfold an infinite stream with what akka had back then (unfoldInf was later removed, as it really didn’t add much value…), but unfolding finite and async streams had to be taken care of in a custom GraphStage. Basically, I shamelessly copied play’s iteratees API, and implemented it in akka streams. I even named the methods unfold & unfoldM for the async version at first, following the iteratees convention (as can be seen in the original issue). Back in CM-Well, we used unfoldAsync to create a new API. The resulting stream API is pretty useful, and provides a simple way for users to download a complete dataset with a single curl/wget command.

Time went by, CM-Well kept evolving, and we started using akka-stream more and more. Many parts of CM-Well act as clients of the webservice, e.g. cmwell-dc, which is responsible (among other things) for inter-data-center synchronization, or cmwell-data-tools, which provides ways to download data from CM-Well, ingest data into CM-Well, or process it with SPARQL queries using CM-Well. These modules use an API that is tailored for akka-stream/akka-http consumption; I am referring to the consume API. In this API, which is meant for stable, resumable, time-based data consumption, and is based on consecutive searches with range filters rather than scroll in Elasticsearch, we decided to take a slightly different approach than what we did with the iterator API. Influenced greatly by the way akka-http handles responses, we decided to provide the token in the response headers. After all, an HttpResponse in akka-http has response headers available directly, whereas the response body has to be consumed from a Source[ByteString,Any]. The resulting code looked roughly like:

val mkRequest: String => Future[HttpResponse] = { token: String =>
  Http().singleRequest(HttpRequest( ... ))
}

Source.unfoldAsync(initialTokenRetrievedFromCreateConsumerAPI) { currentToken =>
  val res: Future[HttpResponse] = mkRequest(currentToken)
  res.map {
    case HttpResponse(s, h, e, _) if s.isSuccess() => {
      h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
        val next = nextToken.value()
        if(currentToken == next) None
        else {
          val data = e.dataBytes
          Some(next -> data)
        }
      }
    }
  }
}

But, if we are to take this one step further, we know that akka-http uses connection pool flows under the hood. And why not use the flow directly within our nice streamy code?

That’s exactly what I thought when I opened yet another pull request to akka (this time, to stream-contrib module), adding unfoldFlow. The snippet above could be re-written using unfoldFlow to something like:

val mkRequest: String => HttpRequest = (token: String) => ...
val connFlow = Http().superPool[String]()

SourceGen.unfoldFlowWith(
  initialTokenRetrievedFromCreateConsumerAPI,
  Flow.fromFunction(mkRequest).via(connFlow)
) {
  case (Success(HttpResponse(s, h, e, _)), currentToken) if s.isSuccess() =>
    h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
      val next = nextToken.value()
      if (currentToken == next) None
      else {
        val data = e.dataBytes
        Some(next -> data)
      }
    }
}

There’s a lot not shown in this snippet, e.g. handling failures, or customizing the connection pool, etc. But this demonstrates how we built an API that fits perfectly with akka streams, and how we utilize it efficiently.

To be continued…

The next post will be a bit more technical, and will show how users can unfold their own streams with akka-stream & the CM-Well API more efficiently. So stay tuned for “Parallelizing resumable bulk consumes with CM-Well & akka-stream”!


  1. Frankly, I’m surprised it’s not a built-in part of scala collections.↩︎

  2. I kept being asked to explain all the fish in the code…↩︎