Search This Blog


Showing posts with label async. Show all posts
Showing posts with label async. Show all posts

FP-ish patterns

Background

My twitter feed is becoming more and more (pure) FP oriented. Many of the people I follow from the Scala community are advocating for Haskell over Scala lately. To be honest, I was always intrigued about pure FP, but never got to use it in the “real world”1. I always worked in teams that had OOP mindset, and codebase. With Scala I was able to shift the balance a little bit towards FP. I advocated for referential transparency, strongly typed over stringly typed code, higher order functions, higher kinded types, typeclasses, etc’… I aimed to be more FP in an OO world. But I never managed to get my peers into learning together, and using, “pure FP” with libraries like cats or scalaz. So I wonder, does pure FP really is the answer - the silver bullet - I’m looking for, or should we tilt the balance less towards purity in Scala?

Harsh criticism against impure Scala

In the latest scalapeno conference, I attended John De Goes’ keynote “The Last Hope for Scala’s Infinity War”. In the talk, John claimed that: > “Some of the proposed features in Scala 3 are targeting someone who doesn’t exist”

He referred to features like Type Classes & Effect System. features for no one

But the truth is, that john was wrong. There is at least one person that wants such features2. There is a middle-ground. And I want to believe I’m not alone, and some other people see the value Scala has to offer for FP lovers (wannabes?) working with traditional OO programmers, trying to make a difference. neither

Like John, I love Scala. but unlike John, I don’t think we need to sacrifice half the community

And I don’t want to see all the crossed out features removed from Scala: liabilities?

Scala is expressive

I was recently asked, why I love Scala. I answered that with Scala, It’s very easy for me to express exactly what I mean for the program to do, and that it’s easy to expose beautiful (and safe) functional interface, while encapsulating impure logic and gaining the benefit of both worlds. I was then asked to give an example, of short code that I find easy to express in Scala, and hard(er) to write in other languages. I choose something I’ve written a while ago, and I find as a nice example for such FP-ish pattern.

def mapr[A,B](as:         Vector[A])
             (f:    A  => Future[B])
             (g: (B,B) => Future[B]): Future[B]

So… what’s so special about mapr? A quick scala thinker will implement it with something like3:

Future.traverse(as)(f)
      .flatMap(_ reduce g) // if g were (B,B) => B

And it can be fine, considering the usecase. It wasn’t fine in our case.

The “real world” case

Consider A to be file names to fetch from a S3 bucket. But not just any file, it’s a CSV with time series events. B is a time based aggregated states series of the events. f can be “fetch & aggregate” a single file. But since many “sources” (S3 files) can add data, we should think about merging (reducing) multiple sources into one uber states series4. Thus we need g to “merge” two time based aggregated states into a single time based state series. Now, file sizes range from a few MBs, up to ~10GB. And this is important, because in the simple solution, we cannot start reduce-ing the small files, until we are done fetching and transforming the bigger 10GB files. Also, if the first A in the Vector is also mapped to the heaviest B, the reduce will be expansive since we always merging ~10GB B.

Lawful properties

In this case, there are interesting things to note about g5:

  • g is assocciative, it doesn’t matter how we group and merge two time based series states.
  • g is commutative, order is irrelevant. switch LHS with RHS, and get exactly the same result.

So now, our random pure-FP dude might say: great! your category B sounds very similar to a semilattice. Let’s also prove that g abides the idempotency law, and that g(b,b) == b for any \(b\), and maybe we’ll find a useful abstraction using semilattice properties.

Well, that FP-programmer is actually right. and all that high gibberish talk actually has solid foundations. And in fact, that function g over B I just described does define a semilattice. Bs has partial ordering6, g acts as the “join” operation, and we even have an identity element (the empty time series), so it is even a “Bounded Semilattice”.

But this is totally irrelevant and out of the question’s scope. Remember, we are working in an OO organization. If we start defining abstractions over semilattices (or maybe there are ones in cats or scalaz libraries? I really don’t know), The code will become one giant pile of gibberish in the eyes of my coworkers. And we don’t even need it. What is needed, with the realization that g is associative & commutative, is a very natural optimization that pops to mind: why wait for all A => B transformations? whenever 2 Bs are ready, any 2 Bs, we can immediately apply g and start the merge process.

So, without over abstracting, I came up with the following piece of code (brace yourselves):

def unorderedMapReduce[A, B](as: Vector[A])
                            (f: A => Future[B], g: (B,B) => Future[B])
                            (implicit executor: ExecutionContext): Future[B] = {

  val promises = Array.fill(2 * in.size - 1)(Promise.apply[B])
  var i = 0 // used for iteration optimization
            // (not needing to search for uncompleted
            // promise from start every time)

  def completeNext(tb: Try[B]): Unit = tb match {
    case Failure(e) => promises.last.tryFailure(e) // fail fast
    case Success(b) =>
      // We're not synchronizing i, since we can guarantee we'll always
      // get an i that is less than or equal to latest i written by any
      // other thread. But we cannot use i += 1 inside the loop,
      // since it may result with skipping a cell in 2 threads doing
      // the increment in parallel, so each thread get's an initial copy
      // of i as j, and only increment it's own copy. eventually,
      // we replace i with a valid value j (less than or equal to
      // first not used promise)
      var j = i
      var p = promises(j)
      while ((p eq null) || !p.trySuccess(b)) {
        j += 1
        p = promises(j)
      }
      i = j
  }

  as.foreach { a =>
    f(a).onComplete(completeNext)
  }

  promises
    .init
    .zipWithIndex
    .sliding(2,2)
    .toSeq
    .foreach { twoElems =>
      val (p1, i1) = twoElems.head
      val (p2, i2) = twoElems.last

      // andThen side effect is meant to release potentially heavy
      // elements that otherwise may stay for potentially long time
      val f1 = p1.future.andThen { case _ => promises(i1) = null }
      val f2 = p2.future.andThen { case _ => promises(i2) = null }
      failFastZip(f1,f2)
        .flatMap(g.tupled)
        .onComplete(completeNext)
    }
  promises.last.future
}

// regular Future.zip flatMaps first future, so it won't
// fail fast when 2nd RHS future completes first as failure.
def failFastZip[A,B](l: Future[A], r: Future[B])
                    (implicit executor: ExecutionContext): Future[(A,B)] = {
  val p = Promise[(A,B)]()
  l.onComplete(_.failed.foreach(p.tryFailure))
  r.onComplete(_.failed.foreach(p.tryFailure))
  for {
    a <- l
    b <- r
  } p.success(a -> b)
  p.future
}

I can imagine the horror on the face of our random FP-dude, an Array (mutable), vars, nulls, side effects,… OMG. John would not approve such abomination. But before you panic, bare with me and let’s break it down.

The what & how

The goal is to have a “bucket” of ready B elements, and whenever we have 2 or more Bs in the “bucket”, we take a pair out of the bucket, compute g on it, and when g returns with a new B, we put it right back in the “bucket”, and we continue to do so, until only a single finite B element is left in the bucket. this is our return value.

The way it is done, is simple. we have \(n\) A elements in the original sequence, each will be mapped to a single B, and since every 2 Bs are used to generate a new B, we need room for another \(n-1\) Bs in the bucket. Overall, \(2n-1\) elements, and our “bucket” can be just a simple Array. We also want to parallelize the computation as much as we can, so some construct to handle the asynchronous nature of the computation is needed. I used a Promise[B] to store the result. So our “bucket” is simply an Array[Promise[B]] of size \(2n-1\).

Have a look at the following figure: pic This illustrates the order of computation, when the order is:

  1. as(2)
  2. as(1)
  3. g(as(2),as(1))
  4. as(3)
  5. as(0)
  6. g(g(as(2),as(1)),as(3))
  7. g(as(0),g(g(as(2),as(1)),as(3)))

In the simple FP approach (traverse + reduce), same execution will result with the following computation order:

  1. as(2)
  2. as(1)
  3. as(3)
  4. as(0)
  5. g(as(0,as(1))
  6. g(g(as(0,as(1)),as(2))
  7. g(g(g(as(0,as(1)),as(2)),as(3))

Which means we don’t take advantage of the early completed computations.

More benefits

Using early completed computations isn’t the only reason I argue the suggested code is superior. It’s not just we start computations eagerly, implicitly it also means that heavy elements are left to be dealt in in the end, and we don’t need to reduce 10GB files at every step of the way7. We also get “fail fast” semantics; for example, if g fails for whatever reason on as(2) input, in the code I suggest, you fail fast since it is mapped directly as a failed Future result. Also, whenever a future completes, we have a side effect to clean up values that are not needed anymore, so we don’t leak.

Bottom line

What I try to emphasize using the example I showed, is that there are good reasons to write impure functional code. In cases like this, you enjoy both worlds. The interface is functional, it is referential transparent (as much as Future is considered referential transparent). Perhaps the above is achievable using pure functional style (I would love to know how. really!), but my gut is telling me it won’t be so simple. And as someone who works mostly in hybrid FP-OO teams, I can’t go full FP. Not at first anyway… Using Scala enables me to introduce a smoother transition into FP without getting my peers too angry or confused. It enables me to break the rules if I really need to. I wouldn’t sacrifice the impure parts of the language. I truly think they are too valuable.


  1. Other than maybe a tiny PR to coursier which uses scalaz.↩︎

  2. You are reading that person’s blog. Obviously :)↩︎

  3. Actually, I’d bet he would use Traversable and Task over std lib’s Future.traverse↩︎

  4. There are multiple ways to approach that problem, and actually, we ended up with another solution, that utilized akka-streams, merging multiple sources with a custom stage & scaning to output a stream of aggregated state in time order.↩︎

  5. Whenever I encounter such realization, I automatically translate it to property checks tests. So to test that g is lawful, all I needed is a very simple scalacheck test to test associativity & commutativity.↩︎

  6. think about b1 != b2, which means that either g(b1,b2) > b1 and also g(b1,b2) > b2, or that b1 and b2 already has order relation between them. It may be more obvious in some situations (and it was obvious in our case).↩︎

  7. Kind of reminds me of the matrix chain multiplication problem.↩︎

Async Scheduling & Retrying revised

In my previous blog post, useful async scala snippets, I described a way to retry an asynchronous task that may fail. The code Iv’e suggested was:

def retry[T](maxRetries: Int, waitBetweenRetries: Option[FiniteDuration] = None)
            (task: =>Future[T])
            (implicit ec: ExecutionContext): Future[T] = {
  require(maxRetries > 0, "maxRetries must be positive")
  if(maxRetries > 1) task.recoverWith{
    case t: Throwable => waitBetweenRetries match {
      case None => retry(maxRetries - 1)(task)
      case Some(waitTime) => Future[Unit]{
        try {
          Await.ready(Promise().future, waitTime)
        }
        catch {
          case _: TimeoutException => ()
        }
      }.flatMap(_ => retry(maxRetries - 1, waitBetweenRetries)(task))
    }
  } else task
}

Now, I want to suggest a better alternative. So, what’s wrong with this first solution? well, let’s start with showing the alternate suggestion, and compare the 2 snippets:

Firstly, we need to get a hold of some simple scheduler. If you have akka’s ActorSystem available, then you can use system.scheduler.scheduleOnce. If not, let’s write simple (somewhat naïve) implementation:

object SimpleScheduler {
  // if we never schedule a job, we shouldn't waste the resources. 
  // So let's define our thread pool lazily.
  private[this] lazy val timer = java.util.concurrent.Executors.newScheduledThreadPool(1)

  def schedule[T](duration: FiniteDuration)
                 (body: => T)
                 (implicit executionContext: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    timer.schedule(new Runnable {
      override def run(): Unit = {
        // body may be expensive to compute, 
        // and must not be run in our only timer thread expense,
        // so we compute the task inside a `Future`,
        // and make it run on the expense of the given executionContext.
        p.completeWith(Future(body)(executionContext))
      }
    },duration.toMillis,java.util.concurrent.TimeUnit.MILLISECONDS)
    p.future
  }

  // The given `body` must not do any work on current thread. 
  // We have no way to enforce it,
  // but we must be careful not do pass a task that looks something like:
  // {{{
  //   val result: Result = doSomethingRealyExpensive()
  //   Future.successful(result)
  // }}}
  // remember we only have a single Thread to schedule all tasks.
  def scheduleFuture[T](duration: FiniteDuration)
                       (body: => Future[T]): Future[T] = {
    val p = Promise[T]()
    timer.schedule(new Runnable {
      override def run(): Unit = p.completeWith(body)
    },duration.toMillis,java.util.concurrent.TimeUnit.MILLISECONDS)
    p.future
  }
}

OK, so now we have our scheduler set up, and we can write our retry function:

/**
 * @param maxRetries max numbers to retry the task
 * @param delay "cool-down" wait period
 * @param task the task to run
 */
def retry[T](maxRetries: Int, delay: FiniteDuration = Duration.Zero)
            (task: => Future[T])
            (implicit ec: ExecutionContext): Future[T] = {
  require(maxRetries > 0, "maxRetries must be positive")
  require(delay >= Duration.Zero, "delay must be non-negative")
  if (maxRetries == 1) task
  else task.recoverWith {
    case _: Throwable =>
      if(delay == Duration.Zero) retry(maxRetries - 1)(task)
      else SimpleScheduler.scheduleFuture(delay)(retry(maxRetries - 1)(task))
  }
}

So what makes this implementation better? besides a minor API change (taking a FiniteDuration without unnecessary boxing of it in an Option). we have less threads contexts switches, and overall this is more efficient. In the first implementation we used:

Future[Unit]{
  try {
    Await.ready(Promise().future, ...

If we’ll count the number of “tasks” to execute at the expense of the given ExecutionContext, we’ll see that we have the first Future.apply, and inside it’s body, we call Await.ready, which uses blocking{...} behind the scenes, which means we allow the ExecutionContext to allocate an extra thread if needed, but all of these only to wait for a future which will never complete, and throw an Exception, and what we really need is only the time-out, so we can try the original given task again. A lot of unneeded work, and threads context-switch with the possibility to cause the allocation of a new thread.

Now, let’s examine the revised implementation. we already have a thread to handle the execution when it is scheduled. we’re just scheduling a task it should run. the only “extra” work we are doing, is creating a new Runnable, which when run, will complete a promise with the task’s output.

One thing we must pay attention to, is that we must not perform any work at the expense of the scheduling thread. This thread should be available for other scheduling tasks. Note that I also added a regular scheduling method, in which the given (synchronous) task is wrapped inside a Future{...} block. this is to make sure the task is run at the expense of the given (implicitly) ExecutionContext, and not on the scheduling thread.

Useful Async Scala Snippets

Scala makes it easy to work asynchronously with futures. After some time working with scala async constructs, we’ve seen some repetitive patterns. So here’s a few snippets that may be useful for others:

  • select - selecting the first future that completes, if there are multiple already completed futures, selects one of those. given a collection of futures, returns a Future[(Try[T],Coll[Future[T]])], or in words: a Future of tuple of the first completed future’s Try and the collection of the rest of the futures.
def select[T,Coll](fc: Coll)
          (implicit ec: ExecutionContext, 
           ev: Coll <:< TraversableLike[Future[T],Coll]): Future[(Try[T],Coll)] = {
  if (fc.isEmpty)
    Future.failed(new IllegalArgumentException("select from empty collection"))
  else {
    val p = Promise[(Try[T], Future[T])]()
    fc.foreach { f =>
      f.onComplete { t =>
        if (!p.isCompleted)
          p.trySuccess(t -> f)
      }
    }
    p.future.map {
      case (t, f) =>
        t -> fc.filter(_ != f)
    }
  }
}

sidenote: there is a similar gist by @viktorklang which inspired this snippet (difference is on the collection return type). you can find it here

  • successes - given a sequence of futures, return a future of a sequence containing all the futures that succeeded. one of the most frequently used Future‘s methods in our code base, is Future.sequence. but sometimes, you’ll need a “softer” method, that will collect all the succeeding futures’ elements, and won’t fail if a few futures failed.
def successes[A, M[X] <: Traversable[X]](in: M[Future[A]])
             (implicit ec: ExecutionContext, 
              cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = {
  in.foldLeft(Future.successful(cbf(in))) {
    (fr, fa) => {
      fa.flatMap(a => fr.map(_ += a)(ec))(ec).recoverWith{case _: Throwable => fr}(ec)
    }
  }.map(_.result())
}

sidenote: this is almost the same as Future.sequence method. (check the scala source code)

  • stream - being async is great, but also being lazy. sometime, you’ll want to convert part of your logic to use a regular collections instead of collections containing futures (you’ll need a really good reason to do so). to do this efficiently, you’ll want to convert a sequence of futures to a Stream, where each element in the stream is given as soon as it’s corresponding future completes. i.e: sort a collection of futures by they’re completion time. so here’s how to do async → lazy conversion:
def stream[T,Coll](fc: Coll, timeout: FiniteDuration)
                  (implicit ec: ExecutionContext, 
                   ev: Coll <:< TraversableLike[Future[T],Coll]): Stream[Try[T]] = {
  if (fc.isEmpty) Stream.empty[Try[T]]
  else try {
    Await.result(select(fc).map {
      case (t, coll) => t #:: stream(coll, timeout)
    }, timeout)
  }
  catch {
    case e: TimeoutException => Stream(Failure[T](e))
  }
}
  • retry - sometime, you’ll deal with futures that may fail, and you’ll want to retry whatever task that created the future in the first place, with an optional delay between retries. here’s a simple way to do it: The old snippets had many flaws. please refer to the following blog post to compare the version given here, with the new improved version.

  • travector - The following is a tweak of Future.traverse. In our code, we wanted to improve performance, and decided to use internally in Vector instead of the more general seq (defaults to List). Vector is better for cache locality, and perform better with concatenations or appending to the end. So, we had in many places methods that take a Seq[Something], and return Future[Seq[SomethingElse]]. behind the scenes, it was Future.traverse that did the work. And now, we replaced it with: travector!

/** 
 * Transforms a `TraversableOnce[A]` into a `Future[Vector[B]]` 
 * using the provided function `A => Future[B]`.
 * This is useful for performing a parallel map. 
 * For example, to apply a function to all items of a list in parallel:
 *
 * {{{
 *   val myFutureVector = Future.travector(myList)(x => Future(myFunc(x)))
 * }}}
 *
 * This is the same as `Future.traverse`,
 * but it will always yield a vector, 
 * regardless of the provided collection type.
 */
def travector[A, B, M[X] <: TraversableOnce[X]]
             (in: M[A])
             (fn: A => Future[B])
             (implicit ec: ExecutionContext): Future[Vector[B]] =
  in.foldLeft(Future.successful(Vector.newBuilder[B])) { (fr, a) =>
    val fb = fn(a)
    for (r <- fr; b <- fb) yield (r += b)
  }.map(_.result())

The code is pretty much copied from Future.traverse (check & compare), but this simple little trick gained us some perf boost.