Skip to content

Commit

Permalink
Expose AnonymousObserver
Browse files Browse the repository at this point in the history
In certain scenarios, it might be useful to be able to create the anonymous observer outside the Subscribe overloads.

For example, the official guidance on DiagnosticSource filtering recommends creating an anonymous observer to combine with the predicate to filter events.

There may be other scenarios too where not having to manually create an IObserver<T> could be useful.

Fixes #8
  • Loading branch information
kzu committed Sep 27, 2021
1 parent f425c90 commit 3583300
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 68 deletions.
9 changes: 1 addition & 8 deletions src/RxFree/RxFree.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ IObservable&lt;T&gt;.Select/Where/OfType LINQ operators, and others.
<ItemGroup>
<Compile Remove="build\NotNull\*.cs" Condition="'$(Nullable)' != 'enable'" />
<Compile Remove="build\Legacy\*.cs" Condition="'$(Nullable)' == 'enable'" />
<Compile Remove="build\Nullable\CompositeDisposable.cs" />
<Compile Remove="build\Nullable\Disposable.cs" />
<Compile Remove="build\Nullable\ObservableExtensions.cs" />
<Compile Remove="build\Nullable\ObservableExtensions.OfType.cs" />
<Compile Remove="build\Nullable\ObservableExtensions.Select.cs" />
<Compile Remove="build\Nullable\ObservableExtensions.Where.cs" />
<Compile Remove="build\Nullable\StableCompositeDisposable.cs" />
<Compile Remove="build\Nullable\Subject.cs" />
<Compile Remove="build\Nullable\*.cs" />
<None Include="build\**\*.cs" CopyToOutputDirectory="PreserveNewest" PackagePath="build\%(RecursiveDir)%(Filename)%(Extension)" Pack="true" />
<None Update="build\RxFree.props" CopyToOutputDirectory="PreserveNewest" PackagePath="build\%(Filename)%(Extension)" Pack="true" />
<None Update="build\RxFree.targets" CopyToOutputDirectory="PreserveNewest" PackagePath="build\%(Filename)%(Extension)" Pack="true" />
Expand Down
65 changes: 65 additions & 0 deletions src/RxFree/build/Legacy/AnonymousObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Runtime.ExceptionServices;

namespace System
{
/// <summary>
/// Create an <see cref="IObserver{T}"/> instance from delegate-based implementations
/// of the On* methods.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
partial class AnonymousObserver<T> : IObserver<T>
{
static readonly Action<Exception> rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();
static readonly Action nop = () => { };

readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

/// <summary>
/// Creates the observable providing just the <paramref name="onNext"/> action.
/// </summary>
public AnonymousObserver(Action<T> onNext)
: this(onNext, rethrow, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onError"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError)
: this(onNext, onError, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action onCompleted)
: this(onNext, rethrow, onCompleted) { }

/// <summary>
/// Creates the observable providing all three <paramref name="onNext"/>,
/// <paramref name="onError"/> and <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnCompleted()"/>.
/// </summary>
public void OnCompleted() => onCompleted();

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnError(Exception)"/>.
/// </summary>
public void OnError(Exception error) => onError(error);

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnNext(T)"/>.
/// </summary>
public void OnNext(T value) => onNext(value);
}
}
20 changes: 0 additions & 20 deletions src/RxFree/build/Legacy/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,5 @@ public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onN
/// </summary>
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
=> source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));

class AnonymousObserver<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

public void OnCompleted() => onCompleted();

public void OnError(Exception error) => onError(error);

public void OnNext(T value) => onNext(value);
}
}
}
65 changes: 65 additions & 0 deletions src/RxFree/build/NotNull/AnonymousObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Runtime.ExceptionServices;

namespace System
{
/// <summary>
/// Create an <see cref="IObserver{T}"/> instance from delegate-based implementations
/// of the On* methods.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
partial class AnonymousObserver<T> : IObserver<T>
{
static readonly Action<Exception> rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();
static readonly Action nop = () => { };

readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

/// <summary>
/// Creates the observable providing just the <paramref name="onNext"/> action.
/// </summary>
public AnonymousObserver(Action<T> onNext)
: this(onNext, rethrow, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onError"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError)
: this(onNext, onError, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action onCompleted)
: this(onNext, rethrow, onCompleted) { }

/// <summary>
/// Creates the observable providing all three <paramref name="onNext"/>,
/// <paramref name="onError"/> and <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnCompleted()"/>.
/// </summary>
public void OnCompleted() => onCompleted();

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnError(Exception)"/>.
/// </summary>
public void OnError(Exception error) => onError(error);

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnNext(T)"/>.
/// </summary>
public void OnNext(T value) => onNext(value);
}
}
20 changes: 0 additions & 20 deletions src/RxFree/build/NotNull/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,5 @@ public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onN
/// </summary>
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
=> source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));

class AnonymousObserver<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

public void OnCompleted() => onCompleted();

public void OnError(Exception error) => onError(error);

public void OnNext(T value) => onNext(value);
}
}
}
65 changes: 65 additions & 0 deletions src/RxFree/build/Nullable/AnonymousObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Runtime.ExceptionServices;

namespace System
{
/// <summary>
/// Create an <see cref="IObserver{T}"/> instance from delegate-based implementations
/// of the On* methods.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
partial class AnonymousObserver<T> : IObserver<T>
{
static readonly Action<Exception> rethrow = e => ExceptionDispatchInfo.Capture(e).Throw();
static readonly Action nop = () => { };

readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

/// <summary>
/// Creates the observable providing just the <paramref name="onNext"/> action.
/// </summary>
public AnonymousObserver(Action<T> onNext)
: this(onNext, rethrow, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onError"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError)
: this(onNext, onError, nop) { }

/// <summary>
/// Creates the observable providing both the <paramref name="onNext"/> and
/// <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action onCompleted)
: this(onNext, rethrow, onCompleted) { }

/// <summary>
/// Creates the observable providing all three <paramref name="onNext"/>,
/// <paramref name="onError"/> and <paramref name="onCompleted"/> actions.
/// </summary>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnCompleted()"/>.
/// </summary>
public void OnCompleted() => onCompleted();

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnError(Exception)"/>.
/// </summary>
public void OnError(Exception error) => onError(error);

/// <summary>
/// Calls the action implementing <see cref="IObserver{T}.OnNext(T)"/>.
/// </summary>
public void OnNext(T value) => onNext(value);
}
}
20 changes: 0 additions & 20 deletions src/RxFree/build/Nullable/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,5 @@ public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onN
/// </summary>
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
=> source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));

class AnonymousObserver<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;

public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

public void OnCompleted() => onCompleted();

public void OnError(Exception error) => onError(error);

public void OnNext(T value) => onNext(value);
}
}
}

0 comments on commit 3583300

Please sign in to comment.