diff --git a/Bonsai.System/IO/Ports/ObservableSerialPort.cs b/Bonsai.System/IO/Ports/ObservableSerialPort.cs index fb5ba8bb..9451c6a1 100644 --- a/Bonsai.System/IO/Ports/ObservableSerialPort.cs +++ b/Bonsai.System/IO/Ports/ObservableSerialPort.cs @@ -1,10 +1,7 @@ using System; using System.Reactive.Linq; -using System.Reactive.Disposables; -using System.IO.Ports; using System.Text.RegularExpressions; -using System.Threading; -using System.Reactive.Concurrency; +using System.Threading.Tasks; namespace Bonsai.IO { @@ -47,96 +44,48 @@ public static string Unescape(string value) public static IObservable ReadLine(string portName, string newLine) { - return Observable.Create(observer => + return Observable.Create((observer, cancellationToken) => { - var data = string.Empty; - Action disposeAction = default; - var connection = SerialPortManager.ReserveConnection(portName); - SerialDataReceivedEventHandler dataReceivedHandler; - var serialPort = connection.SerialPort; - var baseStream = connection.SerialPort.BaseStream; - dataReceivedHandler = (sender, e) => + return Task.Factory.StartNew(() => { - try + var data = string.Empty; + using var connection = SerialPortManager.ReserveConnection(portName); + using var cancellation = cancellationToken.Register(connection.Dispose); + var serialPort = connection.SerialPort; + while (!cancellationToken.IsCancellationRequested) { - switch (e.EventType) + try { - case SerialData.Eof: observer.OnCompleted(); break; - case SerialData.Chars: - default: - if (serialPort.IsOpen && serialPort.BytesToRead > 0) - { - data += serialPort.ReadExisting(); - var lines = data.Split(new[] { newLine }, StringSplitOptions.None); - for (int i = 0; i < lines.Length; i++) - { - if (i == lines.Length - 1) data = lines[i]; - else observer.OnNext(lines[i]); - } - } - break; - } - } - finally - { - // We need a volatile read here to prevent reordering of - // instructions on access to the shared dispose delegate - var dispose = Volatile.Read(ref disposeAction); - if (dispose != null) - { - // If we reach this branch, we might be in deadlock - // so we share the responsibility of disposing the - // serial port. - dispose(); - Volatile.Write(ref disposeAction, null); - } - } - }; - connection.SerialPort.DataReceived += dataReceivedHandler; - return Disposable.Create(() => - { - connection.SerialPort.DataReceived -= dataReceivedHandler; - - // Arm the dispose call. We do not need a memory barrier here - // since both threads are sharing full mutexes and stores - // will be eventually updated (we don't care exactly when) - disposeAction = connection.Dispose; - - // We do an async spin lock until someone can dispose the serial port. - // Since the dispose call is idempotent it is enough to guarantee - // at-least-once semantics - void TryDispose() - { - // Same as above we need a volatile read here to prevent - // reordering of instructions - var dispose = Volatile.Read(ref disposeAction); - if (dispose == null) return; + var bytesToRead = serialPort.BytesToRead; + if (bytesToRead == 0) + { + var next = (char)serialPort.ReadChar(); + data = string.Concat(data, next, serialPort.ReadExisting()); + } + else data = string.Concat(data, serialPort.ReadExisting()); + if (cancellationToken.IsCancellationRequested) break; - // The SerialPort class holds a lock on base stream to - // ensure synchronization between calls to Dispose and - // calls to DataReceived handler - if (Monitor.TryEnter(baseStream)) - { - // If we enter the critical section we can go ahead and - // dispose the serial port - try + var lines = data.Split(new[] { newLine }, StringSplitOptions.None); + for (int i = 0; i < lines.Length; i++) { - dispose(); - Volatile.Write(ref disposeAction, null); + if (i == lines.Length - 1) data = lines[i]; + else observer.OnNext(lines[i]); } - finally { Monitor.Exit(baseStream); } } - else + catch (Exception ex) { - // If we reach this branch we may be in deadlock so we - // need to release this thread - DefaultScheduler.Instance.Schedule(TryDispose); + if (!cancellationToken.IsCancellationRequested) + { + observer.OnError(ex); + } + + break; } } - - // Run the spin lock - TryDispose(); - }); + }, + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); }); } } diff --git a/Bonsai.System/IO/Ports/PollingSerialPort.cs b/Bonsai.System/IO/Ports/PollingSerialPort.cs deleted file mode 100644 index c204d273..00000000 --- a/Bonsai.System/IO/Ports/PollingSerialPort.cs +++ /dev/null @@ -1,141 +0,0 @@ -using System; -using System.ComponentModel; -using System.IO; -using System.IO.Ports; -using System.Linq.Expressions; -using System.Reflection; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; - -namespace Bonsai.IO -{ - class PollingSerialPort : SerialPort - { - int fd; - object data_received; - static readonly Func newSerialDataReceivedEventArgs; - CancellationTokenSource cancellationTokenSource; - - static PollingSerialPort() - { - newSerialDataReceivedEventArgs = CreateNewSerialEventArgs(); - } - - public PollingSerialPort() - { - } - - public PollingSerialPort(IContainer container) - : base(container) - { - } - - public PollingSerialPort(string portName) - : base(portName) - { - } - - public PollingSerialPort(string portName, int baudRate) - : base(portName, baudRate) - { - } - - public PollingSerialPort(string portName, int baudRate, Parity parity) - : base(portName, baudRate, parity) - { - } - - public PollingSerialPort(string portName, int baudRate, Parity parity, int dataBits) - : base(portName, baudRate, parity, dataBits) - { - } - - public PollingSerialPort(string portName, int baudRate, Parity parity, int dataBits, StopBits stopBits) - : base(portName, baudRate, parity, dataBits, stopBits) - { - } - - static Func CreateNewSerialEventArgs() - { - var constructorInfo = typeof(TEventArgs).GetConstructor( - BindingFlags.Instance | BindingFlags.NonPublic, - binder: null, - new[] { typeof(TEventType) }, - modifiers: null); - var parameter = Expression.Parameter(typeof(TEventType)); - var body = Expression.New(constructorInfo, parameter); - var lambda = Expression.Lambda>(body, parameter); - return lambda.Compile(); - } - - public new void Open() - { - base.Open(); - var bindingFlags = BindingFlags.Instance | BindingFlags.NonPublic; - var fdField = BaseStream.GetType().GetField(nameof(fd), bindingFlags); - var dataReceivedField = typeof(SerialPort).GetField(nameof(data_received), bindingFlags); - data_received = dataReceivedField?.GetValue(this); - if (fdField != null) - { - fd = (int)fdField.GetValue(BaseStream); - } - - cancellationTokenSource = new CancellationTokenSource(); - var cancellationToken = cancellationTokenSource.Token; - Task.Factory.StartNew(() => - { - while (!cancellationToken.IsCancellationRequested) - { - if (PollSerialStream(ReadTimeout)) - { - OnDataReceived(newSerialDataReceivedEventArgs(SerialData.Chars)); - } - } - }, - cancellationToken, - TaskCreationOptions.LongRunning, - TaskScheduler.Default); - } - - private void OnDataReceived(SerialDataReceivedEventArgs e) - { - ((SerialDataReceivedEventHandler)Events[data_received])?.Invoke(this, e); - } - - private bool PollSerialStream(int timeout) - { - var result = poll_serial(fd, out int error, timeout); - if (error < 0) - { - ThrowIOException(); - } - - return result; - } - - protected override void Dispose(bool disposing) - { - if (disposing) - { - cancellationTokenSource.Dispose(); - } - - base.Dispose(disposing); - } - - [DllImport("MonoPosixHelper", SetLastError = true)] - static extern bool poll_serial(int fd, out int error, int timeout); - - [DllImport("libc")] - static extern IntPtr strerror(int errnum); - - static void ThrowIOException() - { - int errnum = Marshal.GetLastWin32Error(); - string error_message = Marshal.PtrToStringAnsi(strerror(errnum)); - - throw new IOException(error_message); - } - } -} diff --git a/Bonsai.System/IO/Ports/SerialPortManager.cs b/Bonsai.System/IO/Ports/SerialPortManager.cs index 539b0ebe..208ca2e1 100644 --- a/Bonsai.System/IO/Ports/SerialPortManager.cs +++ b/Bonsai.System/IO/Ports/SerialPortManager.cs @@ -29,10 +29,9 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo else throw new ArgumentException("An alias or serial port name must be specified.", "portName"); } - Tuple connection; lock (SyncRoot) { - if (!openConnections.TryGetValue(portName, out connection)) + if (!openConnections.TryGetValue(portName, out var connection)) { var serialPortName = serialPortConfiguration.PortName; if (string.IsNullOrEmpty(serialPortName)) serialPortName = portName; @@ -45,48 +44,30 @@ internal static SerialPortDisposable ReserveConnection(string portName, SerialPo } #pragma warning restore CS0612 // Type or member is obsolete - SerialPort serialPort; - if (IsRunningOnMono) + var serialPort = new SerialPort( + serialPortName, + serialPortConfiguration.BaudRate, + serialPortConfiguration.Parity, + serialPortConfiguration.DataBits, + serialPortConfiguration.StopBits); + if (!IsRunningOnMono) { - var pollingPort = new PollingSerialPort( - serialPortName, - serialPortConfiguration.BaudRate, - serialPortConfiguration.Parity, - serialPortConfiguration.DataBits, - serialPortConfiguration.StopBits); - serialPort = pollingPort; - ConfigureSerialPort(serialPort); - pollingPort.Open(); - } - else - { - serialPort = new SerialPort( - serialPortName, - serialPortConfiguration.BaudRate, - serialPortConfiguration.Parity, - serialPortConfiguration.DataBits, - serialPortConfiguration.StopBits); serialPort.ReceivedBytesThreshold = serialPortConfiguration.ReceivedBytesThreshold; serialPort.ParityReplace = serialPortConfiguration.ParityReplace; serialPort.DiscardNull = serialPortConfiguration.DiscardNull; - ConfigureSerialPort(serialPort); - serialPort.Open(); } + serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize; + serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize; + serialPort.Handshake = serialPortConfiguration.Handshake; + serialPort.DtrEnable = serialPortConfiguration.DtrEnable; + serialPort.RtsEnable = serialPortConfiguration.RtsEnable; - void ConfigureSerialPort(SerialPort serialPort) + var encoding = serialPortConfiguration.Encoding; + if (!string.IsNullOrEmpty(encoding)) { - serialPort.ReadBufferSize = serialPortConfiguration.ReadBufferSize; - serialPort.WriteBufferSize = serialPortConfiguration.WriteBufferSize; - serialPort.Handshake = serialPortConfiguration.Handshake; - serialPort.DtrEnable = serialPortConfiguration.DtrEnable; - serialPort.RtsEnable = serialPortConfiguration.RtsEnable; - - var encoding = serialPortConfiguration.Encoding; - if (!string.IsNullOrEmpty(encoding)) - { - serialPort.Encoding = Encoding.GetEncoding(encoding); - } + serialPort.Encoding = Encoding.GetEncoding(encoding); } + serialPort.Open(); if (serialPort.BytesToRead > 0) {