Skip to content

Commit

Permalink
Cleaned up redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPulman committed Sep 25, 2016
1 parent cdbace2 commit 9fe7215
Showing 1 changed file with 63 additions and 80 deletions.
143 changes: 63 additions & 80 deletions SerialPortRx/SerialPortRxMixins.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using Reactive.Bindings.Extensions;

/// <summary>
/// Serial Port Rx Mixins
Expand Down Expand Up @@ -44,54 +45,46 @@ public static class SerialPortRxMixins
/// <returns>A string made up from the char values between the start and end chars</returns>
public static IObservable<string> BufferUntil(this IObservable<char> @this, IObservable<char> startsWith, IObservable<char> endsWith, int timeOut) => Observable.Create<string>(o =>
{
var dis = new CompositeDisposable();
var sb = new StringBuilder();

bool startFound = false;
int elapsedTime = 0;
char startsWithL = ' ';
var swd = startsWith.Subscribe(sw =>
startsWith.Subscribe(sw =>
{
startsWithL = sw;
elapsedTime = 0;
});
}).AddTo(dis);
char endsWithL = ' ';
var ewd = endsWith.Subscribe(ew => endsWithL = ew);
string defaultValueL = string.Empty;
try
var ewd = endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis);
var sub = @this.Subscribe(s =>
{
var sub = @this.Subscribe(s =>
elapsedTime = 0;
if (startFound || s == startsWithL)
{
elapsedTime = 0;
if (startFound || s == startsWithL)
startFound = true;
sb.Append(s);
if (s == endsWithL)
{
startFound = true;
sb.Append(s);
if (s == endsWithL)
{
o.OnNext(sb.ToString());
startFound = false;
sb.Clear();
}
o.OnNext(sb.ToString());
startFound = false;
sb.Clear();
}
});
}
catch (Exception)
{
o.OnNext(defaultValueL);
}
var tim = Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ =>
}
}).AddTo(dis);
Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ =>
{
elapsedTime++;
if (elapsedTime > timeOut)
{
o.OnNext(defaultValueL);
startFound = false;
sb.Clear();
elapsedTime = 0;
}
});
}).AddTo(dis);

return new CompositeDisposable(tim, swd, ewd);
return dis;
});

/// <summary>
Expand All @@ -106,37 +99,38 @@ public static IObservable<string> BufferUntil(this IObservable<char> @this, IObs
/// <returns>A string made up from the char values between the start and end chars</returns>
public static IObservable<string> BufferUntil(this IObservable<char> @this, IObservable<char> startsWith, IObservable<char> endsWith, IObservable<string> defaultValue, int timeOut) => Observable.Create<string>(o =>
{
var dis = new CompositeDisposable();
var sb = new StringBuilder();

bool startFound = false;
int elapsedTime = 0;
char startsWithL = ' ';
var swd = startsWith.Subscribe(sw =>
startsWith.Subscribe(sw =>
{
startsWithL = sw;
elapsedTime = 0;
});
}).AddTo(dis);
char endsWithL = ' ';
var ewd = endsWith.Subscribe(ew => endsWithL = ew);
endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis);
string defaultValueL = string.Empty;
var dvd = defaultValue.Subscribe(dv => defaultValueL = dv);
var sub = @this.Subscribe(s =>
{
elapsedTime = 0;
if (startFound || s == startsWithL)
{
startFound = true;
sb.Append(s);
if (s == endsWithL)
{
o.OnNext(sb.ToString());
startFound = false;
sb.Clear();
}
}
});

var tim = Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ =>
defaultValue.Subscribe(dv => defaultValueL = dv).AddTo(dis);
@this.Subscribe(s =>
{
elapsedTime = 0;
if (startFound || s == startsWithL)
{
startFound = true;
sb.Append(s);
if (s == endsWithL)
{
o.OnNext(sb.ToString());
startFound = false;
sb.Clear();
}
}
}).AddTo(dis);

Observable.Interval(TimeSpan.FromMilliseconds(1)).Subscribe(_ =>
{
elapsedTime++;
if (elapsedTime > timeOut)
Expand All @@ -146,9 +140,9 @@ public static IObservable<string> BufferUntil(this IObservable<char> @this, IObs
sb.Clear();
elapsedTime = 0;
}
});
}).AddTo(dis);

return new CompositeDisposable(sub, tim, swd, ewd, dvd);
return dis;
});

/// <summary>
Expand All @@ -160,27 +154,21 @@ public static IObservable<string> BufferUntil(this IObservable<char> @this, IObs
public static IObservable<T> ForEach<T>(this IObservable<T[]> @this) =>
Observable.Create<T>(obs =>
{
var dis = @this.Subscribe(list =>
return @this.Subscribe(list =>
{
foreach (var item in list)
{
if (item != null)
obs.OnNext(item);
}
}, obs.OnError, obs.OnCompleted);

return dis;
});

/// <summary>
/// <para>Repeats the source observable sequence until it successfully terminates.</para>
/// <para>This is same as Retry().</para>
/// </summary>
public static IObservable<TSource> OnErrorRetry<TSource>(this IObservable<TSource> source)
{
var result = source.Retry();
return result;
}
public static IObservable<TSource> OnErrorRetry<TSource>(this IObservable<TSource> source) => source.Retry();

/// <summary>
/// When caught exception, do onError action and repeat observable sequence.
Expand Down Expand Up @@ -212,30 +200,26 @@ public static IObservable<TSource> OnErrorRetry<TSource, TException>(this IObser
/// time(work on delayScheduler) during within retryCount.
/// </summary>
public static IObservable<TSource> OnErrorRetry<TSource, TException>(this IObservable<TSource> source, Action<TException> onError, int retryCount, TimeSpan delay, IScheduler delayScheduler)
where TException : Exception
{
var result = Observable.Defer(() =>
{
var dueTime = (delay.Ticks < 0) ? TimeSpan.Zero : delay;
var empty = Observable.Empty<TSource>();
var count = 0;

IObservable<TSource> self = null;
self = source.Catch((TException ex) =>
where TException : Exception =>
Observable.Defer(() =>
{
onError(ex);
var dueTime = (delay.Ticks < 0) ? TimeSpan.Zero : delay;
var empty = Observable.Empty<TSource>();
var count = 0;

return (++count < retryCount)
? (dueTime == TimeSpan.Zero)
? self.SubscribeOn(Scheduler.CurrentThread)
: empty.Delay(dueTime, delayScheduler).Concat(self).SubscribeOn(Scheduler.CurrentThread)
: Observable.Throw<TSource>(ex);
});
return self;
});
IObservable<TSource> self = null;
self = source.Catch((TException ex) =>
{
onError(ex);

return result;
}
return (++count < retryCount)
? (dueTime == TimeSpan.Zero)
? self.SubscribeOn(Scheduler.CurrentThread)
: empty.Delay(dueTime, delayScheduler).Concat(self).SubscribeOn(Scheduler.CurrentThread)
: Observable.Throw<TSource>(ex);
});
return self;
});

/// <summary>
/// Executes while port is open at the given TimeSpan.
Expand All @@ -249,8 +233,7 @@ public static IObservable<bool> WhileIsOpen(this SerialPortRx @this, TimeSpan ti
var isOpen = from a in Observable.Interval(timespan)
from b in @this.IsOpen.DistinctUntilChanged()
select b;
var dis = isOpen.Where(x => x == true).Subscribe(obs);
return dis;
return isOpen.Where(x => x).Subscribe(obs);
}));

/// <summary>
Expand Down

0 comments on commit 9fe7215

Please sign in to comment.