Skip to content

Commit

Permalink
Merge pull request #1188 from zsxwing/issue1187
Browse files Browse the repository at this point in the history
Fix issue #1187 and #1196
  • Loading branch information
benjchristensen committed May 16, 2014
2 parents 8faa8cb + a26b21d commit e53b1cd
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.lang.scala.examples

import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -790,4 +792,48 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
}

@Test def schedulerExample1(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule {
println("Hello from Scheduler")
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample2(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule(1 seconds) {
println("Hello from Scheduler after 1 second")
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample3(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
val subscription = worker.schedulePeriodically(initialDelay = 1 seconds, period = 100 millis) {
println(s"Hello(${no}) from Scheduler")
no += 1
}
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

@Test def schedulerExample4(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
def hello: Unit = {
println(s"Hello(${no}) from Scheduler")
no += 1
worker.schedule(100 millis)(hello)
}
val subscription = worker.schedule(1 seconds)(hello)
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,26 @@ trait Scheduler {
/**
* Parallelism available to a Scheduler.
*
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
* This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
*/
def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism
def parallelism: Int = asJavaScheduler.parallelism()

/**
* @return the scheduler's notion of current absolute time in milliseconds.
*/
def now: Long = this.asJavaScheduler.now()

/**
* Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]].
* <p>
* Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential.
*
* @return Inner representing a serial queue of actions to be executed
*/
def createWorker: Worker = this.asJavaScheduler.createWorker()

}
Expand All @@ -54,24 +63,55 @@ trait Worker extends Subscription {
private [scala] val asJavaWorker: rx.Scheduler.Worker

/**
* Schedules a cancelable action to be executed in delayTime.
* Schedules an Action for execution at some point in the future.
*
* @param action the Action to schedule
* @param delay time to wait before executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
def schedule(delay: Duration)(action: => Unit): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
override def call(): Unit = action
},
delayTime.length,
delayTime.unit)
delay.length,
delay.unit)
}

/**
* Schedules an Action for execution.
*
* @param action the Action to schedule
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: => Unit): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action
}
)
}

/**
* Schedules a cancelable action to be executed immediately.
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param action the Action to execute periodically
* @param initialDelay time to wait before executing the action for the first time
* @param period the time interval to wait each time in between executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
}
)
def schedulePeriodically(initialDelay: Duration, period: Duration)(action: => Unit): Subscription = {
this.asJavaWorker.schedulePeriodically(
new Action0 {
override def call(): Unit = action
},
initialDelay.toNanos,
period.toNanos,
duration.NANOSECONDS
)
}

/**
* @return the scheduler's notion of current absolute time in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import rx.lang.scala.Scheduler

object ComputationScheduler {
/**
* {@link Scheduler} intended for computational work.
* [[rx.lang.scala.Scheduler]] intended for computational work.
* <p>
* This can be used for event-loops, processing callbacks and other computational work.
* <p>
* Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.schedulers.IOScheduler]] instead.
*
* @return { @link Scheduler} for computation-bound work.
* @return [[rx.lang.scala.Scheduler]] for computation-bound work.
*/
def apply(): ComputationScheduler = {
new ComputationScheduler(rx.schedulers.Schedulers.computation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import rx.lang.scala.Scheduler

object IOScheduler {
/**
* {@link Scheduler} intended for IO-bound work.
* [[rx.lang.scala.Scheduler]] intended for IO-bound work.
* <p>
* The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
* The implementation is backed by an `Executor` thread-pool that will grow as needed.
* <p>
* This can be used for asynchronously performing blocking IO.
* <p>
* Do not perform computational work on this scheduler. Use {@link ComputationScheduler()} instead.
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.schedulers.ComputationScheduler]] instead.
*
* @return { @link ExecutorScheduler} for IO-bound work.
* @return [[rx.lang.scala.Scheduler]] for IO-bound work
*/
def apply(): IOScheduler = {
new IOScheduler(rx.schedulers.Schedulers.io)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import rx.lang.scala.Scheduler
object NewThreadScheduler {

/**
* Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
* Returns a [[rx.lang.scala.Scheduler]] that creates a new `java.lang.Thread` for each unit of work.
*/
def apply(): NewThreadScheduler = {
new NewThreadScheduler(rx.schedulers.Schedulers.newThread())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import rx.lang.scala.Scheduler

object TrampolineScheduler {
/**
* {@link Scheduler} that queues work on the current thread to be executed after the current work completes.
* [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
*/
def apply(): TrampolineScheduler = {
new TrampolineScheduler(rx.schedulers.Schedulers.trampoline())
Expand Down

0 comments on commit e53b1cd

Please sign in to comment.