1+ using Daqifi . Core . Communication . Consumers ;
2+ using System . Text ;
3+
4+ namespace Daqifi . Core . Tests . Communication . Consumers ;
5+
6+ public class StreamMessageConsumerTests
7+ {
8+ [ Fact ]
9+ public void StreamMessageConsumer_Constructor_ShouldInitializeCorrectly ( )
10+ {
11+ // Arrange
12+ using var stream = new MemoryStream ( ) ;
13+ var parser = new LineBasedMessageParser ( ) ;
14+
15+ // Act
16+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
17+
18+ // Assert
19+ Assert . False ( consumer . IsRunning ) ;
20+ Assert . Equal ( 0 , consumer . QueuedMessageCount ) ;
21+ }
22+
23+ [ Fact ]
24+ public void StreamMessageConsumer_Start_ShouldSetRunningState ( )
25+ {
26+ // Arrange
27+ using var stream = new MemoryStream ( ) ;
28+ var parser = new LineBasedMessageParser ( ) ;
29+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
30+
31+ // Act
32+ consumer . Start ( ) ;
33+
34+ // Assert
35+ Assert . True ( consumer . IsRunning ) ;
36+
37+ consumer . Stop ( ) ;
38+ }
39+
40+ [ Fact ]
41+ public void StreamMessageConsumer_Stop_ShouldClearRunningState ( )
42+ {
43+ // Arrange
44+ using var stream = new MemoryStream ( ) ;
45+ var parser = new LineBasedMessageParser ( ) ;
46+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
47+ consumer . Start ( ) ;
48+
49+ // Act
50+ consumer . Stop ( ) ;
51+
52+ // Assert
53+ Assert . False ( consumer . IsRunning ) ;
54+ }
55+
56+ [ Fact ]
57+ public void StreamMessageConsumer_MessageReceived_ShouldFireForValidMessages ( )
58+ {
59+ // Arrange
60+ var testData = Encoding . UTF8 . GetBytes ( "Test Message\r \n " ) ;
61+ using var stream = new MemoryStream ( testData ) ;
62+ var parser = new LineBasedMessageParser ( ) ;
63+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
64+
65+ string ? receivedMessage = null ;
66+ consumer . MessageReceived += ( sender , args ) => receivedMessage = args . Message . Data ;
67+
68+ // Act
69+ consumer . Start ( ) ;
70+ Thread . Sleep ( 200 ) ; // Give time for processing
71+ consumer . Stop ( ) ;
72+
73+ // Assert
74+ Assert . Equal ( "Test Message" , receivedMessage ) ;
75+ }
76+
77+ [ Fact ]
78+ public void StreamMessageConsumer_MultipleMessages_ShouldFireMultipleEvents ( )
79+ {
80+ // Arrange
81+ var testData = Encoding . UTF8 . GetBytes ( "Message 1\r \n Message 2\r \n Message 3\r \n " ) ;
82+ using var stream = new MemoryStream ( testData ) ;
83+ var parser = new LineBasedMessageParser ( ) ;
84+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
85+
86+ var receivedMessages = new List < string > ( ) ;
87+ consumer . MessageReceived += ( sender , args ) => receivedMessages . Add ( args . Message . Data ) ;
88+
89+ // Act
90+ consumer . Start ( ) ;
91+ Thread . Sleep ( 300 ) ; // Give time for processing
92+ consumer . Stop ( ) ;
93+
94+ // Assert
95+ Assert . Equal ( 3 , receivedMessages . Count ) ;
96+ Assert . Contains ( "Message 1" , receivedMessages ) ;
97+ Assert . Contains ( "Message 2" , receivedMessages ) ;
98+ Assert . Contains ( "Message 3" , receivedMessages ) ;
99+ }
100+
101+ [ Fact ]
102+ public void StreamMessageConsumer_ErrorHandling_ShouldFireErrorEvent ( )
103+ {
104+ // Arrange - Create a stream that will throw when read
105+ var errorStream = new ErrorThrowingStream ( ) ;
106+ var parser = new LineBasedMessageParser ( ) ;
107+ using var consumer = new StreamMessageConsumer < string > ( errorStream , parser ) ;
108+
109+ Exception ? capturedError = null ;
110+ var errorReceived = false ;
111+ consumer . ErrorOccurred += ( sender , args ) =>
112+ {
113+ capturedError = args . Error ;
114+ errorReceived = true ;
115+ } ;
116+
117+ // Act
118+ consumer . Start ( ) ;
119+
120+ // Wait for error with timeout
121+ var timeout = DateTime . UtcNow . AddMilliseconds ( 500 ) ;
122+ while ( ! errorReceived && DateTime . UtcNow < timeout )
123+ {
124+ Thread . Sleep ( 10 ) ;
125+ }
126+
127+ consumer . Stop ( ) ;
128+
129+ // Assert
130+ Assert . True ( errorReceived , "Error event should have been fired" ) ;
131+ Assert . NotNull ( capturedError ) ;
132+ Assert . IsType < InvalidOperationException > ( capturedError ) ;
133+ }
134+
135+ [ Fact ]
136+ public void StreamMessageConsumer_StopSafely_ShouldReturnTrue ( )
137+ {
138+ // Arrange
139+ using var stream = new MemoryStream ( ) ;
140+ var parser = new LineBasedMessageParser ( ) ;
141+ using var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
142+ consumer . Start ( ) ;
143+
144+ // Act
145+ var result = consumer . StopSafely ( ) ;
146+
147+ // Assert
148+ Assert . True ( result ) ;
149+ Assert . False ( consumer . IsRunning ) ;
150+ }
151+
152+ [ Fact ]
153+ public void StreamMessageConsumer_Dispose_ShouldCleanupResources ( )
154+ {
155+ // Arrange
156+ using var stream = new MemoryStream ( ) ;
157+ var parser = new LineBasedMessageParser ( ) ;
158+ var consumer = new StreamMessageConsumer < string > ( stream , parser ) ;
159+ consumer . Start ( ) ;
160+
161+ // Act
162+ consumer . Dispose ( ) ;
163+
164+ // Assert
165+ Assert . False ( consumer . IsRunning ) ;
166+ Assert . Throws < ObjectDisposedException > ( ( ) => consumer . Start ( ) ) ;
167+ }
168+
169+ // Helper class for testing error scenarios
170+ private class ErrorThrowingStream : Stream
171+ {
172+ public override bool CanRead => true ;
173+ public override bool CanSeek => false ;
174+ public override bool CanWrite => false ;
175+ public override long Length => throw new NotSupportedException ( ) ;
176+ public override long Position { get => throw new NotSupportedException ( ) ; set => throw new NotSupportedException ( ) ; }
177+
178+ public override void Flush ( ) { }
179+
180+ public override int Read ( byte [ ] buffer , int offset , int count )
181+ {
182+ throw new InvalidOperationException ( "Test error for error handling" ) ;
183+ }
184+
185+ public override long Seek ( long offset , SeekOrigin origin ) => throw new NotSupportedException ( ) ;
186+ public override void SetLength ( long value ) => throw new NotSupportedException ( ) ;
187+ public override void Write ( byte [ ] buffer , int offset , int count ) => throw new NotSupportedException ( ) ;
188+ }
189+ }
0 commit comments