I need to consume a REST service using Akka's HTTP client (v2.0.2). The logical approach is to use a host connection pool, because we expect a large number of simultaneous connections. The Flow for this consumes a (HttpRequest, T) and emits a (Try[HttpResponse], T). The documentation indicates that the arbitrary type T is needed to handle responses that may arrive out of order, but it does not say what the caller is supposed to do with the returned T.
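To make the concern concrete, here is a toy model of how I read that sentence, with no Akka dependency at all. Req, Resp and toyPool are hypothetical stand-ins I made up for illustration (they are not Akka API); the only thing the sketch assumes is that the pool passes each T through unchanged and may reorder the results.

```scala
import scala.util.{Success, Try}

object CorrelationSketch {
  // Hypothetical stand-ins for HttpRequest and HttpResponse.
  final case class Req(path: String)
  final case class Resp(body: String)

  // Toy "pool": answers every request but emits the results in reverse order,
  // passing each tag T through untouched.
  def toyPool[T](in: Seq[(Req, T)]): Seq[(Try[Resp], T)] =
    in.reverse.map { case (req, tag) => (Success(Resp("echo " + req.path)), tag) }

  val requests: Seq[Req] = Seq(Req("/a"), Req("/b"), Req("/c"))

  // Tag each request with its position so the caller can restore the pairing.
  val tagged: Seq[(Req, Int)] = requests.zipWithIndex

  // Index the (possibly reordered) results by the tag that came back with them.
  val byTag: Map[Int, Try[Resp]] = toyPool(tagged).map(_.swap).toMap

  def main(args: Array[String]): Unit =
    requests.indices.foreach { i =>
      println(s"${requests(i).path} -> ${byTag(i)}")
    }
}
```

In this model the tag only matters when several requests travel through the same stream; whether it matters for a stream that carries a single request is part of what I am asking.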
My first attempt is the function below, using an Int as T. It is called from many places so that all connections share a single pool.
    val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

    def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
      val unique = Random.nextInt
      Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
        case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
        case (Failure(f), `unique`) ⇒ Future.failed(f)
        case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
      }
    }
The question is: how should the client use this T? Is there a cleaner, more efficient solution? And finally, is my paranoia that something may arrive out of order actually justified?