
Trick or Trait

tl;dr

Scala’s traits are tricky. There are many pitfalls, especially if you’re dealing with composed/stacked traits. I recently had an interesting conversation on whether it is best to extend a trait in another trait, or to enforce a mixin with self typing (which, apparently, can be done in several ways). This led me to some new findings (for me, at least) and insights on how and when to use the different approaches.

What’s wrong with the good old abstract class?

Scala has abstract classes, but they are limited: you cannot inherit from more than one class or abstract class. Scala’s way to achieve “multiple inheritance” is via trait mixins. It also allows you to extend a trait with another trait, but according to the specs:

… A template \(sc \text{ with } mt_1 \text{ with } \ldots \text{ with } mt_n \{ stats \}\) consists of a constructor invocation \(sc\) which defines the template’s superclass, trait references \(mt_1,\ldots,mt_n (n≥0)\), which define the template’s traits, and a statement sequence stats which contains initialization code and additional member definitions for the template.

Each trait reference \(mt_i\) must denote a trait. By contrast, the superclass constructor \(sc\) normally refers to a class which is not a trait. It is possible to write a list of parents that starts with a trait reference, e.g. \(mt_1 \text{ with } \ldots \text{ with } mt_n\). In that case the list of parents is implicitly extended to include the supertype of \(mt_1\) as first parent type. The new supertype must have at least one constructor that does not take parameters. In the following, we will always assume that this implicit extension has been performed, so that the first parent class of a template is a regular superclass constructor, not a trait reference.

This is not something you would normally do. And there’s a good reason for it.

When should you use extends on a trait?

Scala’s traits are “stackable”, and can be used for “stackable modifications”. This feature is well covered in blogs, and is not the main purpose of this post, so go ahead and take a look at the basic example from the Programming in Scala book.

The reason it works so well is that each stacked trait extends IntQueue, thus enforcing its own place in the class linearization to the left of the implementing class, so super calls are always invoked in the proper order. If we had not extended IntQueue, but merely enforced a mixin with a self type, we wouldn’t be able to call super, and thus couldn’t stack operations.

import scala.collection.mutable.ArrayBuffer

abstract class IntQueue {
  def get(): Int
  def put(x: Int)
}

trait Doubling extends IntQueue {
  abstract override def put(x: Int) = { super.put(2 * x) }
}

trait Incrementing extends IntQueue {
  abstract override def put(x: Int) = { super.put(x + 1) }
}

// replacing the definition with the commented-out
// self-typing code won't compile:
//
// trait Filtering { this: IntQueue =>
trait Filtering extends IntQueue {
  abstract override def put(x: Int) = {
    if (x >= 0) super.put(x)
  }
}

class BasicIntQueue extends IntQueue {
  private val buf = new ArrayBuffer[Int]
  def get() = buf.remove(0)
  def put(x: Int) = { buf += x }
}

Usage:

scala> val q = new BasicIntQueue with Doubling with Filtering with Incrementing
q: BasicIntQueue with Doubling with Filtering with Incrementing = $anon$1@5e3dd1f3

scala> q.put(-3);q.put(0);q.put(-1);q.put(1)

scala> q.get()
res1: Int = 2

scala> q.get()
res2: Int = 0

scala> q.get()
res3: Int = 4

scala> q.get()
java.lang.IndexOutOfBoundsException: 0
  at scala.collection.mutable.ResizableArray.apply(ResizableArray.scala:46)
  at scala.collection.mutable.ResizableArray.apply$(ResizableArray.scala:45)
  at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:49)
  at scala.collection.mutable.ArrayBuffer.remove(ArrayBuffer.scala:173)
  at BasicIntQueue.get(IntQueue.scala:30)
  ... 36 elided

class linearization?

The specs define linearization according to the following formula:

$$ \mathcal{L}\big(\mathcal{C}\big)=\mathcal{C},\mathcal{L}\big(\mathcal{C_n}\big)\vec{+}\ldots\vec{+}\mathcal{L}\big(\mathcal{C_1}\big) $$

Where \(\vec{+}\) means you add new traits to the right, but only keep the right most appearance of the trait.

$$ \begin{alignat*}{3} a,A\vec{+}B&= a,\big(A\vec{+}B\big) && \textbf{ if }a\notin B \\\\ &= A\vec{+}B && \textbf{ if }a\in B \end{alignat*} $$

This means a class \(\mathcal{C}\), or in our case q, is linearized as:

val q = new BasicIntQueue with Doubling with Filtering with Incrementing

q = \(\mathcal{C}\)
BasicIntQueue = \(\mathcal{L}\big(\mathcal{C}_1\big)=\{BasicIntQueue,IntQueue,AnyRef,Any\}\)
Doubling = \(\mathcal{L}\big(\mathcal{C}_2\big)=\{Doubling,IntQueue,AnyRef,Any\}\)
Filtering = \(\mathcal{L}\big(\mathcal{C}_3\big)=\{Filtering,IntQueue,AnyRef,Any\}\)
Incrementing = \(\mathcal{L}\big(\mathcal{C}_4\big)=\{Incrementing,IntQueue,AnyRef,Any\}\)

$$ \begin{aligned} & q,\mathcal{L}\big(\mathcal{C}_4\big) \vec{+} \mathcal{L}\big(\mathcal{C}_3\big) \vec{+} \mathcal{L}\big(\mathcal{C}_2\big) \vec{+} \mathcal{L}\big(\mathcal{C}_1\big) \\ & q,\mathcal{L}\big(\mathcal{C}_4\big) \vec{+} \big(\mathcal{L}\big(\mathcal{C}_3\big) \vec{+} \big(\mathcal{L}\big(\mathcal{C}_2\big) \vec{+} \mathcal{L}\big(\mathcal{C}_1\big)\big)\big) \\ & q,Incrementing,\mathcal{L}\big(\mathcal{C}_3\big) \vec{+} \big(\mathcal{L}\big(\mathcal{C}_2\big) \vec{+} \mathcal{L}\big(\mathcal{C}_1\big)\big) \\ & q,Incrementing,Filtering,\mathcal{L}\big(\mathcal{C}_2\big) \vec{+} \mathcal{L}\big(\mathcal{C}_1\big) \\ & q,Incrementing,Filtering,Doubling,\mathcal{L}\big(\mathcal{C}_1\big) \\ & q,Incrementing,Filtering,Doubling,BasicIntQueue,IntQueue,AnyRef,Any \end{aligned} $$
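The derivation above can be checked mechanically. Here’s a small sketch (the helper names are made up, these are not compiler internals) of the \(\vec{+}\) operator and the linearization formula:

```scala
// Sketch of the spec's +> operator: keep elements of `a` only if they
// don't reappear in `b` (i.e. keep the right-most occurrence), then append `b`.
def vecPlus[T](a: List[T], b: List[T]): List[T] =
  a.filterNot(b.contains) ++ b

// L(C) = C, L(C_n) +> ... +> L(C_1)
// `parents` is expected in the order List(L(C_n), ..., L(C_1))
def linearize[T](c: T, parents: List[List[T]]): List[T] =
  c :: parents.reduceRight(vecPlus[T])
```

Feeding it the four parent linearizations listed above (in reverse mixin order) reproduces the result of the manual derivation.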

Why should you care?

Well, to understand the subtle differences between extending and enforcing a mixin, you need to know how class linearization is performed. Notice how, when we defined the traits with extends, the linearization of each trait transitively contained the extended trait; e.g. Doubling’s class linearization contains IntQueue. This means that as a user, no matter how I mix Doubling into my bottom type, IntQueue is always going to be found to the right of Doubling in the linearization, and will always be its super. More importantly, IntQueue is going to be initialized prior to Doubling, since initialization proceeds from the right-most element in the linearization towards the left. This is of course not a problem in IntQueue’s case, and exactly what we want and expect, but sometimes you would want to let the end user be in charge of initialization order.
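To make the initialization-order point concrete, here’s a tiny runnable sketch (trait names are made up) that records the order in which constructor bodies run:

```scala
import scala.collection.mutable.ListBuffer

val initOrder = ListBuffer.empty[String]

trait Base { initOrder += "Base" }
trait A extends Base { initOrder += "A" }
trait B extends Base { initOrder += "B" }

// linearization: C, B, A, Base, AnyRef, Any
// initialization runs right-to-left: Base, then A, then B, then C
class C extends A with B { initOrder += "C" }
```

Instantiating `new C` appends Base, A, B, C, in that order: the extended trait Base is always initialized before A and B, no matter how the user mixes them.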

The weird case of the val in the trait

As you probably know, traits are not interfaces. A trait can hold non-abstract members, whether defs, vals, etc. Normally, you shouldn’t care about the linearization of your class. But if your traits interact with each other and contain (possibly uninitialized) vals, you might (depending on how you defined your class hierarchy and inter-trait interaction) encounter some puzzling NullPointerExceptions. In these cases, since scalac prohibits any form of circular inheritance, a user can re-arrange the mixin order of the bottom type and carry on, given of course that the user has full control over the class linearization. When you enforce a mixin using self typing, your trait and the trait you enforce the mixin with can appear in any order once linearized. The user is in full control. And you can (though not necessarily should) use anything from the enforced mixin as if it were “extended regularly”, as long as you also type-alias this.

this aliasing?

this aliasing isn’t something new or unknown. There are many good reasons to do so1, but for now, just know there are several ways to do it, with very subtle differences between them.

trait A { self => ... }
trait A { self: B => ... }
trait A { this: B => ... }
trait A { _: B => ... }

In the first option, you give an alias to this, usually so you can refer to it from an inner class (which is quite handy if you utilize path dependent types). The second is interesting: it forces implementing classes to also mix in the B trait. Options 3 & 4 are equivalent as far as I know (please correct me if I’m wrong).
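Here’s a small sketch combining options 1 & 2 (all names are made up): the self type enforces the mixin, and the alias lets an inner class reach the outer instance, since plain this inside the inner class refers to the inner instance:

```scala
trait Greeter { def greeting: String }

trait Logging { self: Greeter =>  // alias `this` AND enforce the Greeter mixin
  class Entry(msg: String) {
    // here `this` is the Entry; `self` is the outer Logging-with-Greeter
    def render: String = s"${self.greeting}: $msg"
  }
}
```

Any concrete class must now mix both in, e.g. new Greeter with Logging { def greeting = "hello" }, and its Entry instances can see the outer greeting.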

conclusion

Adhering to the principle of least power, you should choose the most restrictive approach you can get by with. If all you need is to know your trait is always mixed with another trait2, use only a type (option 4). If you need to use another trait’s capabilities, but only from within a method, or in any way not during initialization, use self typing (options 2, 3, 4). If you also have inner classes that refer to this in the code, alias it as something else (the convention is “self”) to ease readability. If you depend on another trait during initialization, then extend it to ensure correct ordering in the class linearization. But do it only if you really must.


  1. maybe more on that in a future post.↩︎

  2. You may wonder what would be a good use case. Well, I’m wondering about it myself. Maybe if you have a sealed trait, and you want the implementing classes to have some functionality, but you don’t want any other class to have that functionality, and you still want to put it in a different file. This way you enforce that a trait from another file can only be mixed in with your sealed trait, and offer functionality without overloading too much logic in a single file. Got any better idea? I’d love to hear :)↩︎

FP-ish patterns

Background

My twitter feed is becoming more and more (pure) FP oriented. Many of the people I follow from the Scala community have lately been advocating for Haskell over Scala. To be honest, I was always intrigued by pure FP, but never got to use it in the “real world”1. I always worked in teams that had an OOP mindset and codebase. With Scala I was able to shift the balance a little bit towards FP. I advocated for referential transparency, strongly typed over stringly typed code, higher order functions, higher kinded types, typeclasses, etc. I aimed to be more FP in an OO world. But I never managed to get my peers into learning together, and using, “pure FP” with libraries like cats or scalaz. So I wonder: is pure FP really the answer (the silver bullet) I’m looking for, or should we tilt the balance less towards purity in Scala?

Harsh criticism against impure Scala

In the latest scalapeno conference, I attended John De Goes’ keynote “The Last Hope for Scala’s Infinity War”. In the talk, John claimed that:

> “Some of the proposed features in Scala 3 are targeting someone who doesn’t exist”

He referred to features like Type Classes & Effect Systems.

[slide: “features for no one”]

But the truth is that John was wrong. There is at least one person who wants such features2. There is a middle-ground. And I want to believe I’m not alone, and that some other people see the value Scala has to offer for FP lovers (wannabes?) working with traditional OO programmers, trying to make a difference.

[slide: “neither”]

Like John, I love Scala. But unlike John, I don’t think we need to sacrifice half the community.

And I don’t want to see all the crossed-out features removed from Scala:

[slide: “liabilities?”]

Scala is expressive

I was recently asked why I love Scala. I answered that with Scala, it’s very easy for me to express exactly what I mean for the program to do, and that it’s easy to expose a beautiful (and safe) functional interface while encapsulating impure logic, gaining the benefit of both worlds. I was then asked to give an example of short code that I find easy to express in Scala, and hard(er) to write in other languages. I chose something I’d written a while ago, which I find to be a nice example of such an FP-ish pattern.

def mapr[A,B](as:         Vector[A])
             (f:    A  => Future[B])
             (g: (B,B) => Future[B]): Future[B]

So… what’s so special about mapr? A quick Scala thinker will implement it with something like3:

Future.traverse(as)(f)
      .flatMap(_ reduce g) // if g were (B,B) => B

And it can be fine, considering the usecase. It wasn’t fine in our case.
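For completeness, here’s a runnable sketch of that naive version, adapted to g’s actual (B,B) => Future[B] signature by folding sequentially (which is exactly its weakness, as discussed next):

```scala
import scala.concurrent.{ExecutionContext, Future}

def maprNaive[A, B](as: Vector[A])
                   (f: A => Future[B])
                   (g: (B, B) => Future[B])
                   (implicit ec: ExecutionContext): Future[B] =
  Future.traverse(as)(f).flatMap { bs =>
    // every merge waits for ALL the f computations,
    // and for every previous merge
    bs.tail.foldLeft(Future.successful(bs.head)) { (acc, b) =>
      acc.flatMap(g(_, b))
    }
  }
```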

The “real world” case

Consider A to be file names to fetch from an S3 bucket. But not just any file: a CSV with time series events. B is a time-based aggregated state series of the events. f can “fetch & aggregate” a single file. But since many “sources” (S3 files) can add data, we should think about merging (reducing) multiple sources into one uber state series4. Thus we need g to “merge” two time-based aggregated state series into a single one. Now, file sizes range from a few MBs up to ~10GB. And this is important, because in the simple solution we cannot start reduce-ing the small files until we are done fetching and transforming the bigger 10GB files. Also, if the first A in the Vector is also mapped to the heaviest B, the reduce will be expensive, since we are always merging a ~10GB B.

Lawful properties

In this case, there are interesting things to note about g5:

  • g is associative; it doesn’t matter how we group and merge two time-based state series.
  • g is commutative; order is irrelevant. Switch LHS with RHS, and get exactly the same result.
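These two laws translate directly into quick property checks (footnote 5 mentions scalacheck; the sketch below is dependency-free, using Int addition as a stand-in for a synchronous g):

```scala
// poor man's property check: sample random triples and verify both laws
val g: (Int, Int) => Int = _ + _  // stand-in for the real merge function

def lawful(samples: Seq[(Int, Int, Int)]): Boolean =
  samples.forall { case (a, b, c) =>
    g(g(a, b), c) == g(a, g(b, c)) && // associativity
    g(a, b) == g(b, a)                // commutativity
  }

val rnd = new scala.util.Random(1234)
val ok = lawful(Seq.fill(100)((rnd.nextInt(), rnd.nextInt(), rnd.nextInt())))
```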

So now, our random pure-FP dude might say: great! Your category B sounds very similar to a semilattice. Let’s also prove that g abides by the idempotency law, g(b,b) == b for any \(b\), and maybe we’ll find a useful abstraction using semilattice properties.

Well, that FP programmer is actually right, and all that high gibberish talk actually has solid foundations. In fact, the function g over B I just described does define a semilattice: Bs have a partial ordering6, g acts as the “join” operation, and we even have an identity element (the empty time series), so it is even a “Bounded Semilattice”.

But this is totally irrelevant and out of the question’s scope. Remember, we are working in an OO organization. If we start defining abstractions over semilattices (or maybe there are ones in the cats or scalaz libraries? I really don’t know), the code will become one giant pile of gibberish in the eyes of my coworkers. And we don’t even need it. What is needed, with the realization that g is associative & commutative, is a very natural optimization that pops to mind: why wait for all A => B transformations? Whenever 2 Bs are ready (any 2 Bs), we can immediately apply g and start the merge process.

So, without over abstracting, I came up with the following piece of code (brace yourselves):

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

def unorderedMapReduce[A, B](as: Vector[A])
                            (f: A => Future[B], g: (B,B) => Future[B])
                            (implicit executor: ExecutionContext): Future[B] = {

  val promises = Array.fill(2 * as.size - 1)(Promise.apply[B])
  var i = 0 // used for iteration optimization
            // (not needing to search for uncompleted
            // promise from start every time)

  def completeNext(tb: Try[B]): Unit = tb match {
    case Failure(e) => promises.last.tryFailure(e) // fail fast
    case Success(b) =>
      // We're not synchronizing i, since we can guarantee we'll always
      // get an i that is less than or equal to latest i written by any
      // other thread. But we cannot use i += 1 inside the loop,
      // since it may result with skipping a cell in 2 threads doing
      // the increment in parallel, so each thread gets an initial copy
      // of i as j, and only increments its own copy. Eventually,
      // we replace i with a valid value j (less than or equal to
      // first not used promise)
      var j = i
      var p = promises(j)
      while ((p eq null) || !p.trySuccess(b)) {
        j += 1
        p = promises(j)
      }
      i = j
  }

  as.foreach { a =>
    f(a).onComplete(completeNext)
  }

  promises
    .init
    .zipWithIndex
    .sliding(2,2)
    .toSeq
    .foreach { twoElems =>
      val (p1, i1) = twoElems.head
      val (p2, i2) = twoElems.last

      // andThen side effect is meant to release potentially heavy
      // elements that otherwise may stay for potentially long time
      val f1 = p1.future.andThen { case _ => promises(i1) = null }
      val f2 = p2.future.andThen { case _ => promises(i2) = null }
      failFastZip(f1,f2)
        .flatMap(g.tupled)
        .onComplete(completeNext)
    }
  promises.last.future
}

// regular Future.zip flatMaps first future, so it won't
// fail fast when 2nd RHS future completes first as failure.
def failFastZip[A,B](l: Future[A], r: Future[B])
                    (implicit executor: ExecutionContext): Future[(A,B)] = {
  val p = Promise[(A,B)]()
  l.onComplete(_.failed.foreach(p.tryFailure))
  r.onComplete(_.failed.foreach(p.tryFailure))
  for {
    a <- l
    b <- r
  } p.success(a -> b)
  p.future
}

I can imagine the horror on the face of our random FP dude: an Array (mutable), vars, nulls, side effects… OMG. John would not approve of such an abomination. But before you panic, bear with me and let’s break it down.

The what & how

The goal is to have a “bucket” of ready B elements, and whenever we have 2 or more Bs in the “bucket”, we take a pair out of the bucket, compute g on it, and when g returns with a new B, we put it right back in the “bucket”, and we continue to do so until only a single final B element is left in the bucket. This is our return value.

The way it is done is simple: we have \(n\) A elements in the original sequence, each will be mapped to a single B, and since every 2 Bs are used to generate a new B, we need room for another \(n-1\) Bs in the bucket. Overall, \(2n-1\) elements, so our “bucket” can be just a simple Array. We also want to parallelize the computation as much as we can, so some construct is needed to handle the asynchronous nature of the computation. I used a Promise[B] to store each result. So our “bucket” is simply an Array[Promise[B]] of size \(2n-1\).

Have a look at the following figure, which illustrates the order of computation when the completion order is:

  1. as(2)
  2. as(1)
  3. g(as(2),as(1))
  4. as(3)
  5. as(0)
  6. g(g(as(2),as(1)),as(3))
  7. g(as(0),g(g(as(2),as(1)),as(3)))

In the simple FP approach (traverse + reduce), same execution will result with the following computation order:

  1. as(2)
  2. as(1)
  3. as(3)
  4. as(0)
  5. g(as(0),as(1))
  6. g(g(as(0),as(1)),as(2))
  7. g(g(g(as(0),as(1)),as(2)),as(3))

Which means we don’t take advantage of the early completed computations.

More benefits

Using early completed computations isn’t the only reason I argue the suggested code is superior. It’s not just that we start computations eagerly; implicitly it also means that heavy elements are left to be dealt with at the end, and we don’t need to reduce 10GB files at every step of the way7. We also get “fail fast” semantics: for example, if g fails for whatever reason on as(2)’s input, in the code I suggest you fail fast, since the failure is propagated directly as a failed Future result. Also, whenever a future completes, we have a side effect to clean up values that are not needed anymore, so we don’t leak.

Bottom line

What I’m trying to emphasize with the example I showed is that there are good reasons to write impure functional code. In cases like this, you enjoy both worlds. The interface is functional; it is referentially transparent (as much as Future is considered referentially transparent). Perhaps the above is achievable in pure functional style (I would love to know how. Really!), but my gut tells me it won’t be so simple. And as someone who works mostly in hybrid FP-OO teams, I can’t go full FP. Not at first, anyway… Using Scala enables me to introduce a smoother transition into FP without getting my peers too angry or confused. It enables me to break the rules if I really need to. I wouldn’t sacrifice the impure parts of the language. I truly think they are too valuable.


  1. Other than maybe a tiny PR to coursier which uses scalaz.↩︎

  2. You are reading that person’s blog. Obviously :)↩︎

  3. Actually, I’d bet he would use Traversable and Task over std lib’s Future.traverse↩︎

  4. There are multiple ways to approach that problem, and actually, we ended up with another solution, which utilized akka-streams: merging multiple sources with a custom stage & scanning to output a stream of aggregated state in time order.↩︎

  5. Whenever I encounter such a realization, I automatically translate it to property checks. So to test that g is lawful, all I needed was a very simple scalacheck test for associativity & commutativity.↩︎

  6. Think about b1 != b2, which means that either g(b1,b2) > b1 and also g(b1,b2) > b2, or that b1 and b2 already have an order relation between them. It may be more obvious in some situations (and it was obvious in our case).↩︎

  7. Kind of reminds me of the matrix chain multiplication problem.↩︎

The next chapter

Every end is a new beginning

For the past 6 years I’ve been a part of the CM-Well development team. I’m writing this post with lots of mixed feelings. Working on CM-Well has been an awesome experience! I got a chance to work with so many amazing people. But now it’s time to move on, and I’m excited to start a new gig.

CM-Well - the early days

Last year we open-sourced CM-Well and released the code on github. Doing so involved cleaning up the code and getting rid of the commit history, so many team members who contributed a lot to the success of the project are not recognized. So, to get it out in the open, I opened the old repo, which has commits up until July last year, and I’m sharing some stats1.

$ git shortlog -sn --all
  2697  Gilad Hoch
  1015  yaakov
   954  michael
   914  israel
   739  Israel
   720  Vadim.Punski
   580  Gilad.Hoch
   450  Mark Zitnik
   310  Tzofia Shiftan
   234  Matan Keidar
   222  Mark.Zitnik
   170  Eli
   125  Yaakov Breuer
   102  Michael.Irzh
    95  Michael
    94  Israel Klein
    87  Tzofia.Shiftan
    58  Eli Orzitzer
    54  Michael Irzh
    44  Builder
    42  gilad
    31  DockerBuilder
    22  dudi
    14  Dudi
    14  Dudi Landau
    14  Yoni.Mataraso
    10  matan
     9  tserver
     8  Liorw
     5  Liya.Katz
     4  israel klein
     2  Shachar.Ben-Zeev
     2  Yoni Mataraso
     2  builder
     1  James Howlett
     1  Yaakov

These are just the commits in the old repo, not including any new commits in github. I also created visualizations using Gource for the old repo.

The first has files fading, and focuses on the contributors:

The second version gives an overview of the entire project:

The project is active, and has a lot of work invested in it, as you can see from the videos. But it doesn’t quite show how CM-Well is used internally. So I fetched some random access.log file from one of the servers, and wrote a little something to convert the log into a logstalgia acceptable format:

https://github.com/hochgi/logstalgia-access-log-converter

I took this opportunity to get to know mill build tool, and some libraries I wanted to experiment with. Long story short, I got 10 minutes of real CM-Well action on a single node (which is part of a cluster that has 20 web servers in it - so you’re only getting 1/20th of the action), and made a visualization using logstalgia with:

$ logstalgia -f -1280x720 --title "CM-Well access.log visualization"                \
    --screen 2 -p 0.2 -u 1 --background 75715e -x -g "meta,URI=/meta/.*$,10"        \
    -g "SPARQL,URI=/_sp?.*$,30" -g "_out,URI=/_out?.*$,10" -g "_in,URI=/_in?.*$,10" \
    -g "misc,URI=/.*,40" --paddle-mode single --paddle-position 0.75                \
    --disable-progress --font-size 25 --glow-duration 0.5 --glow-multiplier 2       \
    --glow-intensity 0.25 converted-access.log

And the output:

I gotta say, it came out pretty neat!2

Goodbye

I suck at goodbyes, so let me just say that I really loved working on CM-Well. It is a great project, and I hope to see it thrive. I will keep track of it, and plan to contribute occasionally in my spare time.


  1. In the early days, we used SVN, and we converted the repo to git at some point, which is why you see some duplicated names (that, and we may have also committed from multiple users).↩︎

  2. Kinda got me thinking I should write a logstash appender that streams real-time action directly into a logstalgia end point. It’s gotta be the coolest monitoring one can ask for…!↩︎

Unfolding streams like a boss (part 2)

Parallelizing resumable bulk consumes with CM-Well & akka-stream

In the previous post we introduced CM-Well’s consume API, and showed how it is tailored to be used with akka-http & akka-stream. This post will get into the gory details of how to squeeze the most out of CM-Well, using the bulk-consume API with akka-http & akka-stream.

The consume API in depth

There are two types of “consumes”: consume & bulk-consume. We will focus on the latter, but a few words on consume so as not to leave you hanging: consume just wraps a regular search with a few extra filters. In terms of CM-Well’s qp, it translates to the following:

Given filters qp=$QP and a timestamp index-time=${ITIME:-0}, CM-Well generates the following (loosely) equivalent search parameters:

# set NOW to the current time (in millis) minus 30 seconds
MILLIS=$(date +%s%N | cut -b1-13)
NOW=$((MILLIS - 30000))

"?op=search
 &qp=system.indexTime>$ITIME,system.indexTime<$NOW,[$QP]
 &sort-by=system.indexTime
 &length=100"

It then fetches those (up to) 100 (by default) sorted results, and:

  • if all results have the same index time SOME_ITIME, it will replace the previous op=search with op=stream, and the previous qp=system.indexTime>$ITIME,system.indexTime<$NOW,[$QP] with qp=system.indexTime:$SOME_ITIME,[$QP].
  • else, there will be multiple index time values, all sorted. It will drop all the trailing results with index time = $MAX_ITIME, and return the rest, with a token in the header setting the next $ITIME to $MAX_ITIME - 1.
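In code, the step just described might look like the following sketch (Result and the Either encoding are simplifications made up for illustration; the real logic lives in CM-Well’s source):

```scala
final case class Result(path: String, indexTime: Long)

// Left(itime): all results share one indexTime; switch to op=stream on it.
// Right((results, nextItime)): emit results (trailing max-indexTime ones
// dropped), and set the next token's $ITIME to maxITime - 1.
def consumeStep(sorted: List[Result]): Either[Long, (List[Result], Long)] =
  sorted match {
    case Nil => Right((Nil, 0L))
    case rs  =>
      val maxITime = rs.last.indexTime
      if (rs.head.indexTime == maxITime) Left(maxITime)
      else Right((rs.takeWhile(_.indexTime < maxITime), maxITime - 1))
  }
```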

These are the basics; there are a few more concerns to take into consideration, and if that interests you, go ahead and check the source code.

Understanding bulk-consume

In contrast to the consume API, bulk-consume tries to be more efficient and retrieve a lot more infotons per request, à la stream style. Under the hood it uses Elasticsearch’s scroll API, in a similar way to how we described the stream API in the previous post. The problem is that you can’t get sorted results with scroll from Elasticsearch. So, instead of advancing the timeline using sorted search, we filter results in advance.

This means there’s a pre-processing phase, where we try to find from and to timestamps that are not “too far” apart in terms of number of results, but far enough apart to stream efficiently. CM-Well uses a simple binary search to do so, and it tries to return a chunked response with O(1M) results (by default). Many edge cases are covered, like an early cutoff if the binary search doesn’t converge fast enough, dealing with near-current-time results, etc.
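A hedged sketch of what such a binary search might look like (countBetween is an assumed, monotonic counting function; maxSteps models the early cutoff; the real implementation is in CM-Well’s source):

```scala
// find a `to` timestamp so that countBetween(from, to) lands near `target`
def findTo(from: Long, now: Long, target: Long, maxSteps: Int)
          (countBetween: (Long, Long) => Long): Long = {
  @annotation.tailrec
  def loop(lo: Long, hi: Long, steps: Int): Long = {
    val mid = lo + (hi - lo) / 2
    val c = countBetween(from, mid)
    if (steps == 0 || c == target) mid  // early cutoff if not converging
    else if (c < target) loop(mid + 1, hi, steps - 1)
    else loop(lo, mid - 1, steps - 1)
  }
  loop(from, now, maxSteps)
}
```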

Like consume, bulk-consume returns a position token in the headers. In fact, the tokens are interchangeable between the 2 APIs, but those returned from a bulk-consume request might contain some extra attributes. It turns out that many pre-processing phases can be avoided if the previous request stored an optional next “to” timestamp it might have encountered during the binary search. So, what’s so great about the bulk-consume API? Pipelined parallelization! You see, the next position token is given eagerly in the response headers, and a user can use it right away to fire up the next request. Since it will probably take some time to fetch all those O(1M) results, you could end up with as many parallel streams of data as you can handle.
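The pipelining idea can be sketched with plain Futures (Response and bulkConsume are made-up stand-ins for the HTTP client): the recursive call fires as soon as the token is known, before the current body completes:

```scala
import scala.concurrent.{ExecutionContext, Future}

final case class Response(nextToken: Option[String], body: Future[List[String]])

// `bulkConsume` stands in for the actual HTTP call; the next token
// arrives in the headers, i.e. before `body` completes.
def drain(token: String, bulkConsume: String => Response)
         (implicit ec: ExecutionContext): Future[List[String]] = {
  val resp = bulkConsume(token)
  resp.nextToken match {
    case None       => resp.body
    case Some(next) =>
      val rest = drain(next, bulkConsume)  // fired immediately: pipelined
      for (b <- resp.body; r <- rest) yield b ++ r
  }
}
```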

But, you might ask: “What about failures? Retrying?” The answer is that bulk-consume also lets you set the upper bound timestamp explicitly. If your token was an optimized one, you can reuse it safely. If not, a new binary search might yield a different time range, and you could end up with duplicate results, or worse, data gaps. To overcome this, you should supply a timestamp explicitly when retrying. But what should you supply? Well, there’s another header for that. Other than the X-CM-WELL-POSITION header, you also get an X-CM-WELL-TO header, whose value is the upper bound timestamp found in the binary search. You should supply this timestamp using the to-hint query parameter, and retry the bulk-consume request with it. Note that if the position token is optimized, to-hint will be ignored.

OK, got it. let’s write some code

As implied, we will show how to build an akka-stream Source of data from CM-Well, using unfoldFlow, Retry, and other cool constructs you can find on akka-stream & akka-stream-contrib libs.

The easy part (motivation)

Assuming we can somehow get:

type PositionToken = String
val initialPosition: PositionToken = ???
val consume: Flow[PositionToken,(PositionToken,ByteString),_] = ???

The work left is ridiculously easy thanks to unfoldFlow:

SourceGen.unfoldFlow(initialPosition)(consume)

And we’re done! OK… not really… It’s too simplified. unfoldFlow can’t unfold the next element until it gets the previously generated state. This means that all our fancy talk about pipelined parallelization isn’t being taken into account here. So let’s try to improve on that. How ’bout:

val consume: Flow[PositionToken,(PositionToken,Source[ByteString,_]),_] = ???
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapConcat(identity)

This is already much better. Each bulk-consume Source is queried eagerly. But we still have a downside here… bulks are not evenly sized, and size is counted as the number of infotons in the bulk, not their actual byte size. Moreover, we mentioned retries are supported using to-hint with the X-CM-WELL-TO header’s value. So, if we are going to retry some streams, we need to buffer an entire chunk and only emit once we know it is complete, so we don’t get duplicate results from retries. This implies a single bulk can get us “stuck” waiting for it. The 2 major problems are:

  • No matter how high we set our parallelization factor, we could still end up back-pressuring our source (whatever the use-case, we must assume a fast consumer, e.g. flushing to disk, which is much faster than our network calls).
  • Having $parallelization-factor × O(1M) results all buffered in memory makes our GC inefficient, due to objects kept in memory for a long time. We also cause our downstream to starve until we “unstick” the current bulk.

So, since bulks are not sorted along the timeline anyway, there is no reason not to use merge instead of concat:

SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(4, identity) // breadth = parallelization factor

Also, we will try to optimize even further. Our queries to bulk-consume are our “bottleneck”. So, it is better not to pull in all the data with the bulk. Let’s use a thin format, like tsv, which won’t return the data itself, only a tuple consisting of the infoton’s path, date, uuid, and indexTime. This way, we can, at a later stage, pull in the data of small batches of infotons we got from the bulk-consume. So our final higher level stream should either look like:

val consume: Flow[PositionToken,(PositionToken,Source[List[ByteString],_]),_] = ???
val addData: Flow[List[ByteString],ByteString,_] = ???
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(4, identity) // breadth = parallelization factor
         .mapConcat(identity)
         .via(addData)

where the addData flow utilizes Balance to fan out and parallelize the data fetching job, and then fans back in to construct a Flow shape which takes care of parallelization internally. Another option is to use a simpler mapAsync(parallelizationFactor)(...) to get an easier way to parallelize the data fetching job. Or, it can look like:

val consume: Flow[PositionToken,(PositionToken,Source[List[ByteString],_]),_] = ???
val addData: Flow[List[ByteString],ByteString,_] = ???
SourceGen.unfoldFlow(initialPosition)(consume)
         .flatMapMerge(parallelism, _.via(addData))

OK, armed with our goal in mind, let’s implement the consume flow.

The detailed part

Let’s try to break down & visualize the stream:

  • Retry:
    • bulk-consume ~> fold results into a single element
    • if #lines != X-CM-WELL-N header or any other failure:
      • retry with to-hint=${X-CM-WELL-TO}
    • else
      • emit results
  • Enrich with data:
    • batch paths
    • Balance vs. mapAsync parallelization of fetching (we’ll introduce both approaches)
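Since the implementation is left as an exercise, here is just a minimal sketch of the first retry step: folding a bulk into a single element and validating the line count. All names here are ours, not CM-Well’s actual code; a mismatch is signaled with a failed future, which the surrounding retry logic would turn into a bulk-consume request with to-hint=${X-CM-WELL-TO}.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Fold the bulk's byte stream into one element, then compare the number of
// lines against the X-CM-WELL-N header's value before emitting.
def foldAndValidate(dataBytes: Source[ByteString, Any], expectedLines: Int)
                   (implicit mat: Materializer, ec: ExecutionContext): Future[ByteString] =
  dataBytes.runFold(ByteString.empty)(_ ++ _).map { whole =>
    val lines = whole.utf8String.count(_ == '\n')
    if (lines == expectedLines) whole
    else throw new IllegalStateException(s"expected $expectedLines lines, got $lines")
  }
```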

Wrapping up

This might be covered in a future post. This draft has been sitting untouched for too long, and I’m “flushing” it. For now, implementation details are left as an exercise for the reader ;)

Unfolding streams like a boss (part 1)

How we utilize Akka streams in CM-Well

Unfolding is not a new concept, and this blog post isn’t about explaining a needlessly obfuscated ivory-tower FP term. This post is about how a simple concept, building a sequence from an initial state and a function that generates a tuple of the next state and an element, is used in CM-Well.

How our story begins

Initially, CM-Well didn’t have streaming APIs. The closest thing our users could get was the iterator API. There’s not much CM-Well logic in this API; we just wrap the Elasticsearch scroll API, with or without enriching the results by fetching data from Cassandra. We also use a little trick of storing the scrollId received from Elasticsearch in a short-lived actor, and returning the actor address instead, thereby ensuring the iterator id token isn’t “long”. We did this to allow users to always use GET with a query parameter, which isn’t possible with Elasticsearch’s original scrollId, since it may not fit in a URL (> 4K).
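The short-lived actor trick can be sketched roughly like so. These are hypothetical names, not the actual CM-Well sources:

```scala
import akka.actor.{Actor, Props, ReceiveTimeout}
import scala.concurrent.duration._

case object GetScrollId

// Holds the long Elasticsearch scrollId; the (short) actor path is what gets
// handed back to the user as the iterator token. If nobody asks for the next
// chunk in time, the actor stops itself and the token effectively expires.
class ScrollIdHolder(scrollId: String) extends Actor {
  context.setReceiveTimeout(1.minute)
  def receive = {
    case GetScrollId    => sender() ! scrollId
    case ReceiveTimeout => context.stop(self)
  }
}

object ScrollIdHolder {
  def props(scrollId: String): Props = Props(new ScrollIdHolder(scrollId))
}
```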

Well, of course, users who used it, just wanted to get all the data. There really isn’t much point in having users do a request for every chunk. We wanted to just provide a streaming chunked response with all the data.

The story unfolds

The concept of unfold isn’t just about streams; as can be seen in CM-Well’s util, we also implemented it for regular collections1. But using it to unfold streams of data is a natural fit. The Elasticsearch scroll API makes a great example of how unfolding streams of data becomes simple using unfold: we have the state (scrollId) and the element (scroll results). So, we experimented with RX, Akka streams (which was very young, and experimental back then), and play’s iteratees, trying to see which would unfold our data stream best.
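As a sketch (not the actual util code), a collection unfold can be as small as this:

```scala
// Build a List from a seed state and a step function that returns
// Some((nextState, element)) to continue, or None to stop.
def unfold[S, E](seed: S)(f: S => Option[(S, E)]): List[E] = {
  @annotation.tailrec
  def loop(s: S, acc: List[E]): List[E] = f(s) match {
    case Some((next, elem)) => loop(next, elem :: acc)
    case None               => acc.reverse
  }
  loop(seed, Nil)
}

// counting down from a seed:
// unfold(3)(n => if (n > 0) Some((n - 1, n)) else None) yields List(3, 2, 1)
```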

We ended up with a fairly reasonable implementation with play’s iteratees. After all, the webservice was built with play, and at the time (play 2.3), we had to convert to iteratees anyway. Personally, I liked the concept, and hated the DSL2. But I do think one of the biggest drawbacks of iteratees is that it is perceived as “too complicated” by newbies. And it does have a steep learning curve. Anyway, it served us pretty well, but as akka streams kept evolving, I began to squint back in its direction.

The akka-stream chapter

My initial go at unfolding a stream with akka streams was a low-level implementation of a publisher actor (I even have an old gist of that implementation). It worked, but didn’t feel very “akka-streamy”, and was too low level. So I decided to try to submit my first PR to akka, and add unfold to the official API. This initial PR even shows you could unfold an infinite stream with what akka had back then (unfoldInf was later removed, as it really didn’t add much value…), but unfolding finite and async streams had to be taken care of in a custom GraphStage. Basically, I shamelessly copied play’s iteratees API and implemented it in akka streams. I even named the methods unfold & unfoldM for the async version at first, following the iteratees convention (as can be seen in the original issue). Back in CM-Well, we used unfoldAsync to create a new API. The resulting stream API is pretty useful, and provides a simple way for users to download a complete dataset with a single curl/wget command.
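For illustration, the merged API looks like this in use. Here, scroll and ScrollResults are hypothetical stand-ins for an Elasticsearch scroll call and its result type:

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// finite stream from a seed: emits 0,1,2,3,4 and completes on None
val countUp: Source[Int, _] =
  Source.unfold(0)(n => if (n < 5) Some((n + 1, n)) else None)

// async version: the state is the scrollId, the element is the scroll's hits
type ScrollResults = List[String] // hypothetical
def scroll(scrollId: String): Future[Option[(String, ScrollResults)]] = ???

def scrolled(initialScrollId: String): Source[ScrollResults, _] =
  Source.unfoldAsync(initialScrollId)(scroll)
```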

Time went by, CM-Well kept evolving, and we started using akka-stream more and more. Many parts of CM-Well act as clients of the webservice, e.g. cmwell-dc, which is responsible (among other things) for inter-data-center synchronization, or cmwell-data-tools, which provides ways to download data from CM-Well, ingest data to CM-Well, or process it with SPARQL queries using CM-Well. These modules use an API that is tailored for akka-stream/akka-http consumption; I am referring to the consume API. In this API, which is meant for stable, resumable, time-based data consumption, and is based on consecutive searches with range filters rather than scroll in Elasticsearch, we decided to take a slightly different approach than what we did with the iterator API. Influenced greatly by the way akka-http handles responses, we decided to provide the token in the response headers. After all, an HttpResponse in akka-http has the response headers available directly, whereas the response body has to be consumed from a Source[ByteString,Any]. The resulting code looked roughly like:

val mkRequest: String => Future[HttpResponse] = { token: String =>
  Http().singleRequest(HttpRequest( ... ))
}

Source.unfoldAsync(initialTokenRetrievedFromCreateConsumerAPI) { currentToken =>
  val res: Future[HttpResponse] = mkRequest(currentToken)
  res.map {
    case HttpResponse(s, h, e, _) if s.isSuccess() => {
      h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
        val next = nextToken.value()
        if(currentToken == next) None
        else {
          val data = e.dataBytes
          Some(next -> data)
        }
      }
    }
  }
}

But, if we are to take this one step further, we know that akka-http uses connection pool flows under the hood. And why not use the flow directly within our nice streamy code?

That’s exactly what I thought when I opened yet another pull request to akka (this time, to stream-contrib module), adding unfoldFlow. The snippet above could be re-written using unfoldFlow to something like:

val mkRequest: String => HttpRequest = (token: String) => ...
val connFlow = Http().superPool[String]()

// superPool's flow consumes (HttpRequest, T) pairs, so we carry the token as context
SourceGen.unfoldFlow(initialTokenRetrievedFromCreateConsumerAPI)(
  Flow.fromFunction((token: String) => mkRequest(token) -> token)
      .via(connFlow)
      .map {
        case (Success(HttpResponse(s, h, e, _)),currentToken) if s.isSuccess() => {
          h.find(_.name() == "X-CM-WELL-POSITION").flatMap { nextToken =>
            val next = nextToken.value()
            if(currentToken == next) None
            else {
              val data = e.dataBytes
              Some(next -> data)
            }
          }
        }
      }
)

There’s a lot not shown in this snippet, e.g. handling failures, customizing the connection pool, etc. But it demonstrates how we built an API that fits akka streams perfectly, and how we utilize it efficiently.

To be continued…

The next post will be a bit more technical, and will show how users can unfold their own streams with akka-stream & the CM-Well API more efficiently. So stay tuned for “parallelizing resumable bulk consumes with CM-Well & akka-stream”!


  1. Frankly, I’m surprised it’s not a built-in part of scala collections.↩︎

  2. I kept being asked to explain all the fish in the code…↩︎

A tale of bad framework choices

and how it led to interesting code

One of the most interesting pieces of code in CM-Well, IMO, is the http client util code. In short, it defines an API for an http client, and wraps akka-http, which serves as the “http client engine”. Why not just use akka’s API, you might ask…? Well, we’ll get there, but first, a bit of history. (TL;DR: A story of why we suffered from tightly coupled code, and how we solved it elegantly + code samples)

A long time ago

(In a galaxy far far away) CM-Well was written mostly in java1 (oh boy…); only the web-service logic was written in scala. The scala ecosystem looked very different compared to what it is today. The most popular scala web framework at the time was lift, which was much more relevant than play! Back then play was very new, maybe 1.0 or 1.1, and was still written in java. So it was decided to write the CM-Well web service with lift. But not only the web service… You see, lift classes were already on the classpath. Especially lift’s HttpSupport from the testkit, which had a reasonable API. So all of CM-Well’s integration tests were written using it. It served its purpose, and tests kept piling up, until we decided to upgrade scala from 2.9 to 2.10. lift was holding us back, and play started to look more and more attractive. We replaced lift with play!, but decided to rewrite the integration tests without using play’s testkit or http client. After all, what if down the road we would want to replace it with something else? We didn’t want to be tied too much to the web framework, so we minimized our dependency on play as much as possible.

For the integration tests, we decided to go with a popular scala http client instead: the infamous Dispatch (don’t judge… back then it was popular). For those who are not familiar with dispatch, it’s a scala http client library which makes heavy use of symbolic operators (check out the Periodic Table). Other than making the code less readable to newcomers with all those fancy operators, it served its purpose pretty well. That is, until… yup, it was time to upgrade scala again, and move from version 2.10 to 2.11. And again, our integration tests (and other code that made use of an http client) were written with a library that didn’t move fast enough. It held us back, causing a “jar hell” of unwanted & outdated dependencies… and we grew tired of all the weird operators.

But no one wanted to rewrite all the tests again… that’s a lot of dirty work. We hacked, and managed to get by with using it only in tests, so at least we didn’t have the jar hell on our main artifacts’ classpath, just in tests. Other code in main artifacts that needed an http client used whatever was on the classpath directly. Be it play’s WS or spray’s client, it didn’t really matter. But time went by, tests kept piling up, and it was long overdue to clean up the code.

Being a bit wiser from the bad experience, we decided to make sure that test code would never again hold us back from changing libraries and frameworks. We decided to write a thin wrapper with a sane & simple asynchronous http client API. But a wrapper for what? Well… it doesn’t matter. That was the whole point: if the wrapping layer is small enough, we can always change the underlying library easily, and won’t have to patch up thousands of lines of testing code if we ever switch to another library. Anyway, we needed to pick something, and at the time we were really excited about the recent developments in akka. It was 2015, and the akka-stream & akka-http experimental modules had just come out. We decided to check them out, but the experimental modules were too risky for production code, which made them a perfect candidate to serve in our tests as a dispatch replacement, without affecting production code. This was two birds with one stone: evaluating an interesting technology in its early stages, with real code, without risking anything crucial, and using our thin wrapper to decouple the tests’ logic from the http client library.

P.S. To be on the safe side, and for the sport of it, we started to implement the same thin API on top of ning’s AsyncHttpClient, but never really continued with it, since akka-http got the job done perfectly. But some remnants stayed commented out in the sources, waiting for the day that will never come.

We ❤ Akka-http

Choosing akka was challenging. It introduced this new concept of streams, which, at least in tests, we wanted to abstract away for simplicity. But then again, just consuming everything eagerly, completely hiding the reactive nature of the API, and returning a future of the response when everything is done, is asking for performance trouble. We needed a simple default, with an easy way of exploiting the asynchronous & reactive capabilities of akka-http. For that, we made heavy use of type classes, in a slightly adapted version of what is known as the magnet pattern.

The gory details

Let’s see some code, shall we..? Starting with the API itself:

object SimpleHttpClient {

  // web sockets API is more complex, and out of the scope for this post,
  // but is shown here for completeness, as it is part of the API.
  // You are more than welcome to check out the source code.
  def ws[T : SimpleMessageHandler](uri: String,
         initiationMessage: T,
         subprotocol: Option[String] = None,
         queryParams: Seq[(String,String)] = Nil,
         headers: Seq[(String,String)] = Nil)(react: T => Option[T])
        (implicit ec: ExecutionContext,
         as: ActorSystem = this.sys,
         mat: Materializer = this.mat) = ...

  def get[T : SimpleResponseHandler](uri: String,
          queryParams: Seq[(String,String)] = Nil,
          headers: Seq[(String,String)] = Nil)
         (implicit ec: ExecutionContext,
          as: ActorSystem = this.sys,
          mat: Materializer = this.mat) = ...

  def put[T : SimpleResponseHandler](uri: String,
          body: Body,
          contentType: Option[String] = None,
          queryParams: Seq[(String,String)] = Nil,
          headers: Seq[(String,String)] = Nil)
         (implicit ec: ExecutionContext,
          as: ActorSystem = this.sys,
          mat: Materializer = this.mat) = ...

  def post[T : SimpleResponseHandler](uri: String,
           body: Body,
           contentType: Option[String] = None,
           queryParams: Seq[(String,String)] = Nil,
           headers: Seq[(String,String)] = Nil)
          (implicit ec: ExecutionContext,
           as: ActorSystem = this.sys,
           mat: Materializer = this.mat) = ...

  def delete[T : SimpleResponseHandler](uri: String,
             queryParams: Seq[(String,String)] = Nil,
             headers: Seq[(String,String)] = Nil)
            (implicit ec: ExecutionContext,
             as: ActorSystem = this.sys,
             mat: Materializer = this.mat) = ...

}

Overall, this looks like a pretty straightforward http client API. Let’s try to clear the fog from the unclear parts: each of the methods returns a Future[SimpleResponse[T]]. I know what you might be thinking… I said simple API, and here I am, showing some fancy code with weird classes, right…? I’ll list what might be interesting here:

  • implicit as: ActorSystem = this.sys & mat: Materializer = this.mat
  • Body
  • SimpleResponse[T] & SimpleResponseHandler

ActorSystem & Materializer

In akka-http, in order to handle http requests & responses, you’ll need to get hold of an HttpExt, which takes an ActorSystem in Http’s factory method. Also needed is a connection flow to build the request graph around. To keep things simple, we use superPool, which returns a flow that routes requests through cached (per-host) connection pools, and is managed by akka. It needs a Materializer. We also need a Materializer to run a simple graph per request, something like:

Source.single(request -> context).via(connectionPool).runWith(Sink.head)

which performs the request and returns a Future[Try[HttpResponse]]. Lastly, we’ll also need the Materializer to handle akka’s HttpResponse, which returns the payload in the form of a Source[ByteString,_]. Remember, we wanted to abstract away anything that binds us to the library, so we can’t leak (unless we want to, more on that to follow) akka’s classes, neither Source nor ByteString. We need to convert it to something else. Anyway, as you can see, it’s needed. But if you pay attention, you’ll see it has default values. This lets us provide reasonable defaults, which can be configured freely using standard typesafe config. The provided reference.conf only defines the bare minimum:

cmwell.util.http {
  akka {
    actor {
      provider = "akka.actor.LocalActorRefProvider"
    }
    http {
      host-connection-pool {
        max-open-requests = 1024
      }
    }
  }
}

And as you might have already guessed, the provided actor system is configured using:

ActorSystem("SimpleHttpClient",ConfigFactory.load().getConfig("cmwell.util.http"))

Also, the provided Materializer & ActorSystem are LAZY (as in lazy val), so they won’t even get instantiated if this code runs within production code, which makes sure to supply a fully configured ActorSystem & Materializer. But, you might ask: doesn’t this bind us to akka? Well, technically, yes. In practice, the materializer and actor system are passed implicitly, so it’s not written in code (keeping aside some very rare cases). For instance, in the tests, you don’t see any reference to any materializer or actor system, and we are still loosely coupled, thanks to scala being such a flexible language when it comes to defaults & implicits.

Body

The post & put methods also take a mysterious body: Body. So what is it? Of course, as the name suggests, it’s the request body. But, you might ask: should a user be troubled with creating such objects? The answer is no. The Body companion object hosts some pretty useful implicits:

sealed trait Body {
  def entity(contentType: Option[String]): RequestEntity
  def contentType(ct: String): akka.http.scaladsl.model.ContentType = ...
}

object Body {
  import scala.language.implicitConversions

  implicit def apply(body: String): Body = new BodyFromString(body)
  implicit def apply(body: Array[Byte]): Body = new BodyFromBytes(body)
  implicit def apply(body: ByteString): Body = new BodyFromByteString(body)

  private class BodyFromString(body: String) extends Body {  ...  }
  private class BodyFromBytes(body: Array[Byte]) extends Body {  ...  }
  private class BodyFromByteString(body: ByteString) extends Body {  ...  }
}

This means that you may pass the body argument as whatever you want, be it a String, an Array[Byte], or even akka’s ByteString. And if we ever need something else, it’s very easy to add more automatically accepted types: we can just add another implicit conversion in Body’s companion object. Or, if it’s a special case, just instantiate a new Body locally, or write your own implicit conversions.
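So call sites stay clean. For instance (the URL here is a made-up example), all three of these compile as-is, since the implicit Body.apply conversions kick in:

```scala
import akka.util.ByteString
import cmwell.util.http.SimpleHttpClient

// the body: Body parameter accepts any type with an implicit conversion to Body
SimpleHttpClient.post("http://example.com/in", "a string body")
SimpleHttpClient.post("http://example.com/in", Array[Byte](1, 2, 3))
SimpleHttpClient.post("http://example.com/in", ByteString("raw bytes"))
```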

SimpleResponse[T] & SimpleResponseHandler

SimpleResponse is the response we get back from executing the request. It’s a pretty simple case class:

object SimpleResponse {

  type ContentType = String
  type ResponseBody[T] = (ContentType, T)

  ...
}

case class SimpleResponse[T : SimpleResponseHandler](status: Int,
                                                     headers: Seq[(String,String)],
                                                     body: ResponseBody[T]) {
  def contentType = body._1
  def payload = body._2

  override def toString() = ...
}

It has an Int for the status, a Seq[(String,String)] for the headers, and a ResponseBody[T], which is just a tuple of the mimetype (String) and the body, which can be anything that has a SimpleResponseHandler. All methods in the exposed API have a type parameter T that is context-bound to SimpleResponseHandler, which is the type class responsible for generating the appropriate response for us from the response returned by the underlying library, i.e. akka. It means we need an implicit SimpleResponseHandler[T] in scope. Now, please look carefully at the method signatures; none of the parameters has type T. So, you might think this means the compiler cannot infer the type, and the user always has to write it down explicitly? The answer is no. Let’s try it out in the REPL:

scala> val res = SimpleHttpClient.get("http://google.com")
res: scala.concurrent.Future[cmwell.util.http.SimpleResponse[Array[Byte]]] = ...

What happens here is that there is a single implicit available in the SimpleResponseHandler companion, and thus it is taken by the compiler (if only one implicit instance of SimpleResponseHandler for some T can be found, then that is what’s picked up, regardless of what T is). This one implicit has the most general type for a response body: simply an Array[Byte]. So, if the response can fit in memory, it’ll be returned as an Array[Byte]:

trait SimpleResponseHandler[T] {
  def mkStringRepr(t: T): String
  def mkResponseOf(status: Int,
                   headers: Seq[(String,String)],
                   contentType: String,
                   dataBytes: Source[ByteString,Any])
                  (implicit ec: ExecutionContext): Future[SimpleResponse[T]]
}

object SimpleResponseHandler {

  implicit object ByteArrayHandler extends SimpleResponseHandler[Array[Byte]] {
    ...
  }
}

But if you want something else, all you have to do is import the appropriate implicit (more sensible implicit instances of SimpleResponseHandler can be found in SimpleResponse.Implicits):

object SimpleResponse {

  ...

  // if you want a SimpleResponse[T] for T != Array[Byte],
  // import a SimpleResponseHandler[T] from here (or implement your own)
  object Implicits {

    implicit object InputStreamHandler extends SimpleResponseHandler[InputStream] {
      ...
    }

    implicit object UTF8StringHandler extends SimpleResponseHandler[String] {
      ...
    }
  }
}

So if the response body cannot fit in memory, for instance, simply import the appropriate implicit handler:

scala> import cmwell.util.http.SimpleResponse.Implicits.InputStreamHandler
import cmwell.util.http.SimpleResponse.Implicits.InputStreamHandler

scala> val res = SimpleHttpClient.get("http://google.com")
res: scala.concurrent.Future[cmwell.util.http.SimpleResponse[java.io.InputStream]] = ...

It just works, because imported implicits take precedence over implicits defined in the type class’ companion. Of course, if you import more than one handler, you’ll have to mention the type explicitly, or you’ll get a compiler error for “ambiguous implicit values”.
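For instance, with both handlers imported, you would disambiguate with an explicit type parameter. A sketch:

```scala
import cmwell.util.http.SimpleHttpClient
import cmwell.util.http.SimpleResponse.Implicits._ // String & InputStream handlers both in scope

// won't compile: ambiguous implicit values
// val res = SimpleHttpClient.get("http://google.com")

// explicit type parameter picks the String handler
val res = SimpleHttpClient.get[String]("http://google.com")
```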

Nice, so… it seems pretty solid. What else?

The implementation we have in CM-Well’s code base is far from complete:

  • Not all http methods are defined (e.g: HEAD)
  • One can think of way more sensible generic response handlers to add in Implicits object (e.g: one that simply returns the Source[ByteString,_] from akka’s response directly)
  • Classes can be arranged better, in separate files.

Basically, this API is extended lazily, whenever we need to add something new, and is not a complete solution.

OK, let’s wrap it up

This post is already getting longer than I thought, and we haven’t covered the web sockets API, or how we convert akka’s classes to simple types. So let’s leave those as an exercise for the reader 😉. The bottom line of this post is how we ended up with a nice API, which is very flexible, extensible, and implemented in not too many lines of code. The road was a bit bumpy, but the lesson learned was worth it.

P.S.

If you want to get your hands dirty, we would love to get some PRs! (regarding the code shown here, or any other part of CM-Well).


  1. In fact, CM-Well started out as a POC with python & django, and only later was implemented in java & scala.↩︎