Skip to content

Commit

Permalink
Materialize/Dematerialize
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 26, 2023
1 parent 8aed3fc commit a05145a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 1 deletion.
87 changes: 87 additions & 0 deletions src/R3/Operators/Materialize.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<Notification<T>> Materialize<T>(this Observable<T> source)
{
return new Materialize<T>(source);
}

public static Observable<T> Dematerialize<T>(this Observable<Notification<T>> source)
{
return new Dematerialize<T>(source);
}
}

internal sealed class Materialize<T>(Observable<T> source) : Observable<Notification<T>>
{
protected override IDisposable SubscribeCore(Observer<Notification<T>> observer)
{
return source.Subscribe(new _Materialize(observer));
}

sealed class _Materialize(Observer<Notification<T>> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(new(value));
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnNext(new(error));
}

protected override void OnCompletedCore(Result result)
{
try
{
observer.OnNext(new(result));
}
finally
{
observer.OnCompleted();
}
}
}
}

internal sealed class Dematerialize<T>(Observable<Notification<T>> source) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _Dematerialize(observer));
}

sealed class _Dematerialize(Observer<T> observer) : Observer<Notification<T>>
{
protected override void OnNextCore(Notification<T> value)
{
switch (value.Kind)
{
case NotificationKind.OnNext:
observer.OnNext(value.Value!);
break;
case NotificationKind.OnErrorResume:
OnErrorResume(value.Error!);
break;
case NotificationKind.OnCompleted:
OnCompleted(value.Result!.Value);
break;
default:
break;
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}

1 change: 0 additions & 1 deletion src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,4 @@ public static partial class ObservableExtensions
// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,


}
42 changes: 42 additions & 0 deletions tests/R3.Tests/OperatorTests/MaterializeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace R3.Tests.OperatorTests;

public class MaterializeTest
{
[Fact]
public void Materialize()
{
var publisher = new Subject<int>();
var list = publisher.Materialize().ToLiveList();

publisher.OnNext(10);
publisher.OnNext(20);
publisher.OnNext(30);
publisher.OnErrorResume(new Exception("foo"));
publisher.OnCompleted(new Exception("comp"));

list[0].Value.Should().Be(10);
list[1].Value.Should().Be(20);
list[2].Value.Should().Be(30);
list[3].Error!.Message.Should().Be("foo");
list[4].Result!.Value.Exception!.Message.Should().Be("comp");

list.AssertIsCompleted();
}

[Fact]
public void Dematerialize()
{
var publisher = new Subject<int>();
var list = publisher.Materialize().Dematerialize().ToLiveList();

publisher.OnNext(10);
publisher.OnNext(20);
publisher.OnNext(30);
publisher.OnErrorResume(new Exception("foo"));
publisher.OnCompleted(new Exception("comp"));

list.AssertEqual([10, 20, 30]);
list.CompletedValue.IsFailure.Should().BeTrue();
list.AssertIsCompleted();
}
}

0 comments on commit a05145a

Please sign in to comment.