Scala makes it easy to work asynchronously with futures. After some time working with Scala's async constructs, we've noticed some recurring patterns. So here are a few snippets that may be useful to others:
- select - Selects the first future that completes; if several futures are already completed, it picks one of them. Given a collection of futures, it returns a Future[(Try[T],Coll[Future[T]])], or in words: a Future of a tuple of the first completed future's Try and the collection of the remaining futures.
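import scala.collection.TraversableLike
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

def select[T,Coll](fc: Coll)
                  (implicit ec: ExecutionContext,
                   ev: Coll <:< TraversableLike[Future[T],Coll]): Future[(Try[T],Coll)] = {
  if (fc.isEmpty)
    Future.failed(new IllegalArgumentException("select from empty collection"))
  else {
    // the first future to complete wins the promise
    val p = Promise[(Try[T], Future[T])]()
    fc.foreach { f =>
      f.onComplete { t =>
        if (!p.isCompleted)
          p.trySuccess(t -> f)
      }
    }
    // pair the winner's result with the collection of the remaining futures
    p.future.map {
      case (t, f) =>
        t -> fc.filter(_ != f)
    }
  }
}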
Sidenote: there is a similar gist by @viktorklang, which inspired this snippet (the difference is in the returned collection type). You can find it here.
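To make the behaviour concrete, here is a minimal usage sketch. It assumes a simplified, hypothetical `selectSeq` that is fixed to `Seq`, so it does not need the `TraversableLike` evidence (which only exists up to Scala 2.12). It is not the snippet above, just the same idea in a smaller form.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object SelectExample {
  // Hypothetical Seq-only variant of `select` (an assumption made for this sketch).
  def selectSeq[T](fs: Seq[Future[T]])
                  (implicit ec: ExecutionContext): Future[(Try[T], Seq[Future[T]])] =
    if (fs.isEmpty)
      Future.failed(new IllegalArgumentException("select from empty collection"))
    else {
      val p = Promise[(Try[T], Future[T])]()
      // every future races to complete the promise; only the first one succeeds
      fs.foreach(f => f.onComplete(t => p.trySuccess(t -> f)))
      p.future.map { case (t, f) => t -> fs.filter(_ != f) }
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val pending = Promise[Int]()      // never completed in this example
    val done = Future.successful(42)  // already completed
    val (first, rest) = Await.result(selectSeq(Seq(pending.future, done)), 1.second)
    println(first)     // Success(42)
    println(rest.size) // 1: only the pending future is left
  }
}
```

The returned remainder can be fed back into the same function to get the next future to complete, which is exactly what the stream snippet relies on.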
- successes - Given a sequence of futures, returns a future of a sequence containing the results of all the futures that succeeded. One of the most frequently used Future methods in our code base is Future.sequence. But sometimes you'll need a "softer" method that collects the elements of all the succeeding futures and doesn't fail if a few futures failed.
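import scala.collection.generic.CanBuildFrom
import scala.concurrent.{ExecutionContext, Future}

def successes[A, M[X] <: Traversable[X]](in: M[Future[A]])
                                        (implicit ec: ExecutionContext,
                                         cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = {
  in.foldLeft(Future.successful(cbf(in))) {
    (fr, fa) => {
      // append on success; on failure, keep the accumulator as it was
      fa.flatMap(a => fr.map(_ += a)(ec))(ec).recoverWith{case _: Throwable => fr}(ec)
    }
  }.map(_.result())
}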
Sidenote: this is almost the same as the Future.sequence method (check the Scala source code).
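A small sketch of the difference from Future.sequence may help here. It assumes a hypothetical `successesSeq`, a `Seq`-only variant that accumulates into a `Vector` instead of going through `CanBuildFrom` (which was removed in Scala 2.13); the fold itself follows the same shape as the snippet above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SuccessesExample {
  // Hypothetical Seq-only variant of `successes` (an assumption made for this sketch).
  def successesSeq[A](in: Seq[Future[A]])
                     (implicit ec: ExecutionContext): Future[Seq[A]] =
    in.foldLeft(Future.successful(Vector.empty[A])) { (fr, fa) =>
      // append the value if `fa` succeeded, otherwise fall back to the accumulator
      fa.flatMap(a => fr.map(_ :+ a)).recoverWith { case _: Throwable => fr }
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val futures = Seq(
      Future.successful(1),
      Future.failed[Int](new RuntimeException("boom")),
      Future.successful(3)
    )
    // Future.sequence(futures) would fail with "boom"; this keeps the survivors
    println(Await.result(successesSeq(futures), 1.second)) // Vector(1, 3)
  }
}
```

Note that the failures are silently dropped, so this only fits cases where a partial result is acceptable.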
- stream - Being async is great, but so is being lazy. Sometimes you'll want to convert part of your logic to use regular collections instead of collections containing futures (you'll need a really good reason to do so). To do this efficiently, you'll want to convert a sequence of futures to a Stream, where each element in the stream becomes available as soon as its corresponding future completes, i.e. sort a collection of futures by their completion time. So here's how to do the async → lazy conversion:
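import java.util.concurrent.TimeoutException
import scala.collection.TraversableLike
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Try}

def stream[T,Coll](fc: Coll, timeout: FiniteDuration)
                  (implicit ec: ExecutionContext,
                   ev: Coll <:< TraversableLike[Future[T],Coll]): Stream[Try[T]] = {
  if (fc.isEmpty) Stream.empty[Try[T]]
  else try {
    // block for the next completed future; the tail is only evaluated on demand
    Await.result(select(fc).map {
      case (t, coll) => t #:: stream(coll, timeout)
    }, timeout)
  } catch {
    case e: TimeoutException => Stream(Failure[T](e))
  }
}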
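The following sketch shows the completion-order behaviour and the timeout case. It assumes hypothetical `Seq`-only helpers, `selectSeq` and `streamSeq`, that mirror the snippets above without the `TraversableLike` evidence. `Stream` is kept to match the post, although it is deprecated in favour of `LazyList` since Scala 2.13.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object StreamExample {
  // Hypothetical Seq-only `select` (an assumption made for this sketch).
  def selectSeq[T](fs: Seq[Future[T]])
                  (implicit ec: ExecutionContext): Future[(Try[T], Seq[Future[T]])] = {
    val p = Promise[(Try[T], Future[T])]()
    fs.foreach(f => f.onComplete(t => p.trySuccess(t -> f)))
    p.future.map { case (t, f) => t -> fs.filter(_ != f) }
  }

  // Hypothetical Seq-only `stream`: blocks for the head, keeps the tail lazy.
  def streamSeq[T](fs: Seq[Future[T]], timeout: FiniteDuration)
                  (implicit ec: ExecutionContext): Stream[Try[T]] =
    if (fs.isEmpty) Stream.empty[Try[T]]
    else try {
      Await.result(selectSeq(fs).map {
        case (t, rest) => t #:: streamSeq(rest, timeout)
      }, timeout)
    } catch {
      case e: TimeoutException => Stream(Failure[T](e))
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val a = Promise[String]()
    val b = Promise[String]()
    val never = Promise[String]() // stays pending to trigger the timeout
    b.success("b")                // b completes first
    val s = streamSeq(Seq(a.future, b.future, never.future), 1.second)
    println(s.head)               // Success(b)
    a.success("a")                // a completes second
    println(s(1))                 // Success(a)
    println(s(2).isFailure)       // true: waiting for `never` timed out
  }
}
```

Two things are worth keeping in mind: the timeout applies to each element separately, not to the stream as a whole, and once a timeout occurs the stream ends with a single Failure, so any futures still pending at that point are dropped.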
- retry - Sometimes you'll deal with futures that may fail, and you'll want to retry whatever task created the future in the first place, with an optional delay between retries. Here's a simple way to do it:
The old snippets had many flaws. Please refer to the following blog post to compare the version given here with the new, improved version.
- travector - The following is a tweak of Future.traverse. In our code, we wanted to improve performance, and decided to use Vector internally instead of the more general Seq (which defaults to List). Vector has better cache locality and performs better for concatenation and for appending at the end. So we had, in many places, methods that take a Seq[Something] and return a Future[Seq[SomethingElse]]. Behind the scenes, it was Future.traverse that did the work. And now we have replaced it with travector!
/**
* Transforms a `TraversableOnce[A]` into a `Future[Vector[B]]`
* using the provided function `A => Future[B]`.
* This is useful for performing a parallel map.
* For example, to apply a function to all items of a list in parallel:
*
* {{{
* val myFutureVector = travector(myList)(x => Future(myFunc(x)))
* }}}
*
* This is the same as `Future.traverse`,
* but it will always yield a vector,
* regardless of the provided collection type.
*/
def travector[A, B, M[X] <: TraversableOnce[X]]
             (in: M[A])
             (fn: A => Future[B])
             (implicit ec: ExecutionContext): Future[Vector[B]] =
  in.foldLeft(Future.successful(Vector.newBuilder[B])) { (fr, a) =>
    val fb = fn(a)
    for (r <- fr; b <- fb) yield (r += b)
  }.map(_.result())
The code is pretty much copied from Future.traverse (check & compare), but this simple little trick gained us a performance boost.
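For completeness, here is a short usage sketch. It assumes a slightly simplified `travector` that takes a plain `Iterable[A]` rather than `M[X] <: TraversableOnce[X]`, so that it does not depend on the pre-2.13 collection hierarchy; the fold is the same as above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TravectorExample {
  // Same fold as `travector`, but over Iterable[A] (an assumption made for this sketch).
  def travector[A, B](in: Iterable[A])
                     (fn: A => Future[B])
                     (implicit ec: ExecutionContext): Future[Vector[B]] =
    in.foldLeft(Future.successful(Vector.newBuilder[B])) { (fr, a) =>
      val fb = fn(a) // start the work for `a` before waiting on the accumulator
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result())

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // a List goes in, a Vector comes out, with the input order preserved
    val doubled = Await.result(travector(List(1, 2, 3))(x => Future(x * 2)), 1.second)
    println(doubled) // Vector(2, 4, 6)
  }
}
```

Because `fn(a)` is called eagerly inside the fold, the futures run concurrently; only the appends to the builder are sequenced, which is what keeps the results in input order.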