From e3690ab80164b1a5b6ff1b4da663d2a002a7cf97 Mon Sep 17 00:00:00 2001 From: Chris Pulman Date: Fri, 22 Jul 2022 16:19:28 +0100 Subject: [PATCH] Add connect --- Directory.Build.props | 2 +- SerialPortRx/SerialPortRx.csproj | 12 ++++- SerialPortRx/TcpClientRx.cs | 84 ++++++++++++++++++++++---------- SerialPortRx/UdpClientRx.cs | 65 +++++++++++++++++------- Version.json | 2 +- 5 files changed, 116 insertions(+), 49 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 0c5f1f3..73de969 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 2.2.4 + 2.2.5 Chris Pulman net461;netstandard2.0;net6.0; An Observable Com port extension of System.IO.Ports.SerialPort diff --git a/SerialPortRx/SerialPortRx.csproj b/SerialPortRx/SerialPortRx.csproj index c6f2de5..8932a4a 100644 --- a/SerialPortRx/SerialPortRx.csproj +++ b/SerialPortRx/SerialPortRx.csproj @@ -1,14 +1,22 @@  - - I%(Filename).cs + IPortRx.cs + + + IPortRx.cs + + + IPortRx.cs + + + ISerialPortRx.cs diff --git a/SerialPortRx/TcpClientRx.cs b/SerialPortRx/TcpClientRx.cs index 3ed8118..a07bc26 100644 --- a/SerialPortRx/TcpClientRx.cs +++ b/SerialPortRx/TcpClientRx.cs @@ -4,7 +4,6 @@ // using System; -using System.IO; using System.Net; using System.Net.Sockets; using System.Reactive; @@ -120,32 +119,36 @@ public int WriteTimeout /// The data received. public IObservable BytesReceived => _bytesReceived.Retry().Publish().RefCount(); - private IObservable Connect => Observable.Create(obs => - { - var dis = new CompositeDisposable(); - var lastValue = -1; - dis.Add(Observable.While(() => true, Observable.Return(Stream.ReadByte())).Retry() - .Subscribe( - d => - { - if (lastValue != -1 || d > -1) - { - lastValue = d; - _dataReceived.OnNext(d); - } - else - { - lastValue = -1; - } - }, - obs.OnError)); + /// + /// Connects the specified hostname. + /// + /// The hostname. + /// The port. + public void Connect(string hostname, int port) => + _tcpClient.Connect(hostname, port); - obs.OnNext(Unit.Default); - return Disposable.Create(() => - { - dis.Dispose(); - }); - }).Publish().RefCount(); + /// + /// Connects the specified address. + /// + /// The address. + /// The port. + public void Connect(IPAddress address, int port) => + _tcpClient.Connect(address, port); + + /// + /// Connects the specified remote ep. + /// + /// The remote ep. + public void Connect(IPEndPoint remoteEP) => + _tcpClient.Connect(remoteEP); + + /// + /// Connects the specified ip addresses. + /// + /// The ip addresses. + /// The port. + public void Connect(IPAddress[] ipAddresses, int port) => + _tcpClient.Connect(ipAddresses, port); /// /// Opens this instance. @@ -158,7 +161,7 @@ public Task Open() _disposablePort = new(); } - return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().AddTo(_disposablePort)) : Task.CompletedTask; + return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask; } /// @@ -232,4 +235,31 @@ protected virtual void Dispose(bool disposing) _disposedValue = true; } } + + private IObservable Connect() => Observable.Create(obs => + { + var dis = new CompositeDisposable(); + var lastValue = -1; + dis.Add(Observable.While(() => true, Observable.Return(Stream.ReadByte())).Retry() + .Subscribe( + d => + { + if (lastValue != -1 || d > -1) + { + lastValue = d; + _dataReceived.OnNext(d); + } + else + { + lastValue = -1; + } + }, + obs.OnError)); + + obs.OnNext(Unit.Default); + return Disposable.Create(() => + { + dis.Dispose(); + }); + }).Publish().RefCount(); } diff --git a/SerialPortRx/UdpClientRx.cs b/SerialPortRx/UdpClientRx.cs index 8e5250e..1362b82 100644 --- a/SerialPortRx/UdpClientRx.cs +++ b/SerialPortRx/UdpClientRx.cs @@ -43,6 +43,12 @@ public class UdpClientRx : IPortRx /// The local ep. public UdpClientRx(IPEndPoint localEP) => _udpClient = new(localEP); + /// + /// Initializes a new instance of the class. + /// + /// The port. + public UdpClientRx(int port) => _udpClient = new(port); + /// /// Initializes a new instance of the class. /// @@ -116,24 +122,28 @@ public int WriteTimeout /// The data received. public IObservable BytesReceived => _bytesReceived.Retry().Publish().RefCount(); - private IObservable Connect => Observable.Create(obs => - { - var dis = new CompositeDisposable - { - _udpClient!.ReceiveAsync() - .ToObservable() - .Select(x => x.Buffer) - .ForEach() - .Retry() - .Subscribe(d => _dataReceived.OnNext(d), obs.OnError) - }; + /// + /// Connects the specified hostname. + /// + /// The hostname. + /// The port. + public void Connect(string hostname, int port) => + _udpClient!.Connect(hostname, port); - obs.OnNext(Unit.Default); - return Disposable.Create(() => - { - dis.Dispose(); - }); - }).Publish().RefCount(); + /// + /// Connects the specified addr. + /// + /// The addr. + /// The port. + public void Connect(IPAddress addr, int port) => + _udpClient!.Connect(addr, port); + + /// + /// Connects the specified end point. + /// + /// The end point. + public void Connect(IPEndPoint endPoint) => + _udpClient!.Connect(endPoint); /// /// Returns a UDP datagram asynchronously that was sent by a remote host. @@ -152,7 +162,7 @@ public Task Open() _disposablePort = new(); } - return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().AddTo(_disposablePort)) : Task.CompletedTask; + return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask; } /// @@ -322,4 +332,23 @@ protected virtual void Dispose(bool disposing) _disposedValue = true; } } + + private IObservable Connect() => Observable.Create(obs => + { + var dis = new CompositeDisposable + { + _udpClient!.ReceiveAsync() + .ToObservable() + .Select(x => x.Buffer) + .ForEach() + .Retry() + .Subscribe(d => _dataReceived.OnNext(d), obs.OnError) + }; + + obs.OnNext(Unit.Default); + return Disposable.Create(() => + { + dis.Dispose(); + }); + }).Publish().RefCount(); } diff --git a/Version.json b/Version.json index 17d2d63..9f18639 100644 --- a/Version.json +++ b/Version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json", - "version": "2.2.4", + "version": "2.2.5", "nuGetPackageVersion": { "semVer": 2.0 },