diff --git a/.github/workflows/BuildOnly.yml b/.github/workflows/BuildOnly.yml
index e27ecac..06ee0e6 100644
--- a/.github/workflows/BuildOnly.yml
+++ b/.github/workflows/BuildOnly.yml
@@ -11,34 +11,26 @@ jobs:
outputs:
nbgv: ${{ steps.nbgv.outputs.SemVer2 }}
steps:
- - name: Get Current Visual Studio Information
- shell: bash
- run: |
- dotnet tool update -g dotnet-vs
- echo "-- About RELEASE --"
- vs where release
-
- - name: Update Visual Studio Latest Release
- shell: bash
- run: |
- echo "-- Update RELEASE --"
- vs update release Enterprise
- vs modify release Enterprise +mobile +desktop +uwp +web
- echo "-- About RELEASE Updated --"
- vs where release
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0 # avoid shallow clone so nbgv can do its work.
- - name: Setup .NET
+ - name: Setup .NET 6/7
uses: actions/setup-dotnet@v3
with:
dotnet-version: |
6.0.x
7.0.x
+ - name: Setup .NET 8
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-quality: 'preview'
+ dotnet-version: |
+ 8.0.x
+
- name: Add MSBuild to PATH
uses: microsoft/setup-msbuild@v1.1.3
with:
@@ -49,6 +41,7 @@ jobs:
uses: dotnet/nbgv@master
with:
setAllVars: true
+
- run: echo 'SemVer2=${{ steps.nbgv.outputs.SemVer2 }}'
- name: NuGet Restore
@@ -56,21 +49,9 @@ jobs:
working-directory: src
- name: Build
- run: msbuild /t:build,pack /nowarn:MSB4011 /maxcpucount /p:NoPackageAnalysis=true /verbosity:minimal /p:Configuration=Release SerialPortRx.sln
+ run: dotnet build --no-restore --configuration Release SerialPortRx.sln
working-directory: src
- - name: Run Unit Tests and Generate Coverage
- uses: glennawatson/coverlet-msbuild@v2.1
- with:
- project-files: 'src/**/*Tests*.csproj'
- no-build: true
- include-filter: 'SerialPortRx*'
- output-format: cobertura
- configuration: Release
-
- - name: Upload Code Coverage
- uses: codecov/codecov-action@v3
-
- name: Create NuGet Artifacts
uses: actions/upload-artifact@master
with:
diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index bb04bd5..f669856 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -11,34 +11,26 @@ jobs:
outputs:
nbgv: ${{ steps.nbgv.outputs.SemVer2 }}
steps:
- - name: Get Current Visual Studio Information
- shell: bash
- run: |
- dotnet tool update -g dotnet-vs
- echo "-- About RELEASE --"
- vs where release
-
- - name: Update Visual Studio Latest Release
- shell: bash
- run: |
- echo "-- Update RELEASE --"
- vs update release Enterprise
- vs modify release Enterprise +mobile +desktop +uwp +web
- echo "-- About RELEASE Updated --"
- vs where release
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0 # avoid shallow clone so nbgv can do its work.
- - name: Setup .NET
+ - name: Setup .NET 6/7
uses: actions/setup-dotnet@v3
with:
dotnet-version: |
6.0.x
7.0.x
+ - name: Setup .NET 8
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-quality: 'preview'
+ dotnet-version: |
+ 8.0.x
+
- name: Add MSBuild to PATH
uses: microsoft/setup-msbuild@v1.1.3
with:
@@ -49,6 +41,7 @@ jobs:
uses: dotnet/nbgv@master
with:
setAllVars: true
+
- run: echo 'SemVer2=${{ steps.nbgv.outputs.SemVer2 }}'
- name: NuGet Restore
@@ -56,7 +49,7 @@ jobs:
working-directory: src
- name: Build
- run: msbuild /t:build,pack /nowarn:MSB4011 /maxcpucount /p:NoPackageAnalysis=true /verbosity:minimal /p:Configuration=Release SerialPortRx.sln
+ run: dotnet build --no-restore --configuration Release SerialPortRx.sln
working-directory: src
- name: Create NuGet Artifacts
diff --git a/Directory.Build.props b/Directory.Build.props
index 44f167b..67b1b7e 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -11,10 +11,9 @@
preview
$(TargetFramework)
ChrisPulman
- CS1591;IDE0190;IDE1006
+ $(NoWarn);CS1591;IDE0190;IDE1006;SA1010
enable
logo.png
- Debug;Release;PreRelease
Compatability with Net 6, Net 7 and netstandard2.0
SerialPort;rx;reactive;extensions;observable;LINQ;net;netstandard
True
@@ -47,20 +46,6 @@
-
-
-
-
-
- runtime; build; native; contentfiles; analyzers; buildtransitive
- all
-
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
true
@@ -70,7 +55,7 @@
-
+
diff --git a/README.md b/README.md
index 3efb296..ba4578e 100644
--- a/README.md
+++ b/README.md
@@ -10,56 +10,65 @@ using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using ReactiveMarbles.Extensions;
-namespace CP.IO.Ports.Test
+namespace CP.IO.Ports.Test;
+
+internal static class Program
{
- internal class Program
+ private static void Main(string[] args)
{
- private static void Main(string[] args)
+ const string comPortName = "COM1";
+
+ // configure the data to write, this can be a string, a byte array, or a char array
+ const string dataToWrite = "DataToWrite";
+ var dis = new CompositeDisposable();
+
+ // Setup the start of message and end of message
+ var startChar = 0x21.AsObservable();
+ var endChar = 0x0a.AsObservable();
+
+ // Create a disposable for each COM port to allow automatic disposal upon loss of COM port
+ var comdis = new CompositeDisposable();
+
+ // Subscribe to com ports available
+ SerialPortRx.PortNames().Do(x =>
+ {
+ if (comdis?.Count == 0 && x.Contains(comPortName))
+ {
+ // Create a port
+ var port = new SerialPortRx(comPortName, 9600);
+ port.DisposeWith(comdis);
+
+ // Subscribe to Exceptions from port
+ port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comdis);
+ port.IsOpenObservable.Subscribe(x => Console.WriteLine($"Port {comPortName} is {(x ? "Open" : "Closed")}")).DisposeWith(comdis);
+
+ // Subscribe to the Data Received
+ port.DataReceived.BufferUntil(startChar, endChar, 100).Subscribe(data => Console.WriteLine(data)).DisposeWith(comdis);
+
+ // Subscribe to the Is Open @500ms intervals and write to com port
+ port.WhileIsOpen(TimeSpan.FromMilliseconds(500)).Subscribe(_ => port.Write(dataToWrite)).DisposeWith(comdis);
+
+ // Open the Com Port after subscriptions created
+ port.Open();
+ }
+ else
+ {
+ comdis?.Dispose();
+ Console.WriteLine($"Port {comPortName} Disposed");
+ comdis = [];
+ }
+ }).ForEach().Subscribe(name =>
{
- var comPortName = "COM1";
- // configure the data to write, this can be a string, a byte array, or a char array
- var dataToWrite = "DataToWrite";
- var dis = new CompositeDisposable();
- // Setup the start of message and end of message
- var startChar = (0x21).AsObservable();
- var endChar = (0x0a).AsObservable();
- // Create a disposable for each COM port to allow automatic disposal upon loss of COM port
- var comdis = new CompositeDisposable();
- // Subscribe to com ports available
- SerialPortRx.PortNames().Do(x => {
- if (comdis?.Count == 0 && x.Contains(comPortName)) {
- // Create a port
- var port = new SerialPortRx(comPortName, 9600);
- port.AddTo(comdis);
- // Subscribe to Exceptions from port
- port.ErrorReceived.Subscribe(Console.WriteLine).AddTo(comdis);
- port.IsOpenObservable.Subscribe(x => Console.WriteLine($"Port {comPortName} is {(x ? "Open" : "Closed")}")).AddTo(comdis);
- // Subscribe to the Data Received
- port.DataReceived.BufferUntil(startChar, endChar, 100).Subscribe(data => {
- Console.WriteLine(data);
- }).AddTo(comdis);
- // Subscribe to the Is Open @500ms intervals and write to com port
- port.WhileIsOpen(TimeSpan.FromMilliseconds(500)).Subscribe(x => {
- port.Write(dataToWrite);
- }).AddTo(comdis);
- // Open the Com Port after subscriptions created
- port.Open();
- } else {
- comdis.Dispose();
- Console.WriteLine($"Port {comPortName} Disposed");
- comdis = new CompositeDisposable();
- }
- }).ForEach().Subscribe(name => {
- // Show available ports
- Console.WriteLine(name);
- }).AddTo(dis);
- Console.ReadLine();
- // Cleanup ports
- comdis.Dispose();
- dis.Dispose();
- }
+ // Show available ports
+ Console.WriteLine(name);
+ }).DisposeWith(dis);
+ Console.ReadLine();
+
+ // Cleanup ports
+ comdis.Dispose();
+ dis.Dispose();
}
}
-
```
diff --git a/Version.json b/Version.json
index db88420..2b0d668 100644
--- a/Version.json
+++ b/Version.json
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
- "version": "2.3.4",
+ "version": "3.0.1",
"nuGetPackageVersion": {
"semVer": 2.0
},
diff --git a/src/SerialPortRx.Test/Program.cs b/src/SerialPortRx.Test/Program.cs
index 4570402..e49aa7b 100644
--- a/src/SerialPortRx.Test/Program.cs
+++ b/src/SerialPortRx.Test/Program.cs
@@ -5,6 +5,7 @@
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using ReactiveMarbles.Extensions;
namespace CP.IO.Ports.Test;
@@ -32,17 +33,17 @@ private static void Main(string[] args)
{
// Create a port
var port = new SerialPortRx(comPortName, 9600);
- port.AddTo(comdis);
+ port.DisposeWith(comdis);
// Subscribe to Exceptions from port
- port.ErrorReceived.Subscribe(Console.WriteLine).AddTo(comdis);
- port.IsOpenObservable.Subscribe(x => Console.WriteLine($"Port {comPortName} is {(x ? "Open" : "Closed")}")).AddTo(comdis);
+ port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comdis);
+ port.IsOpenObservable.Subscribe(x => Console.WriteLine($"Port {comPortName} is {(x ? "Open" : "Closed")}")).DisposeWith(comdis);
// Subscribe to the Data Received
- port.DataReceived.BufferUntil(startChar, endChar, 100).Subscribe(data => Console.WriteLine(data)).AddTo(comdis);
+ port.DataReceived.BufferUntil(startChar, endChar, 100).Subscribe(data => Console.WriteLine(data)).DisposeWith(comdis);
// Subscribe to the Is Open @500ms intervals and write to com port
- port.WhileIsOpen(TimeSpan.FromMilliseconds(500)).Subscribe(_ => port.Write(dataToWrite)).AddTo(comdis);
+ port.WhileIsOpen(TimeSpan.FromMilliseconds(500)).Subscribe(_ => port.Write(dataToWrite)).DisposeWith(comdis);
// Open the Com Port after subscriptions created
port.Open();
@@ -51,13 +52,13 @@ private static void Main(string[] args)
{
comdis?.Dispose();
Console.WriteLine($"Port {comPortName} Disposed");
- comdis = new CompositeDisposable();
+ comdis = [];
}
}).ForEach().Subscribe(name =>
{
// Show available ports
Console.WriteLine(name);
- }).AddTo(dis);
+ }).DisposeWith(dis);
Console.ReadLine();
// Cleanup ports
diff --git a/src/SerialPortRx/IDisposableExtensions.cs b/src/SerialPortRx/IDisposableExtensions.cs
deleted file mode 100644
index 8e9c59d..0000000
--- a/src/SerialPortRx/IDisposableExtensions.cs
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright (c) Chris Pulman. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-using System;
-using System.Collections.Generic;
-
-namespace CP.IO.Ports;
-
-///
-/// IDisposable Extensions.
-///
-public static class IDisposableExtensions
-{
- ///
- /// Add disposable(self) to CompositeDisposable(or other ICollection).
- ///
- /// The type.
- /// The disposable.
- /// The container.
- /// Type of T.
- public static T AddTo(this T disposable, ICollection container)
- where T : IDisposable
- {
- if (container == null)
- {
- throw new ArgumentNullException(nameof(container));
- }
-
- container.Add(disposable);
- return disposable;
- }
-}
diff --git a/src/SerialPortRx/Properties/AssemblyInfo.cs b/src/SerialPortRx/Properties/AssemblyInfo.cs
index 8f0f619..a3846e0 100644
--- a/src/SerialPortRx/Properties/AssemblyInfo.cs
+++ b/src/SerialPortRx/Properties/AssemblyInfo.cs
@@ -1,7 +1,6 @@
// Copyright (c) Chris Pulman. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-using System.Reflection;
using System.Runtime.InteropServices;
[assembly: ComVisible(false)]
diff --git a/src/SerialPortRx/SerialPortRx.cs b/src/SerialPortRx/SerialPortRx.cs
index 3533dc9..6268238 100644
--- a/src/SerialPortRx/SerialPortRx.cs
+++ b/src/SerialPortRx/SerialPortRx.cs
@@ -13,6 +13,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using ReactiveMarbles.Extensions;
namespace CP.IO.Ports;
@@ -22,19 +23,20 @@ namespace CP.IO.Ports;
///
public class SerialPortRx : ISerialPortRx
{
- private readonly ISubject _isOpenValue = new ReplaySubject(1);
- private readonly ISubject _dataReceived = new Subject();
- private readonly ISubject _errors = new Subject();
- private readonly ISubject<(byte[] byteArray, int offset, int count)> _writeByte = new Subject<(byte[] byteArray, int offset, int count)>();
- private readonly ISubject<(char[] charArray, int offset, int count)> _writeChar = new Subject<(char[] charArray, int offset, int count)>();
- private readonly ISubject _writeString = new Subject();
- private readonly ISubject _writeStringLine = new Subject();
- private readonly ISubject _discardInBuffer = new Subject();
- private readonly ISubject _discardOutBuffer = new Subject();
- private readonly ISubject<(byte[] buffer, int offset, int count)> _readBytes = new Subject<(byte[] buffer, int offset, int count)>();
- private readonly ISubject _bytesRead = new Subject();
- private readonly ISubject _bytesReceived = new Subject();
- private CompositeDisposable _disposablePort = new();
+ private static readonly string[] noPorts = ["NoPorts"];
+ private readonly ReplaySubject _isOpenValue = new(1);
+ private readonly Subject _dataReceived = new();
+ private readonly Subject _errors = new();
+ private readonly Subject<(byte[] byteArray, int offset, int count)> _writeByte = new();
+ private readonly Subject<(char[] charArray, int offset, int count)> _writeChar = new();
+ private readonly Subject _writeString = new();
+ private readonly Subject _writeStringLine = new();
+ private readonly Subject _discardInBuffer = new();
+ private readonly Subject _discardOutBuffer = new();
+ private readonly Subject<(byte[] buffer, int offset, int count)> _readBytes = new();
+ private readonly Subject _bytesRead = new();
+ private readonly Subject _bytesReceived = new();
+ private CompositeDisposable _disposablePort = [];
private bool _readBusy;
///
@@ -454,7 +456,7 @@ public static IObservable PortNames(int pollInterval = 500, int pollLi
var compareNew = SerialPort.GetPortNames();
if (compareNew.Length == 0)
{
- compareNew = new string[] { "NoPorts" };
+ compareNew = noPorts;
}
if (compare == null)
@@ -516,10 +518,10 @@ public Task Open()
{
if (_disposablePort?.IsDisposed != false)
{
- _disposablePort = new();
+ _disposablePort = [];
}
- return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
+ return _disposablePort?.Count == 0 ? Task.Run(() => Connect.Subscribe().DisposeWith(_disposablePort)) : Task.CompletedTask;
}
///
@@ -544,10 +546,14 @@ public void Write(byte[] byteArray, int offset, int count) =>
/// The byte array.
public void Write(byte[] byteArray)
{
+#if NETSTANDARD
if (byteArray == null)
{
throw new ArgumentNullException(nameof(byteArray));
}
+#else
+ ArgumentNullException.ThrowIfNull(byteArray);
+#endif
_writeByte?.OnNext((byteArray, 0, byteArray.Length));
}
@@ -558,10 +564,14 @@ public void Write(byte[] byteArray)
/// The character array.
public void Write(char[] charArray)
{
+#if NETSTANDARD
if (charArray == null)
{
throw new ArgumentNullException(nameof(charArray));
}
+#else
+ ArgumentNullException.ThrowIfNull(charArray);
+#endif
_writeChar?.OnNext((charArray, 0, charArray.Length));
}
@@ -610,6 +620,18 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
+ _isOpenValue.Dispose();
+ _dataReceived.Dispose();
+ _errors.Dispose();
+ _writeByte.Dispose();
+ _writeChar.Dispose();
+ _writeString.Dispose();
+ _writeStringLine.Dispose();
+ _discardInBuffer.Dispose();
+ _discardOutBuffer.Dispose();
+ _readBytes.Dispose();
+ _bytesRead.Dispose();
+ _bytesReceived.Dispose();
_disposablePort?.Dispose();
}
diff --git a/src/SerialPortRx/SerialPortRx.csproj b/src/SerialPortRx/SerialPortRx.csproj
index b5c8dfe..1a1f9a3 100644
--- a/src/SerialPortRx/SerialPortRx.csproj
+++ b/src/SerialPortRx/SerialPortRx.csproj
@@ -1,13 +1,13 @@
- netstandard2.0;net6.0;net7.0;net6.0-windows10.0.17763.0;net7.0-windows10.0.17763.0
+ netstandard2.0;net6.0;net7.0;net8.0;net6.0-windows10.0.19041.0;net7.0-windows10.0.19041.0;net8.0-windows10.0.19041.0
-
+
HasWindows
-
-
+
+
diff --git a/src/SerialPortRx/SerialPortRxMixins.cs b/src/SerialPortRx/SerialPortRxMixins.cs
index 6fe4236..d795112 100644
--- a/src/SerialPortRx/SerialPortRxMixins.cs
+++ b/src/SerialPortRx/SerialPortRxMixins.cs
@@ -2,12 +2,12 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
-using System.Collections.Generic;
using System.IO.Ports;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using ReactiveMarbles.Extensions;
namespace CP.IO.Ports;
@@ -60,9 +60,9 @@ public static IObservable BufferUntil(this IObservable @this, IObs
{
startsWithL = sw;
elapsedTime = 0;
- }).AddTo(dis);
+ }).DisposeWith(dis);
var endsWithL = ' ';
- var ewd = endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis);
+ var ewd = endsWith.Subscribe(ew => endsWithL = ew).DisposeWith(dis);
var sub = @this.Subscribe(s =>
{
elapsedTime = 0;
@@ -77,7 +77,7 @@ public static IObservable BufferUntil(this IObservable @this, IObs
str = string.Empty;
}
}
- }).AddTo(dis);
+ }).DisposeWith(dis);
scheduler ??= new EventLoopScheduler();
@@ -90,7 +90,7 @@ public static IObservable BufferUntil(this IObservable @this, IObs
str = string.Empty;
elapsedTime = 0;
}
- }).AddTo(dis);
+ }).DisposeWith(dis);
return dis;
});
@@ -121,11 +121,11 @@ public static IObservable BufferUntil(this IObservable @this, IObs
{
startsWithL = sw;
elapsedTime = 0;
- }).AddTo(dis);
+ }).DisposeWith(dis);
var endsWithL = ' ';
- endsWith.Subscribe(ew => endsWithL = ew).AddTo(dis);
+ endsWith.Subscribe(ew => endsWithL = ew).DisposeWith(dis);
var defaultValueL = string.Empty;
- defaultValue.Subscribe(dv => defaultValueL = dv).AddTo(dis);
+ defaultValue.Subscribe(dv => defaultValueL = dv).DisposeWith(dis);
@this.Subscribe(s =>
{
elapsedTime = 0;
@@ -140,7 +140,7 @@ public static IObservable BufferUntil(this IObservable @this, IObs
str = string.Empty;
}
}
- }).AddTo(dis);
+ }).DisposeWith(dis);
scheduler ??= new EventLoopScheduler();
@@ -154,7 +154,7 @@ public static IObservable BufferUntil(this IObservable @this, IObs
str = string.Empty;
elapsedTime = 0;
}
- }).AddTo(dis);
+ }).DisposeWith(dis);
return dis;
});
@@ -173,27 +173,6 @@ public static IObservable BufferUntil(this IObservable @this, IObs
/// Observable value.
public static IObservable> ErrorReceivedObserver(this SerialPort @this) => Observable.FromEventPattern(h => @this.ErrorReceived += h, h => @this.ErrorReceived -= h);
- ///
- /// Fors the each.
- ///
- /// The type.
- /// The this.
- /// Observable value.
- public static IObservable ForEach(this IObservable @this) =>
- Observable.Create(obs => @this.Subscribe(
- list =>
- {
- foreach (var item in list)
- {
- if (!EqualityComparer.Default.Equals(item, default!))
- {
- obs.OnNext(item);
- }
- }
- },
- obs.OnError,
- obs.OnCompleted));
-
///
/// Repeats the source observable sequence until it successfully terminates.
/// This is same as Retry().
diff --git a/src/SerialPortRx/TcpClientRx.cs b/src/SerialPortRx/TcpClientRx.cs
index 2ef9515..1851564 100644
--- a/src/SerialPortRx/TcpClientRx.cs
+++ b/src/SerialPortRx/TcpClientRx.cs
@@ -10,6 +10,7 @@
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
+using ReactiveMarbles.Extensions;
namespace CP.IO.Ports;
@@ -19,9 +20,9 @@ namespace CP.IO.Ports;
public class TcpClientRx : IPortRx
{
private readonly TcpClient _tcpClient;
- private readonly ISubject _bytesReceived = new Subject();
- private readonly ISubject _dataReceived = new Subject();
- private CompositeDisposable _disposablePort = new();
+ private readonly Subject _bytesReceived = new();
+ private readonly Subject _dataReceived = new();
+ private CompositeDisposable _disposablePort = [];
private bool _disposedValue;
///
@@ -156,10 +157,10 @@ public Task Open()
{
if (_disposablePort?.IsDisposed != false)
{
- _disposablePort = new();
+ _disposablePort = [];
}
- return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
+ return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().DisposeWith(_disposablePort)) : Task.CompletedTask;
}
///
@@ -211,7 +212,6 @@ public void DiscardInBuffer() =>
///
public void Dispose()
{
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
@@ -226,6 +226,8 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
+ _bytesReceived.Dispose();
+ _dataReceived.Dispose();
_tcpClient.Dispose();
_disposablePort.Dispose();
}
@@ -255,9 +257,6 @@ private IObservable Connect() => Observable.Create(obs =>
obs.OnError));
obs.OnNext(Unit.Default);
- return Disposable.Create(() =>
- {
- dis.Dispose();
- });
+ return Disposable.Create(() => dis.Dispose());
}).Publish().RefCount();
}
diff --git a/src/SerialPortRx/UdpClientRx.cs b/src/SerialPortRx/UdpClientRx.cs
index 27591ef..00ce1bf 100644
--- a/src/SerialPortRx/UdpClientRx.cs
+++ b/src/SerialPortRx/UdpClientRx.cs
@@ -12,6 +12,7 @@
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
+using ReactiveMarbles.Extensions;
namespace CP.IO.Ports;
@@ -23,9 +24,9 @@ public class UdpClientRx : IPortRx
private const int MaxBufferSize = ushort.MaxValue;
private readonly UdpClient? _udpClient;
private readonly byte[] _buffer = new byte[MaxBufferSize];
- private readonly ISubject _bytesReceived = new Subject();
- private readonly ISubject _dataReceived = new Subject();
- private CompositeDisposable _disposablePort = new();
+ private readonly Subject _bytesReceived = new();
+ private readonly Subject _dataReceived = new();
+ private CompositeDisposable _disposablePort = [];
private bool _disposedValue;
private int _bufferOffset;
@@ -220,10 +221,10 @@ public Task Open()
{
if (_disposablePort?.IsDisposed != false)
{
- _disposablePort = new();
+ _disposablePort = [];
}
- return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().AddTo(_disposablePort)) : Task.CompletedTask;
+ return _disposablePort?.Count == 0 ? Task.Run(() => Connect().Subscribe().DisposeWith(_disposablePort)) : Task.CompletedTask;
}
///
@@ -239,10 +240,14 @@ public Task Open()
/// The count.
public void Write(byte[] buffer, int offset, int count)
{
+#if NETSTANDARD
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}
+#else
+ ArgumentNullException.ThrowIfNull(buffer);
+#endif
if (offset < 0)
{
@@ -294,10 +299,14 @@ public Task SendAsync(byte[] dataGram, int bytes, IPEndPoint endPoint) =>
/// A int.
public Task ReadAsync(byte[] buffer, int offset, int count)
{
+#if NETSTANDARD
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}
+#else
+ ArgumentNullException.ThrowIfNull(buffer);
+#endif
if (offset < 0)
{
@@ -371,7 +380,6 @@ public void DiscardInBuffer()
///
public void Dispose()
{
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
@@ -386,6 +394,8 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
+ _bytesReceived.Dispose();
+ _dataReceived.Dispose();
_udpClient?.Dispose();
_disposablePort.Dispose();
}