Skip to content

Commit

Permalink
Add connect
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPulman committed Jul 22, 2022
1 parent 383eef2 commit e3690ab
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project DefaultTargets="Build"
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Version>2.2.4</Version>
<Version>2.2.5</Version>
<Authors>Chris Pulman</Authors>
<TargetFrameworks>net461;netstandard2.0;net6.0;</TargetFrameworks>
<Description>An Observable Com port extension of System.IO.Ports.SerialPort</Description>
Expand Down
12 changes: 10 additions & 2 deletions SerialPortRx/SerialPortRx.csproj
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<Compile Update="**\*.cs" DependentUpon="I%(Filename).cs" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.IO.Ports" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<Compile Update="UdpClientRx.cs">
<DependentUpon>I%(Filename).cs</DependentUpon>
<DependentUpon>IPortRx.cs</DependentUpon>
</Compile>
<Compile Update="TcpClientRx.cs">
<DependentUpon>IPortRx.cs</DependentUpon>
</Compile>
<Compile Update="ISerialPortRx.cs">
<DependentUpon>IPortRx.cs</DependentUpon>
</Compile>
<Compile Update="SerialPortRx.cs">
<DependentUpon>ISerialPortRx.cs</DependentUpon>
</Compile>
</ItemGroup>

Expand Down
84 changes: 57 additions & 27 deletions SerialPortRx/TcpClientRx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// </copyright>

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
Expand Down Expand Up @@ -120,32 +119,36 @@ public int WriteTimeout
/// <value>The data received.</value>
public IObservable<int> BytesReceived => _bytesReceived.Retry().Publish().RefCount();

private IObservable<Unit> Connect => Observable.Create<Unit>(obs =>
{
var dis = new CompositeDisposable();
var lastValue = -1;
dis.Add(Observable.While(() => true, Observable.Return(Stream.ReadByte())).Retry()
.Subscribe(
d =>
{
if (lastValue != -1 || d > -1)
{
lastValue = d;
_dataReceived.OnNext(d);
}
else
{
lastValue = -1;
}
},
obs.OnError));
/// <summary>
/// Connects the specified hostname.
/// </summary>
/// <param name="hostname">The hostname.</param>
/// <param name="port">The port.</param>
public void Connect(string hostname, int port) =>
_tcpClient.Connect(hostname, port);

obs.OnNext(Unit.Default);
return Disposable.Create(() =>
{
dis.Dispose();
});
}).Publish().RefCount();
/// <summary>
/// Connects the specified address.
/// </summary>
/// <param name="address">The address.</param>
/// <param name="port">The port.</param>
public void Connect(IPAddress address, int port) =>
_tcpClient.Connect(address, port);

/// <summary>
/// Connects the specified remote ep.
/// </summary>
/// <param name="remoteEP">The remote ep.</param>
public void Connect(IPEndPoint remoteEP) =>
_tcpClient.Connect(remoteEP);

/// <summary>
/// Connects the specified ip addresses.
/// </summary>
/// <param name="ipAddresses">The ip addresses.</param>
/// <param name="port">The port.</param>
public void Connect(IPAddress[] ipAddresses, int port) =>
_tcpClient.Connect(ipAddresses, port);

/// <summary>
/// Opens this instance.
Expand All @@ -158,7 +161,7 @@ public Task Open()
_disposablePort = new();
}

return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
}

/// <summary>
Expand Down Expand Up @@ -232,4 +235,31 @@ protected virtual void Dispose(bool disposing)
_disposedValue = true;
}
}

private IObservable<Unit> Connect() => Observable.Create<Unit>(obs =>
{
var dis = new CompositeDisposable();
var lastValue = -1;
dis.Add(Observable.While(() => true, Observable.Return(Stream.ReadByte())).Retry()
.Subscribe(
d =>
{
if (lastValue != -1 || d > -1)
{
lastValue = d;
_dataReceived.OnNext(d);
}
else
{
lastValue = -1;
}
},
obs.OnError));

obs.OnNext(Unit.Default);
return Disposable.Create(() =>
{
dis.Dispose();
});
}).Publish().RefCount();
}
65 changes: 47 additions & 18 deletions SerialPortRx/UdpClientRx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class UdpClientRx : IPortRx
/// <param name="localEP">The local ep.</param>
public UdpClientRx(IPEndPoint localEP) => _udpClient = new(localEP);

/// <summary>
/// Initializes a new instance of the <see cref="UdpClientRx"/> class.
/// </summary>
/// <param name="port">The port.</param>
public UdpClientRx(int port) => _udpClient = new(port);

/// <summary>
/// Initializes a new instance of the <see cref="UdpClientRx"/> class.
/// </summary>
Expand Down Expand Up @@ -116,24 +122,28 @@ public int WriteTimeout
/// <value>The data received.</value>
public IObservable<int> BytesReceived => _bytesReceived.Retry().Publish().RefCount();

private IObservable<Unit> Connect => Observable.Create<Unit>(obs =>
{
var dis = new CompositeDisposable
{
_udpClient!.ReceiveAsync()
.ToObservable()
.Select(x => x.Buffer)
.ForEach()
.Retry()
.Subscribe(d => _dataReceived.OnNext(d), obs.OnError)
};
/// <summary>
/// Connects the specified hostname.
/// </summary>
/// <param name="hostname">The hostname.</param>
/// <param name="port">The port.</param>
public void Connect(string hostname, int port) =>
_udpClient!.Connect(hostname, port);

obs.OnNext(Unit.Default);
return Disposable.Create(() =>
{
dis.Dispose();
});
}).Publish().RefCount();
/// <summary>
/// Connects the specified addr.
/// </summary>
/// <param name="addr">The addr.</param>
/// <param name="port">The port.</param>
public void Connect(IPAddress addr, int port) =>
_udpClient!.Connect(addr, port);

/// <summary>
/// Connects the specified end point.
/// </summary>
/// <param name="endPoint">The end point.</param>
public void Connect(IPEndPoint endPoint) =>
_udpClient!.Connect(endPoint);

/// <summary>
/// Returns a UDP datagram asynchronously that was sent by a remote host.
Expand All @@ -152,7 +162,7 @@ public Task Open()
_disposablePort = new();
}

return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
}

/// <summary>
Expand Down Expand Up @@ -322,4 +332,23 @@ protected virtual void Dispose(bool disposing)
_disposedValue = true;
}
}

private IObservable<Unit> Connect() => Observable.Create<Unit>(obs =>
{
var dis = new CompositeDisposable
{
_udpClient!.ReceiveAsync()
.ToObservable()
.Select(x => x.Buffer)
.ForEach()
.Retry()
.Subscribe(d => _dataReceived.OnNext(d), obs.OnError)
};

obs.OnNext(Unit.Default);
return Disposable.Create(() =>
{
dis.Dispose();
});
}).Publish().RefCount();
}
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "2.2.4",
"version": "2.2.5",
"nuGetPackageVersion": {
"semVer": 2.0
},
Expand Down

0 comments on commit e3690ab

Please sign in to comment.