Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout capability to Future implementation of FetchMonadError #127

Merged
merged 14 commits into from
Aug 15, 2017
89 changes: 89 additions & 0 deletions jvm/src/test/scala/FutureTimeoutTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch

import scala.concurrent._
import scala.concurrent.duration._
import org.scalatest._
import cats.data.NonEmptyList
import fetch.implicits._

// Note that this test cannot run on Scala.js

class FutureTimeoutTests
extends AsyncFlatSpec
with Matchers
with OptionValues
with Inside
with Inspectors {

implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global

case class ArticleId(id: Int)
case class Article(id: Int, content: String)

def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] =
Fetch(ArticleId(id))

// A sample datasource with configurable delay and timeout

case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration)
extends DataSource[ArticleId, Article] {
override def name = "ArticleFuture"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async((ok, fail) => {
Thread.sleep(delay.toMillis)
ok(Option(Article(id.id, "An article with id " + id.id)))
}, timeout)
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

"FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

recoverToSucceededIf[TimeoutException] {
fut

}
}

it should "not fail with timeout when a datasource does complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

it should "not fail with timeout when infinite timeout specified" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Inf, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

}
55 changes: 45 additions & 10 deletions shared/src/main/scala/implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,64 @@

package fetch

import cats.MonadError
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import cats.instances.future._
import scala.concurrent.{ExecutionContext, Future, Promise}

object implicits {

implicit def fetchFutureFetchMonadError(
implicit ec: ExecutionContext
): FetchMonadError[Future] =
// Shared Timer object to schedule timeouts
private[fetch] lazy val timer: Timer = new Timer("fetch-future-timeout-daemon", true)

implicit def fetchFutureFetchMonadError(implicit ec: ExecutionContext): FetchMonadError[Future] =
new FetchMonadError.FromMonadError[Future] {
override def runQuery[A](j: Query[A]): Future[A] = j match {
case Sync(e) => Future(e.value)
case Async(ac, timeout) => {

case Sync(e) =>
Future({e.value})

case Async(ac, timeout) =>

val p = Promise[A]()

ec.execute(new Runnable {
def run() = ac(p.trySuccess _, p.tryFailure _)
})
val runnable = new Runnable {
def run() : Unit = ac(p.trySuccess, p.tryFailure)
}

timeout match {

// Handle the case where there is a finite timeout requested
case finite: FiniteDuration =>

// Timer task that completes the future when the timeout occurs
// if it didn't complete already
val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}

// Start the timeout Timer
timer.schedule(timerTask, timeout.toMillis)

// Execute the user's action
ec.execute(runnable)

// No timeout
case _ =>

// Execute the user's action
ec.execute(runnable)
}

p.future
}

case Ap(qf, qx) =>
runQuery(qf).zip(runQuery(qx)).map { case (f, x) => f(x) }
}


}
}