Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor default serial port implementation to avoid event callbacks #1444

Merged
merged 3 commits into from
Jul 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 32 additions & 83 deletions Bonsai.System/IO/Ports/ObservableSerialPort.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using System.IO.Ports;
using System.Text.RegularExpressions;
using System.Threading;
using System.Reactive.Concurrency;
using System.Threading.Tasks;

namespace Bonsai.IO
{
Expand Down Expand Up @@ -47,96 +44,48 @@ public static string Unescape(string value)

public static IObservable<string> ReadLine(string portName, string newLine)
{
return Observable.Create<string>(observer =>
return Observable.Create<string>((observer, cancellationToken) =>
{
var data = string.Empty;
Action disposeAction = default;
var connection = SerialPortManager.ReserveConnection(portName);
SerialDataReceivedEventHandler dataReceivedHandler;
var serialPort = connection.SerialPort;
var baseStream = connection.SerialPort.BaseStream;
dataReceivedHandler = (sender, e) =>
return Task.Factory.StartNew(() =>
{
try
var data = string.Empty;
using var connection = SerialPortManager.ReserveConnection(portName);
using var cancellation = cancellationToken.Register(connection.Dispose);
var serialPort = connection.SerialPort;
while (!cancellationToken.IsCancellationRequested)
{
switch (e.EventType)
try
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
}
break;
}
}
finally
{
// We need a volatile read here to prevent reordering of
// instructions on access to the shared dispose delegate
var dispose = Volatile.Read(ref disposeAction);
if (dispose != null)
{
// If we reach this branch, we might be in deadlock
// so we share the responsibility of disposing the
// serial port.
dispose();
Volatile.Write(ref disposeAction, null);
}
}
};
connection.SerialPort.DataReceived += dataReceivedHandler;
return Disposable.Create(() =>
{
connection.SerialPort.DataReceived -= dataReceivedHandler;
// Arm the dispose call. We do not need a memory barrier here
// since both threads are sharing full mutexes and stores
// will be eventually updated (we don't care exactly when)
disposeAction = connection.Dispose;
// We do an async spin lock until someone can dispose the serial port.
// Since the dispose call is idempotent it is enough to guarantee
// at-least-once semantics
void TryDispose()
{
// Same as above we need a volatile read here to prevent
// reordering of instructions
var dispose = Volatile.Read(ref disposeAction);
if (dispose == null) return;
var bytesToRead = serialPort.BytesToRead;
if (bytesToRead == 0)
{
var next = (char)serialPort.ReadChar();
data = string.Concat(data, next, serialPort.ReadExisting());
}
else data = string.Concat(data, serialPort.ReadExisting());
if (cancellationToken.IsCancellationRequested) break;
// The SerialPort class holds a lock on base stream to
// ensure synchronization between calls to Dispose and
// calls to DataReceived handler
if (Monitor.TryEnter(baseStream))
{
// If we enter the critical section we can go ahead and
// dispose the serial port
try
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
dispose();
Volatile.Write(ref disposeAction, null);
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
finally { Monitor.Exit(baseStream); }
}
else
catch (Exception ex)
{
// If we reach this branch we may be in deadlock so we
// need to release this thread
DefaultScheduler.Instance.Schedule(TryDispose);
if (!cancellationToken.IsCancellationRequested)
{
observer.OnError(ex);
}
break;
}
}
// Run the spin lock
TryDispose();
});
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});
}
}
Expand Down
141 changes: 0 additions & 141 deletions Bonsai.System/IO/Ports/PollingSerialPort.cs

This file was deleted.

53 changes: 17 additions & 36 deletions Bonsai.System/IO/Ports/SerialPortManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo
else throw new ArgumentException("An alias or serial port name must be specified.", "portName");
}

Tuple<SerialPort, RefCountDisposable> connection;
lock (SyncRoot)
{
if (!openConnections.TryGetValue(portName, out connection))
if (!openConnections.TryGetValue(portName, out var connection))
{
var serialPortName = serialPortConfiguration.PortName;
if (string.IsNullOrEmpty(serialPortName)) serialPortName = portName;
Expand All @@ -45,48 +44,30 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo
}
#pragma warning restore CS0612 // Type or member is obsolete

SerialPort serialPort;
if (IsRunningOnMono)
var serialPort = new SerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
if (!IsRunningOnMono)
{
var pollingPort = new PollingSerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
serialPort = pollingPort;
ConfigureSerialPort(serialPort);
pollingPort.Open();
}
else
{
serialPort = new SerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
serialPort.ReceivedBytesThreshold = serialPortConfiguration.ReceivedBytesThreshold;
serialPort.ParityReplace = serialPortConfiguration.ParityReplace;
serialPort.DiscardNull = serialPortConfiguration.DiscardNull;
ConfigureSerialPort(serialPort);
serialPort.Open();
}
serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize;
serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize;
serialPort.Handshake = serialPortConfiguration.Handshake;
serialPort.DtrEnable = serialPortConfiguration.DtrEnable;
serialPort.RtsEnable = serialPortConfiguration.RtsEnable;

void ConfigureSerialPort(SerialPort serialPort)
var encoding = serialPortConfiguration.Encoding;
if (!string.IsNullOrEmpty(encoding))
{
serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize;
serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize;
serialPort.Handshake = serialPortConfiguration.Handshake;
serialPort.DtrEnable = serialPortConfiguration.DtrEnable;
serialPort.RtsEnable = serialPortConfiguration.RtsEnable;

var encoding = serialPortConfiguration.Encoding;
if (!string.IsNullOrEmpty(encoding))
{
serialPort.Encoding = Encoding.GetEncoding(encoding);
}
serialPort.Encoding = Encoding.GetEncoding(encoding);
}
serialPort.Open();

if (serialPort.BytesToRead > 0)
{
Expand Down
Loading