Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes made while integrating it with our internal system #922

Merged
merged 1 commit into from
Feb 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,72 @@
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.plugins.DebugNotification;

public final class DebugSubscriber<T> extends Subscriber<T> {
public final class DebugSubscriber<T, C> extends Subscriber<T> {
private final Func1<T, T> onNextHook;
final Action1<DebugNotification> events;
final Observer<? super T> o;
Operator<? extends T, ?> from = null;
Operator<?, ? super T> to = null;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;
private final Observer<? super T> o;
private Operator<? extends T, ?> from = null;
private Operator<?, ? super T> to = null;

public DebugSubscriber(
Func1<T, T> onNextHook,
Action1<DebugNotification> _events,
Func1<DebugNotification, C> start,
Action1<C> complete,
Action2<C, Throwable> error,
Subscriber<? super T> _o,
Operator<? extends T, ?> _out,
Operator<?, ? super T> _in) {
super(_o);
this.events = _events;
this.start = start;
this.complete = complete;
this.error = error;
this.o = _o;
this.onNextHook = onNextHook;
this.from = _out;
this.to = _in;
this.add(new DebugSubscription<T>(this));
this.add(new DebugSubscription<T, C>(this, start, complete, error));
}

@Override
public void onCompleted() {
events.call(DebugNotification.createOnCompleted(o, from, to));
o.onCompleted();
final DebugNotification<T, C> n = DebugNotification.createOnCompleted(o, from, to);
C context = start.call(n);
try {
o.onCompleted();
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

@Override
public void onError(Throwable e) {
events.call(DebugNotification.createOnError(o, from, e, to));
o.onError(e);
final DebugNotification<T, C> n = DebugNotification.createOnError(o, from, e, to);
C context = start.call(n);
try {
o.onError(e);
complete.call(context);
} catch (Throwable e2) {
error.call(context, e2);
}
}

@Override
public void onNext(T t) {
events.call(DebugNotification.createOnNext(o, from, t, to));
o.onNext(onNextHook.call(t));
final DebugNotification<T, C> n = DebugNotification.createOnNext(o, from, t, to);
C context = start.call(n);
try {
o.onNext(onNextHook.call(t));
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

public Operator<? extends T, ?> getFrom() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
package rx.operators;

import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.plugins.DebugNotification;

final class DebugSubscription<T> implements Subscription {
private final DebugSubscriber<T> debugObserver;
final class DebugSubscription<T, C> implements Subscription {
private final DebugSubscriber<T, C> debugObserver;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;

DebugSubscription(DebugSubscriber<T> debugObserver) {
DebugSubscription(DebugSubscriber<T, C> debugObserver, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
this.debugObserver = debugObserver;
this.start = start;
this.complete = complete;
this.error = error;
}

@Override
public void unsubscribe() {
debugObserver.events.call(DebugNotification.<T> createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to));
debugObserver.unsubscribe();
final DebugNotification<T, C> n = DebugNotification.<T, C> createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo());
C context = start.call(n);
try {
debugObserver.unsubscribe();
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

@Override
Expand Down
57 changes: 40 additions & 17 deletions rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Actions;
import rx.functions.Func1;
import rx.functions.Functions;
Expand All @@ -17,9 +18,11 @@
*
* @author gscampbell
*/
public class DebugHook extends RxJavaObservableExecutionHook {
public class DebugHook<C> extends RxJavaObservableExecutionHook {
private final Func1 onNextHook;
private final Action1<DebugNotification> events;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;

/**
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
Expand All @@ -31,18 +34,26 @@ public class DebugHook extends RxJavaObservableExecutionHook {
* @param events
* This action is invoked as each notification is generated
*/
public DebugHook(Func1 onNextDataHook, Action1<DebugNotification> events) {
public DebugHook(Func1 onNextDataHook, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
this.complete = complete;
this.error = error;
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
this.events = events == null ? Actions.empty() : events;
this.start = (Func1<DebugNotification, C>) (start == null ? Actions.empty() : start);
}

@Override
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
public <T> OnSubscribe<T> onSubscribeStart(final Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
events.call(DebugNotification.createSubscribe(o, f));
f.call(wrapOutbound(null, o));
C context = start.call(DebugNotification.createSubscribe(o, observableInstance, f));
try {
f.call(wrapOutbound(null, o));
complete.call(context);
}
catch(Throwable e) {
error.call(context, e);
}
}
};
}
Expand All @@ -54,12 +65,7 @@ public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInst

@Override
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
f.call(wrapInbound(null, o));
}
};
return new OnCreateWrapper<T>(f);
}

@Override
Expand All @@ -81,19 +87,36 @@ public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
private <R> Subscriber<? super R> wrapOutbound(Operator<? extends R, ?> bind, Subscriber<? super R> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<R>) o).setFrom(bind);
((DebugSubscriber<R, C>) o).setFrom(bind);
return o;
}
return new DebugSubscriber<R>(onNextHook, events, o, bind, null);
return new DebugSubscriber<R, C>(onNextHook, start, complete, error, o, bind, null);
}

@SuppressWarnings("unchecked")
private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subscriber<? super T> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<T>) o).setTo(bind);
((DebugSubscriber<T, C>) o).setTo(bind);
return o;
}
return new DebugSubscriber<T>(onNextHook, events, o, null, bind);
return new DebugSubscriber<T, C>(onNextHook, start, complete, error, o, null, bind);
}

public final class OnCreateWrapper<T> implements OnSubscribe<T> {
private final OnSubscribe<T> f;

private OnCreateWrapper(OnSubscribe<T> f) {
this.f = f;
}

@Override
public void call(Subscriber<? super T> o) {
f.call(wrapInbound(null, o));
}

public OnSubscribe<T> getActual() {
return f;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,103 +1,119 @@
package rx.plugins;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.observers.SafeSubscriber;
import rx.operators.DebugSubscriber;

public class DebugNotification<T> {
public class DebugNotification<T, C> {
public static enum Kind {
OnNext, OnError, OnCompleted, Subscribe, Unsubscribe
}

private final OnSubscribe<T> source;
private final Observable<? extends T> source;
private final OnSubscribe<T> sourceFunc;
private final Operator<? extends T, ?> from;
private final Kind kind;
private final Notification<T> notification;
private final Operator<?, ? super T> to;
private final long nanoTime;
private final long threadId;
private Observer o;
private final Throwable throwable;
private final T value;
private final Observer observer;

public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
public static <T, C> DebugNotification<T, C> createSubscribe(Observer<? super T> o, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
Operator<?, ? super T> to = null;
Operator<? extends T, ?> from = null;
if (o instanceof SafeSubscriber) {
o = ((SafeSubscriber) o).getActual();
}
if (o instanceof DebugSubscriber) {
to = ((DebugSubscriber<T>) o).getTo();
from = ((DebugSubscriber<T>) o).getFrom();
to = ((DebugSubscriber<T, C>) o).getTo();
from = ((DebugSubscriber<T, C>) o).getFrom();
o = ((DebugSubscriber) o).getActual();
}
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
if (sourceFunc instanceof DebugHook.OnCreateWrapper) {
sourceFunc = ((DebugHook.OnCreateWrapper) sourceFunc).getActual();
}
return new DebugNotification<T, C>(o, from, Kind.Subscribe, null, null, to, source, sourceFunc);
}

public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
public static <T, C> DebugNotification<T, C> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnNext, t, null, to, null, null);
}

public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
public static <T, C> DebugNotification<T, C> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnError, null, e, to, null, null);
}

public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
public static <T, C> DebugNotification<T, C> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnCompleted, null, null, to, null, null);
}

public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
public static <T, C> DebugNotification<T, C> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.Unsubscribe, null, null, to, null, null);
}

private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, Notification<T> notification, Operator<?, ? super T> to, OnSubscribe<T> source) {
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, T value, Throwable throwable, Operator<?, ? super T> to, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
this.observer = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
this.from = from;
this.kind = kind;
this.notification = notification;
this.value = value;
this.throwable = throwable;
this.to = to;
this.source = source;
this.nanoTime = System.nanoTime();
this.threadId = Thread.currentThread().getId();
this.sourceFunc = sourceFunc;
}

public Observer getObserver() {
return observer;
}

public Operator<? extends T, ?> getFrom() {
return from;
}

public Notification<T> getNotification() {
return notification;
public T getValue() {
return value;
}

public Operator<?, ? super T> getTo() {
return to;
public Throwable getThrowable() {
return throwable;
}

public long getNanoTime() {
return nanoTime;
}

public long getThreadId() {
return threadId;
public Operator<?, ? super T> getTo() {
return to;
}

public Kind getKind() {
return kind;
}

public Observable<? extends T> getSource() {
return source;
}

public OnSubscribe<T> getSourceFunc() {
return sourceFunc;
}

@Override
/**
* Does a very bad job of making JSON like string.
*/
public String toString() {
final StringBuilder s = new StringBuilder("{");
s.append(" \"nano\": ").append(nanoTime);
s.append(", \"thread\": ").append(threadId);
s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\"");
s.append("\"observer\": \"").append(observer.getClass().getName()).append("@").append(Integer.toHexString(observer.hashCode())).append("\"");
s.append(", \"type\": \"").append(kind).append("\"");
if (notification != null) {
if (notification.hasValue())
s.append(", \"value\": \"").append(notification.getValue()).append("\"");
if (notification.hasThrowable())
s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
}
if (kind == Kind.OnNext)
// not json safe
s.append(", \"value\": \"").append(value).append("\"");
if (kind == Kind.OnError)
s.append(", \"exception\": \"").append(throwable.getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
if (source != null)
s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\"");
if (sourceFunc != null)
s.append(", \"sourceFunc\": \"").append(sourceFunc.getClass().getName()).append("@").append(Integer.toHexString(sourceFunc.hashCode())).append("\"");
if (from != null)
s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\"");
if (to != null)
Expand Down
Loading