diff --git a/SerialPortRx/SerialPortRxMixins.cs b/SerialPortRx/SerialPortRxMixins.cs index 71c04d4..5857da9 100644 --- a/SerialPortRx/SerialPortRxMixins.cs +++ b/SerialPortRx/SerialPortRxMixins.cs @@ -7,6 +7,7 @@ using System.Reactive.Disposables; using System.Reactive.Linq; using System.Text; + using Reactive.Bindings.Extensions; /// /// Serial Port Rx Mixins @@ -44,54 +45,46 @@ public static class SerialPortRxMixins /// A string made up from the char values between the start and end chars public static IObservable BufferUntil(this IObservable @this, IObservable startsWith, IObservable endsWith, int timeOut) => Observable.Create(o => { + var dis = new CompositeDisposable(); var sb = new StringBuilder(); bool startFound = false; int elapsedTime = 0; char startsWithL = ' '; - var swd = startsWith.Subscribe(sw => + startsWith.Subscribe(sw => { startsWithL = sw; elapsedTime = 0; - }); + }).AddTo(dis); char endsWithL = ' '; - var ewd = endsWith.Subscribe(ew => endsWithL = ew); - string defaultValueL = string.Empty; - try + var ewd = endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis); + var sub = @this.Subscribe(s => { - var sub = @this.Subscribe(s => + elapsedTime = 0; + if (startFound || s == startsWithL) { - elapsedTime = 0; - if (startFound || s == startsWithL) + startFound = true; + sb.Append(s); + if (s == endsWithL) { - startFound = true; - sb.Append(s); - if (s == endsWithL) - { - o.OnNext(sb.ToString()); - startFound = false; - sb.Clear(); - } + o.OnNext(sb.ToString()); + startFound = false; + sb.Clear(); } - }); - } - catch (Exception) - { - o.OnNext(defaultValueL); - } - var tim = Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ => + } + }).AddTo(dis); + Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ => { elapsedTime++; if (elapsedTime > timeOut) { - o.OnNext(defaultValueL); startFound = false; sb.Clear(); elapsedTime = 0; } - }); + }).AddTo(dis); - return new CompositeDisposable(tim, swd, ewd); + return dis; }); /// @@ -106,37 +99,38 @@ public static IObservable BufferUntil(this IObservable @this, IObs /// A string made up from the char values between the start and end chars public static IObservable BufferUntil(this IObservable @this, IObservable startsWith, IObservable endsWith, IObservable defaultValue, int timeOut) => Observable.Create(o => { + var dis = new CompositeDisposable(); var sb = new StringBuilder(); bool startFound = false; int elapsedTime = 0; char startsWithL = ' '; - var swd = startsWith.Subscribe(sw => + startsWith.Subscribe(sw => { startsWithL = sw; elapsedTime = 0; - }); + }).AddTo(dis); char endsWithL = ' '; - var ewd = endsWith.Subscribe(ew => endsWithL = ew); + endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis); string defaultValueL = string.Empty; - var dvd = defaultValue.Subscribe(dv => defaultValueL = dv); - var sub = @this.Subscribe(s => - { - elapsedTime = 0; - if (startFound || s == startsWithL) - { - startFound = true; - sb.Append(s); - if (s == endsWithL) - { - o.OnNext(sb.ToString()); - startFound = false; - sb.Clear(); - } - } - }); - - var tim = Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ => + defaultValue.Subscribe(dv => defaultValueL = dv).AddTo(dis); + @this.Subscribe(s => + { + elapsedTime = 0; + if (startFound || s == startsWithL) + { + startFound = true; + sb.Append(s); + if (s == endsWithL) + { + o.OnNext(sb.ToString()); + startFound = false; + sb.Clear(); + } + } + }).AddTo(dis); + + Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ => { elapsedTime++; if (elapsedTime > timeOut) @@ -146,9 +140,9 @@ public static IObservable BufferUntil(this IObservable @this, IObs sb.Clear(); elapsedTime = 0; } - }); + }).AddTo(dis); - return new CompositeDisposable(sub, tim, swd, ewd, dvd); + return dis; }); /// @@ -160,7 +154,7 @@ public static IObservable BufferUntil(this IObservable @this, IObs public static IObservable ForEach(this IObservable @this) => Observable.Create(obs => { - var dis = @this.Subscribe(list => + return @this.Subscribe(list => { foreach (var item in list) { @@ -168,19 +162,13 @@ public static IObservable ForEach(this IObservable @this) => obs.OnNext(item); } }, obs.OnError, obs.OnCompleted); - - return dis; }); /// /// Repeats the source observable sequence until it successfully terminates. /// This is same as Retry(). /// - public static IObservable OnErrorRetry(this IObservable source) - { - var result = source.Retry(); - return result; - } + public static IObservable OnErrorRetry(this IObservable source) => source.Retry(); /// /// When caught exception, do onError action and repeat observable sequence. @@ -212,30 +200,26 @@ public static IObservable OnErrorRetry(this IObser /// time(work on delayScheduler) during within retryCount. /// public static IObservable OnErrorRetry(this IObservable source, Action onError, int retryCount, TimeSpan delay, IScheduler delayScheduler) - where TException : Exception - { - var result = Observable.Defer(() => - { - var dueTime = (delay.Ticks < 0) ? TimeSpan.Zero : delay; - var empty = Observable.Empty(); - var count = 0; - - IObservable self = null; - self = source.Catch((TException ex) => +where TException : Exception => + Observable.Defer(() => { - onError(ex); + var dueTime = (delay.Ticks < 0) ? TimeSpan.Zero : delay; + var empty = Observable.Empty(); + var count = 0; - return (++count < retryCount) - ? (dueTime == TimeSpan.Zero) - ? self.SubscribeOn(Scheduler.CurrentThread) - : empty.Delay(dueTime, delayScheduler).Concat(self).SubscribeOn(Scheduler.CurrentThread) - : Observable.Throw(ex); - }); - return self; - }); + IObservable self = null; + self = source.Catch((TException ex) => + { + onError(ex); - return result; - } + return (++count < retryCount) + ? (dueTime == TimeSpan.Zero) + ? self.SubscribeOn(Scheduler.CurrentThread) + : empty.Delay(dueTime, delayScheduler).Concat(self).SubscribeOn(Scheduler.CurrentThread) + : Observable.Throw(ex); + }); + return self; + }); /// /// Executes while port is open at the given TimeSpan. @@ -249,8 +233,7 @@ public static IObservable WhileIsOpen(this SerialPortRx @this, TimeSpan ti var isOpen = from a in Observable.Interval(timespan) from b in @this.IsOpen.DistinctUntilChanged() select b; - var dis = isOpen.Where(x => x == true).Subscribe(obs); - return dis; + return isOpen.Where(x => x).Subscribe(obs); })); ///