Skip to content

Commit

Permalink
implementing Multicast
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 27, 2023
1 parent f2f2e33 commit cf8b5e7
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 55 deletions.
48 changes: 8 additions & 40 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,56 +28,24 @@

// Enumerable.Empty<int>().ElementAtOrDefault(

var range = System.Reactive.Linq.Observable.Range(1, 10);
var publisher = new System.Reactive.Subjects.Subject<int>();

var connectable = publisher.Multicast(new System.Reactive.Subjects.Subject<int>());

Enumerable.Range(1, 10).Cast<int>();

connectable.Subscribe(x => Console.WriteLine(x));

var a = range.Publish();
var d= connectable.Connect();


// BehaviourSubject -> ReactiveProperty
// ReplaySubject
publisher.OnNext(100);

d.Dispose();


//var d2 = connectable.Connect();

a.Connect();


// range.Catch(
// range.Append(

// Enumerable.Range(1,10).Min(
// range.SelectMany(

// range.TakeLast(


// var publisher = new R3.Subject<int>();
//publisher.PublishOnNext(1);

// publisher.Subscribe(new object(), (x,y) => y

//var xs = await publisher.Take(TimeSpan.FromSeconds(5));


foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3))
{
Console.WriteLine(item);
}

var repeat = System.Reactive.Linq.Observable.Repeat("foo", 10);

// System.Reactive.Linq.Observable.Append(

// repeat.TakeWhile(

// System.Reactive.Linq.Observable.FromEvent(

var rp = new ReactiveProperty<int>(999);
rp.Value += 10;
publisher.OnNext(200);

public static class Extensions
{
Expand Down
37 changes: 37 additions & 0 deletions src/R3/ISubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace R3;

// NOTE: implement type must inherit from Observable<T>
public interface ISubject<T>
{
// Observable
IDisposable Subscribe(Observer<T> observer);

// Observer
void OnNext(T value);
void OnErrorResume(Exception error);
void OnCompleted(Result complete);

// Conversion
public Observer<T> AsObserver()
{
return new SubjectToObserver<T>(this);
}
}

internal sealed class SubjectToObserver<T>(ISubject<T> subject) : Observer<T>
{
protected override void OnNextCore(T value)
{
subject.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
subject.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subject.OnCompleted(result);
}
}
88 changes: 80 additions & 8 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@

using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace R3;
namespace R3;

public static partial class ObservableExtensions
{
Expand All @@ -12,18 +8,20 @@ public static partial class ObservableExtensions
// AsUnitObservable

// Time based
// Debounce, Throttle, ThrottleFirst, Sample, Delay, DelaySubscription
// Debounce, Throttle, ThrottleFirst, Sample, Delay, DelaySubscription, Timeout
// + frame variation

// TImeInterval <-> FrameInterval

// Buffer + BUfferFrame => Chunk, ChunkFrame

// SubscribeOn, Synchronize

// Rx Merging:
// CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch

// Standard Query:
// Distinct, DistinctBy, DistinctUntilChanged, Scan
// Distinct, DistinctBy, DistinctUntilChanged, Scan, DefaultIfEmpty

// SkipTake:
// Skip, SkipLast, SkipUntil, SkipWhile
Expand All @@ -32,5 +30,79 @@ public static partial class ObservableExtensions
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,

// Multicast
// Multicast, Publish, Replay, RefCount, Share(Publish().RefCount()), AutoConnect
// Multicast, Publish, Replay, RefCount, Share(Publish().RefCount())

public static ConnectableObservable<T> Multicast<T>(this Observable<T> source, ISubject<T> subject)
{
return new Multicast<T>(source, subject);
}

public static Observable<T> Publish<T>(this Observable<T> source)
{
return source.Multicast(new Subject<T>());
}

public static Observable<T> Publish<T>(this Observable<T> source, T initialValue)
{
return source.Multicast(new ReactiveProperty<T>(initialValue));
}

// TODO: ReplaySubject
//public static Observable<T> Replay<T>(this Observable<T> source)
//{
// return source.Multicast(new ReplaySubject<T>());
//}

// TODO: require RefCount
//public static Observable<T> Share<T>(this Observable<T> source)
//{
// return source.Publish().RefCount();
//}
}


public abstract class ConnectableObservable<T> : Observable<T>
{
public abstract IDisposable Connect();
}

internal sealed class Multicast<T>(Observable<T> source, ISubject<T> subject) : ConnectableObservable<T>
{
readonly object gate = new object();
Connection? connection;

public override IDisposable Connect()
{
lock (gate)
{
if (connection == null)
{
var subscription = source.Subscribe(subject.AsObserver());
connection = new Connection(this, subscription);
}

return connection;
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
return subject.Subscribe(observer);
}

sealed class Connection(Multicast<T> parent, IDisposable? subscription) : IDisposable
{
public void Dispose()
{
lock (parent.gate)
{
if (subscription != null)
{
subscription.Dispose();
subscription = null;
parent.connection = null;
}
}
}
}
}
7 changes: 0 additions & 7 deletions src/R3/Subject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@

namespace R3;

public interface ISubject<T>
{
void OnNext(T value);
void OnErrorResume(Exception error);
void OnCompleted(Result complete);
}

public sealed class Subject<T> : Observable<T>, ISubject<T>, IDisposable
{
int calledCompleted = 0;
Expand Down

0 comments on commit cf8b5e7

Please sign in to comment.