Skip to content

Move re-used internal Scheduler classes to their own package #1296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.schedulers;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.*;

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
volatile boolean isUnsubscribed;

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
executor = Executors.newScheduledThreadPool(1, threadFactory);
}

@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.empty();
}
return scheduleActual(action, delayTime, unit);
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
ScheduledAction run = new ScheduledAction(action);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(Subscriptions.from(f));

return run;
}

@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
}

@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}
100 changes: 100 additions & 0 deletions rxjava-core/src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.schedulers;

import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* A runnable that executes an Action0 and can be cancelled
* The analogue is the Subscriber in respect of an Observer.
*/
public final class ScheduledAction implements Runnable, Subscription {
final CompositeSubscription cancel;
final Action0 action;
volatile int once;
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");

public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new CompositeSubscription();
}

@Override
public void run() {
try {
action.call();
} finally {
unsubscribe();
}
}

@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
cancel.unsubscribe();
}
}

public void add(Subscription s) {
cancel.add(s);
}

/**
* Adds a parent to this ScheduledAction so when it is
* cancelled or terminates, it can remove itself from this parent.
* @param parent
*/
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
}

/** Remove a child subscription from a composite when unsubscribing. */
private static final class Remover implements Subscription {
final Subscription s;
final CompositeSubscription parent;
volatile int once;
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");

public Remover(Subscription s, CompositeSubscription parent) {
this.s = s;
this.parent = parent;
}

@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
parent.remove(s);
}
}

}
}
37 changes: 37 additions & 0 deletions rxjava-core/src/main/java/rx/internal/util/RxThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public final class RxThreadFactory implements ThreadFactory {
final String prefix;
volatile long counter;
static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
= AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");

public RxThreadFactory(String prefix) {
this.prefix = prefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
t.setDaemon(true);
return t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

Expand All @@ -27,12 +30,12 @@

/* package */final class CachedThreadScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY =
new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
private static final RxThreadFactory WORKER_THREAD_FACTORY =
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);

private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
private static final NewThreadScheduler.RxThreadFactory EVICTOR_THREAD_FACTORY =
new NewThreadScheduler.RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);

private static final class CachedWorkerPool {
private final long keepAliveTime;
Expand Down Expand Up @@ -143,14 +146,14 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
return Subscriptions.empty();
}

NewThreadScheduler.NewThreadWorker.ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}

private static final class ThreadWorker extends NewThreadScheduler.NewThreadWorker {
private static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;

ThreadWorker(ThreadFactory threadFactory) {
Expand Down
13 changes: 7 additions & 6 deletions rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
*/
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
import rx.schedulers.NewThreadScheduler.RxThreadFactory;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
Expand Down Expand Up @@ -104,7 +105,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
}
}

private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
private static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/
package rx.schedulers;

import rx.Scheduler;
import rx.internal.util.RxThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import rx.Scheduler;

/**
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
* <p>
Expand All @@ -34,7 +35,7 @@
/* package */final class GenericScheduledExecutorService {

private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
private static final NewThreadScheduler.RxThreadFactory THREAD_FACTORY = new NewThreadScheduler.RxThreadFactory(THREAD_NAME_PREFIX);
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
private final ScheduledExecutorService executor;
Expand Down
Loading