Skip to content

Commit

Permalink
Merge pull request #1444 from glopesdev/polling-serialport
Browse files Browse the repository at this point in the history
Refactor default serial port implementation to avoid event callbacks
  • Loading branch information
glopesdev committed Jul 1, 2023
2 parents f100b8c + a05eb36 commit f12bb6c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 260 deletions.
115 changes: 32 additions & 83 deletions Bonsai.System/IO/Ports/ObservableSerialPort.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using System.IO.Ports;
using System.Text.RegularExpressions;
using System.Threading;
using System.Reactive.Concurrency;
using System.Threading.Tasks;

namespace Bonsai.IO
{
Expand Down Expand Up @@ -47,96 +44,48 @@ public static string Unescape(string value)

public static IObservable<string> ReadLine(string portName, string newLine)
{
return Observable.Create<string>(observer =>
return Observable.Create<string>((observer, cancellationToken) =>
{
var data = string.Empty;
Action disposeAction = default;
var connection = SerialPortManager.ReserveConnection(portName);
SerialDataReceivedEventHandler dataReceivedHandler;
var serialPort = connection.SerialPort;
var baseStream = connection.SerialPort.BaseStream;
dataReceivedHandler = (sender, e) =>
return Task.Factory.StartNew(() =>
{
try
var data = string.Empty;
using var connection = SerialPortManager.ReserveConnection(portName);
using var cancellation = cancellationToken.Register(connection.Dispose);
var serialPort = connection.SerialPort;
while (!cancellationToken.IsCancellationRequested)
{
switch (e.EventType)
try
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
}
break;
}
}
finally
{
// We need a volatile read here to prevent reordering of
// instructions on access to the shared dispose delegate
var dispose = Volatile.Read(ref disposeAction);
if (dispose != null)
{
// If we reach this branch, we might be in deadlock
// so we share the responsibility of disposing the
// serial port.
dispose();
Volatile.Write(ref disposeAction, null);
}
}
};
connection.SerialPort.DataReceived += dataReceivedHandler;
return Disposable.Create(() =>
{
connection.SerialPort.DataReceived -= dataReceivedHandler;
// Arm the dispose call. We do not need a memory barrier here
// since both threads are sharing full mutexes and stores
// will be eventually updated (we don't care exactly when)
disposeAction = connection.Dispose;
// We do an async spin lock until someone can dispose the serial port.
// Since the dispose call is idempotent it is enough to guarantee
// at-least-once semantics
void TryDispose()
{
// Same as above we need a volatile read here to prevent
// reordering of instructions
var dispose = Volatile.Read(ref disposeAction);
if (dispose == null) return;
var bytesToRead = serialPort.BytesToRead;
if (bytesToRead == 0)
{
var next = (char)serialPort.ReadChar();
data = string.Concat(data, next, serialPort.ReadExisting());
}
else data = string.Concat(data, serialPort.ReadExisting());
if (cancellationToken.IsCancellationRequested) break;
// The SerialPort class holds a lock on base stream to
// ensure synchronization between calls to Dispose and
// calls to DataReceived handler
if (Monitor.TryEnter(baseStream))
{
// If we enter the critical section we can go ahead and
// dispose the serial port
try
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
dispose();
Volatile.Write(ref disposeAction, null);
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
finally { Monitor.Exit(baseStream); }
}
else
catch (Exception ex)
{
// If we reach this branch we may be in deadlock so we
// need to release this thread
DefaultScheduler.Instance.Schedule(TryDispose);
if (!cancellationToken.IsCancellationRequested)
{
observer.OnError(ex);
}
break;
}
}
// Run the spin lock
TryDispose();
});
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});
}
}
Expand Down
141 changes: 0 additions & 141 deletions Bonsai.System/IO/Ports/PollingSerialPort.cs

This file was deleted.

53 changes: 17 additions & 36 deletions Bonsai.System/IO/Ports/SerialPortManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo
else throw new ArgumentException("An alias or serial port name must be specified.", "portName");
}

Tuple<SerialPort, RefCountDisposable> connection;
lock (SyncRoot)
{
if (!openConnections.TryGetValue(portName, out connection))
if (!openConnections.TryGetValue(portName, out var connection))
{
var serialPortName = serialPortConfiguration.PortName;
if (string.IsNullOrEmpty(serialPortName)) serialPortName = portName;
Expand All @@ -45,48 +44,30 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo
}
#pragma warning restore CS0612 // Type or member is obsolete

SerialPort serialPort;
if (IsRunningOnMono)
var serialPort = new SerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
if (!IsRunningOnMono)
{
var pollingPort = new PollingSerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
serialPort = pollingPort;
ConfigureSerialPort(serialPort);
pollingPort.Open();
}
else
{
serialPort = new SerialPort(
serialPortName,
serialPortConfiguration.BaudRate,
serialPortConfiguration.Parity,
serialPortConfiguration.DataBits,
serialPortConfiguration.StopBits);
serialPort.ReceivedBytesThreshold = serialPortConfiguration.ReceivedBytesThreshold;
serialPort.ParityReplace = serialPortConfiguration.ParityReplace;
serialPort.DiscardNull = serialPortConfiguration.DiscardNull;
ConfigureSerialPort(serialPort);
serialPort.Open();
}
serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize;
serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize;
serialPort.Handshake = serialPortConfiguration.Handshake;
serialPort.DtrEnable = serialPortConfiguration.DtrEnable;
serialPort.RtsEnable = serialPortConfiguration.RtsEnable;

void ConfigureSerialPort(SerialPort serialPort)
var encoding = serialPortConfiguration.Encoding;
if (!string.IsNullOrEmpty(encoding))
{
serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize;
serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize;
serialPort.Handshake = serialPortConfiguration.Handshake;
serialPort.DtrEnable = serialPortConfiguration.DtrEnable;
serialPort.RtsEnable = serialPortConfiguration.RtsEnable;

var encoding = serialPortConfiguration.Encoding;
if (!string.IsNullOrEmpty(encoding))
{
serialPort.Encoding = Encoding.GetEncoding(encoding);
}
serialPort.Encoding = Encoding.GetEncoding(encoding);
}
serialPort.Open();

if (serialPort.BytesToRead > 0)
{
Expand Down

0 comments on commit f12bb6c

Please sign in to comment.