Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #1187 and #1196 #1188

Merged
merged 3 commits into from
May 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.lang.scala.examples

import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -790,4 +792,48 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
}

@Test def schedulerExample1(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule {
println("Hello from Scheduler")
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample2(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule(1 seconds) {
println("Hello from Scheduler after 1 second")
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample3(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
val subscription = worker.schedulePeriodically(initialDelay = 1 seconds, period = 100 millis) {
println(s"Hello(${no}) from Scheduler")
no += 1
}
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

@Test def schedulerExample4(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
def hello: Unit = {
println(s"Hello(${no}) from Scheduler")
no += 1
worker.schedule(100 millis)(hello)
}
val subscription = worker.schedule(1 seconds)(hello)
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,26 @@ trait Scheduler {
/**
* Parallelism available to a Scheduler.
*
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
* This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
*/
def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism
def parallelism: Int = asJavaScheduler.parallelism()

/**
* @return the scheduler's notion of current absolute time in milliseconds.
*/
def now: Long = this.asJavaScheduler.now()

/**
* Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]].
* <p>
* Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential.
*
* @return Inner representing a serial queue of actions to be executed
*/
def createWorker: Worker = this.asJavaScheduler.createWorker()

}
Expand All @@ -54,24 +63,55 @@ trait Worker extends Subscription {
private [scala] val asJavaWorker: rx.Scheduler.Worker

/**
* Schedules a cancelable action to be executed in delayTime.
* Schedules an Action for execution at some point in the future.
*
* @param action the Action to schedule
* @param delay time to wait before executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
def schedule(delay: Duration)(action: => Unit): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
override def call(): Unit = action
},
delayTime.length,
delayTime.unit)
delay.length,
delay.unit)
}

/**
* Schedules an Action for execution.
*
* @param action the Action to schedule
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: => Unit): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action
}
)
}

/**
* Schedules a cancelable action to be executed immediately.
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param action the Action to execute periodically
* @param initialDelay time to wait before executing the action for the first time
* @param period the time interval to wait each time in between executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
}
)
def schedulePeriodically(initialDelay: Duration, period: Duration)(action: => Unit): Subscription = {
this.asJavaWorker.schedulePeriodically(
new Action0 {
override def call(): Unit = action
},
initialDelay.toNanos,
period.toNanos,
duration.NANOSECONDS
)
}

/**
* @return the scheduler's notion of current absolute time in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import rx.lang.scala.Scheduler

object ComputationScheduler {
/**
* {@link Scheduler} intended for computational work.
* [[rx.lang.scala.Scheduler]] intended for computational work.
* <p>
* This can be used for event-loops, processing callbacks and other computational work.
* <p>
* Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.schedulers.IOScheduler]] instead.
*
* @return { @link Scheduler} for computation-bound work.
* @return [[rx.lang.scala.Scheduler]] for computation-bound work.
*/
def apply(): ComputationScheduler = {
new ComputationScheduler(rx.schedulers.Schedulers.computation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import rx.lang.scala.Scheduler

object IOScheduler {
/**
* {@link Scheduler} intended for IO-bound work.
* [[rx.lang.scala.Scheduler]] intended for IO-bound work.
* <p>
* The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
* The implementation is backed by an `Executor` thread-pool that will grow as needed.
* <p>
* This can be used for asynchronously performing blocking IO.
* <p>
* Do not perform computational work on this scheduler. Use {@link ComputationScheduler()} instead.
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.schedulers.ComputationScheduler]] instead.
*
* @return { @link ExecutorScheduler} for IO-bound work.
* @return [[rx.lang.scala.Scheduler]] for IO-bound work
*/
def apply(): IOScheduler = {
new IOScheduler(rx.schedulers.Schedulers.io)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import rx.lang.scala.Scheduler
object NewThreadScheduler {

/**
* Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
* Returns a [[rx.lang.scala.Scheduler]] that creates a new `java.lang.Thread` for each unit of work.
*/
def apply(): NewThreadScheduler = {
new NewThreadScheduler(rx.schedulers.Schedulers.newThread())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import rx.lang.scala.Scheduler

object TrampolineScheduler {
/**
* {@link Scheduler} that queues work on the current thread to be executed after the current work completes.
* [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
*/
def apply(): TrampolineScheduler = {
new TrampolineScheduler(rx.schedulers.Schedulers.trampoline())
Expand Down