Skip to content
65 changes: 65 additions & 0 deletions PHASE2_STEP1_EXAMPLE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Phase 2 Steps 1-2 Complete: Message Producer with Threading

## What Was Added

✅ **IMessageProducer<T>** interface with lifecycle management
✅ **MessageProducer<T>** with background threading (Step 2)
✅ **DaqifiDevice** updated to optionally use message producer
✅ **Comprehensive tests** including threading validation
✅ **Backward compatibility** maintained
✅ **Cross-platform** implementation (no Windows dependencies)

## Usage Example

### Before (Desktop only):
```csharp
// Desktop had to manage its own MessageProducer
var stream = new TcpClient().GetStream();
var producer = new Daqifi.Desktop.IO.Messages.Producers.MessageProducer(stream);
producer.Start();
producer.Send(Daqifi.Core.Communication.Producers.ScpiMessageProducer.GetDeviceInfo);
```

### After (Using Core):
```csharp
// Core now provides the message producer
using var stream = new TcpClient().GetStream();
using var device = new DaqifiDevice("My Device", stream, IPAddress.Parse("192.168.1.100"));

device.Connect(); // Automatically starts message producer
device.Send(ScpiMessageProducer.GetDeviceInfo); // Uses Core's thread-safe producer
// device.Disconnect(); // Automatically stops message producer safely
```

## Testing the Implementation

All tests pass (59/59) including:
- Message producer lifecycle management
- Background thread processing and lifecycle
- Thread-safe message queuing with asynchronous processing
- Device integration with message producer
- Error handling and validation
- Backward compatibility scenarios

## Current State vs Desktop

**Desktop MessageProducer**: Windows-specific, string-only, Thread + ConcurrentQueue
**Core MessageProducer<T>**: Cross-platform, generic, Thread + ConcurrentQueue
**Functionality**: ✅ **Identical** - Core now matches desktop's threading behavior

## Next Steps

**Step 3**: Add transport abstraction (TCP/UDP/Serial interfaces)
**Step 4**: Device discovery framework

## Desktop Integration Path

The desktop can now:
1. **Gradually adopt** Core's message producer by using the new DaqifiDevice constructor
2. **Keep existing code working** - no breaking changes
3. **Test side-by-side** - old and new implementations can coexist
4. **Migrate incrementally** - device by device, connection by connection

This completes Steps 1-2 of Phase 2 migration! 🎉

**Core MessageProducer is now functionally equivalent to Desktop's implementation** but cross-platform and generic.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
using Daqifi.Core.Communication.Consumers;
using System.Text;

namespace Daqifi.Core.Tests.Communication.Consumers;

public class LineBasedMessageParserTests
{
[Fact]
public void LineBasedMessageParser_ParseMessages_WithSingleLine_ShouldReturnOneMessage()
{
// Arrange
var parser = new LineBasedMessageParser();
var data = Encoding.UTF8.GetBytes("Hello World\r\n");

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Single(messages);
Assert.Equal("Hello World", messages.First().Data);
Assert.Equal(data.Length, consumedBytes);
}

[Fact]
public void LineBasedMessageParser_ParseMessages_WithMultipleLines_ShouldReturnMultipleMessages()
{
// Arrange
var parser = new LineBasedMessageParser();
var data = Encoding.UTF8.GetBytes("Line 1\r\nLine 2\r\nLine 3\r\n");

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Equal(3, messages.Count());
Assert.Equal("Line 1", messages.ElementAt(0).Data);
Assert.Equal("Line 2", messages.ElementAt(1).Data);
Assert.Equal("Line 3", messages.ElementAt(2).Data);
Assert.Equal(data.Length, consumedBytes);
}

[Fact]
public void LineBasedMessageParser_ParseMessages_WithIncompleteMessage_ShouldNotConsumeIncomplete()
{
// Arrange
var parser = new LineBasedMessageParser();
var data = Encoding.UTF8.GetBytes("Complete Line\r\nIncomplete");

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Single(messages);
Assert.Equal("Complete Line", messages.First().Data);
Assert.Equal(15, consumedBytes); // "Complete Line\r\n" length
}

[Fact]
public void LineBasedMessageParser_ParseMessages_WithEmptyLines_ShouldIgnoreEmpty()
{
// Arrange
var parser = new LineBasedMessageParser();
var data = Encoding.UTF8.GetBytes("Line 1\r\n\r\nLine 2\r\n");

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Equal(2, messages.Count());
Assert.Equal("Line 1", messages.ElementAt(0).Data);
Assert.Equal("Line 2", messages.ElementAt(1).Data);
}

[Fact]
public void LineBasedMessageParser_ParseMessages_WithCustomLineEnding_ShouldWork()
{
// Arrange
var parser = new LineBasedMessageParser("\n"); // LF only
var data = Encoding.UTF8.GetBytes("Line 1\nLine 2\n");

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Equal(2, messages.Count());
Assert.Equal("Line 1", messages.ElementAt(0).Data);
Assert.Equal("Line 2", messages.ElementAt(1).Data);
}

[Fact]
public void LineBasedMessageParser_ParseMessages_WithNoData_ShouldReturnEmpty()
{
// Arrange
var parser = new LineBasedMessageParser();
var data = new byte[0];

// Act
var messages = parser.ParseMessages(data, out var consumedBytes);

// Assert
Assert.Empty(messages);
Assert.Equal(0, consumedBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
using Daqifi.Core.Communication.Consumers;
using System.Text;

namespace Daqifi.Core.Tests.Communication.Consumers;

public class StreamMessageConsumerTests
{
[Fact]
public void StreamMessageConsumer_Constructor_ShouldInitializeCorrectly()
{
// Arrange
using var stream = new MemoryStream();
var parser = new LineBasedMessageParser();

// Act
using var consumer = new StreamMessageConsumer<string>(stream, parser);

// Assert
Assert.False(consumer.IsRunning);
Assert.Equal(0, consumer.QueuedMessageCount);
}

[Fact]
public void StreamMessageConsumer_Start_ShouldSetRunningState()
{
// Arrange
using var stream = new MemoryStream();
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(stream, parser);

// Act
consumer.Start();

// Assert
Assert.True(consumer.IsRunning);

consumer.Stop();
}

[Fact]
public void StreamMessageConsumer_Stop_ShouldClearRunningState()
{
// Arrange
using var stream = new MemoryStream();
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(stream, parser);
consumer.Start();

// Act
consumer.Stop();

// Assert
Assert.False(consumer.IsRunning);
}

[Fact]
public void StreamMessageConsumer_MessageReceived_ShouldFireForValidMessages()
{
// Arrange
var testData = Encoding.UTF8.GetBytes("Test Message\r\n");
using var stream = new MemoryStream(testData);
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(stream, parser);

string? receivedMessage = null;
consumer.MessageReceived += (sender, args) => receivedMessage = args.Message.Data;

// Act
consumer.Start();
Thread.Sleep(200); // Give time for processing
consumer.Stop();

// Assert
Assert.Equal("Test Message", receivedMessage);
}

[Fact]
public void StreamMessageConsumer_MultipleMessages_ShouldFireMultipleEvents()
{
// Arrange
var testData = Encoding.UTF8.GetBytes("Message 1\r\nMessage 2\r\nMessage 3\r\n");
using var stream = new MemoryStream(testData);
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(stream, parser);

var receivedMessages = new List<string>();
consumer.MessageReceived += (sender, args) => receivedMessages.Add(args.Message.Data);

// Act
consumer.Start();
Thread.Sleep(300); // Give time for processing
consumer.Stop();

// Assert
Assert.Equal(3, receivedMessages.Count);
Assert.Contains("Message 1", receivedMessages);
Assert.Contains("Message 2", receivedMessages);
Assert.Contains("Message 3", receivedMessages);
}

[Fact]
public void StreamMessageConsumer_ErrorHandling_ShouldFireErrorEvent()
{
// Arrange - Create a stream that will throw when read
var errorStream = new ErrorThrowingStream();
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(errorStream, parser);

Exception? capturedError = null;
var errorReceived = false;
consumer.ErrorOccurred += (sender, args) =>
{
capturedError = args.Error;
errorReceived = true;
};

// Act
consumer.Start();

// Wait for error with timeout
var timeout = DateTime.UtcNow.AddMilliseconds(500);
while (!errorReceived && DateTime.UtcNow < timeout)
{
Thread.Sleep(10);
}

consumer.Stop();

// Assert
Assert.True(errorReceived, "Error event should have been fired");
Assert.NotNull(capturedError);
Assert.IsType<InvalidOperationException>(capturedError);
}

[Fact]
public void StreamMessageConsumer_StopSafely_ShouldReturnTrue()
{
// Arrange
using var stream = new MemoryStream();
var parser = new LineBasedMessageParser();
using var consumer = new StreamMessageConsumer<string>(stream, parser);
consumer.Start();

// Act
var result = consumer.StopSafely();

// Assert
Assert.True(result);
Assert.False(consumer.IsRunning);
}

[Fact]
public void StreamMessageConsumer_Dispose_ShouldCleanupResources()
{
// Arrange
using var stream = new MemoryStream();
var parser = new LineBasedMessageParser();
var consumer = new StreamMessageConsumer<string>(stream, parser);
consumer.Start();

// Act
consumer.Dispose();

// Assert
Assert.False(consumer.IsRunning);
Assert.Throws<ObjectDisposedException>(() => consumer.Start());
}

// Helper class for testing error scenarios
private class ErrorThrowingStream : Stream
{
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

public override void Flush() { }

public override int Read(byte[] buffer, int offset, int count)
{
throw new InvalidOperationException("Test error for error handling");
}

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}
Loading