diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs index 10b97e15d7cc0..1fb04dfab99c2 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs @@ -6,15 +6,22 @@ namespace System.Net.ServerSentEvents { + public static partial class SseFormatter + { + public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Action, System.Buffers.IBufferWriter> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } public delegate T SseItemParser(string eventType, System.ReadOnlySpan data); public readonly partial struct SseItem { private readonly T _Data_k__BackingField; private readonly object _dummy; private readonly int _dummyPrimitive; - public SseItem(T data, string? eventType) { throw null; } + public SseItem(T data, string? eventType = null) { throw null; } public T Data { get { throw null; } } + public string? EventId { get { throw null; } init { } } public string EventType { get { throw null; } } + public System.TimeSpan? ReconnectionInterval { get { throw null; } init { } } } public static partial class SseParser { diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj index 50bc340a7191a..bb3a652626696 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj @@ -8,6 +8,10 @@ + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx index a0ea42d131d14..e414cea32c071 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx +++ b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx @@ -120,4 +120,10 @@ The enumerable may be enumerated only once. + + The argument cannot contain line breaks. + + + The argument cannot be a negative value. + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj index 65c1959305695..7ca797752fa43 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj @@ -1,4 +1,4 @@ - + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) @@ -11,10 +11,19 @@ System.Net.ServerSentEvents.SseParser + + + + + + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs new file mode 100644 index 0000000000000..4639c84cd3ded --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -0,0 +1,100 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; +using System.Globalization; +using System.IO; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.ServerSentEvents +{ + internal static class Helpers + { + public static void WriteUtf8Number(this IBufferWriter writer, long value) + { +#if NET + const int MaxDecimalDigits = 20; + Span buffer = writer.GetSpan(MaxDecimalDigits); + Debug.Assert(MaxDecimalDigits <= buffer.Length); + + bool success = value.TryFormat(buffer, out int bytesWritten, provider: CultureInfo.InvariantCulture); + Debug.Assert(success); + writer.Advance(bytesWritten); +#else + writer.WriteUtf8String(value.ToString(CultureInfo.InvariantCulture)); +#endif + } + + public static void WriteUtf8String(this IBufferWriter writer, ReadOnlySpan value) + { + if (value.IsEmpty) + { + return; + } + + Span buffer = writer.GetSpan(value.Length); + Debug.Assert(value.Length <= buffer.Length); + value.CopyTo(buffer); + writer.Advance(value.Length); + } + + public static unsafe void WriteUtf8String(this IBufferWriter writer, ReadOnlySpan value) + { + if (value.IsEmpty) + { + return; + } + + int maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length); + Span buffer = writer.GetSpan(maxByteCount); + Debug.Assert(maxByteCount <= buffer.Length); + int bytesWritten; +#if NET + bytesWritten = Encoding.UTF8.GetBytes(value, buffer); +#else + fixed (char* chars = value) + fixed (byte* bytes = buffer) + { + bytesWritten = Encoding.UTF8.GetBytes(chars, value.Length, bytes, maxByteCount); + } +#endif + writer.Advance(bytesWritten); + } + + public static bool ContainsLineBreaks(this ReadOnlySpan text) => + text.IndexOfAny('\r', '\n') >= 0; + +#if !NET + + public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (MemoryMarshal.TryGetArray(buffer, out ArraySegment segment)) + { + return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken)); + } + else + { + return WriteAsyncUsingPooledBuffer(stream, buffer, cancellationToken); + + static async ValueTask WriteAsyncUsingPooledBuffer(Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length); + buffer.Span.CopyTo(sharedBuffer); + try + { + await stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(sharedBuffer); + } + } + } + } +#endif + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs new file mode 100644 index 0000000000000..81e5070b765d0 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs @@ -0,0 +1,33 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; + +namespace System.Net.ServerSentEvents +{ + internal sealed class PooledByteBufferWriter : IBufferWriter, IDisposable + { + private ArrayBuffer _buffer = new(initialSize: 256, usePool: true); + + public void Advance(int count) => _buffer.Commit(count); + + public Memory GetMemory(int sizeHint = 0) + { + _buffer.EnsureAvailableSpace(sizeHint); + return _buffer.AvailableMemory; + } + + public Span GetSpan(int sizeHint = 0) + { + _buffer.EnsureAvailableSpace(sizeHint); + return _buffer.AvailableSpan; + } + + public ReadOnlyMemory WrittenMemory => _buffer.ActiveMemory; + public int Capacity => _buffer.Capacity; + public int WrittenCount => _buffer.ActiveLength; + public void Reset() => _buffer.Discard(_buffer.ActiveLength); + public void Dispose() => _buffer.Dispose(); + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs new file mode 100644 index 0000000000000..3b9c950f4594e --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -0,0 +1,166 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.ServerSentEvents +{ + /// + /// Provides methods for formatting server-sent events. + /// + public static class SseFormatter + { + private static readonly byte[] s_newLine = "\n"u8.ToArray(); + + /// + /// Writes the of server-sent events to the stream. + /// + /// The events to write to the stream. + /// The destination stream to write the events. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, CancellationToken cancellationToken = default) + { + if (source is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(source)); + } + + if (destination is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(destination)); + } + + return WriteAsyncCore(source, destination, static (item, writer) => writer.WriteUtf8String(item.Data), cancellationToken); + } + + /// + /// Writes the of server-sent events to the stream. + /// + /// The data type of the event. + /// The events to write to the stream. + /// The destination stream to write the events. + /// The formatter for the data field of given event. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, Action, IBufferWriter> itemFormatter, CancellationToken cancellationToken = default) + { + if (source is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(source)); + } + + if (destination is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(destination)); + } + + if (itemFormatter is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(itemFormatter)); + } + + return WriteAsyncCore(source, destination, itemFormatter, cancellationToken); + } + + private static async Task WriteAsyncCore(IAsyncEnumerable> source, Stream destination, Action, IBufferWriter> itemFormatter, CancellationToken cancellationToken) + { + using PooledByteBufferWriter bufferWriter = new(); + using PooledByteBufferWriter userDataBufferWriter = new(); + + await foreach (SseItem item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) + { + itemFormatter(item, userDataBufferWriter); + + FormatSseEvent( + bufferWriter, + eventType: item._eventType, // Do not use the public property since it normalizes to "message" if null + data: userDataBufferWriter.WrittenMemory.Span, + eventId: item.EventId, + reconnectionInterval: item.ReconnectionInterval); + + await destination.WriteAsync(bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false); + + userDataBufferWriter.Reset(); + bufferWriter.Reset(); + } + } + + private static void FormatSseEvent( + PooledByteBufferWriter bufferWriter, + string? eventType, + ReadOnlySpan data, + string? eventId, + TimeSpan? reconnectionInterval) + { + Debug.Assert(bufferWriter.WrittenCount is 0); + + if (eventType is not null) + { + Debug.Assert(!eventType.ContainsLineBreaks()); + + bufferWriter.WriteUtf8String("event: "u8); + bufferWriter.WriteUtf8String(eventType); + bufferWriter.WriteUtf8String(s_newLine); + } + + WriteLinesWithPrefix(bufferWriter, prefix: "data: "u8, data); + bufferWriter.Write(s_newLine); + + if (eventId is not null) + { + Debug.Assert(!eventId.ContainsLineBreaks()); + + bufferWriter.WriteUtf8String("id: "u8); + bufferWriter.WriteUtf8String(eventId); + bufferWriter.WriteUtf8String(s_newLine); + } + + if (reconnectionInterval is { } retry) + { + Debug.Assert(retry >= TimeSpan.Zero); + + bufferWriter.WriteUtf8String("retry: "u8); + bufferWriter.WriteUtf8Number((long)retry.TotalMilliseconds); + bufferWriter.WriteUtf8String(s_newLine); + } + + bufferWriter.WriteUtf8String(s_newLine); + } + + private static void WriteLinesWithPrefix(PooledByteBufferWriter writer, ReadOnlySpan prefix, ReadOnlySpan data) + { + // Writes a potentially multi-line string, prefixing each line with the given prefix. + // Both \n and \r\n sequences are normalized to \n. + + while (true) + { + writer.WriteUtf8String(prefix); + + int i = data.IndexOfAny((byte)'\r', (byte)'\n'); + if (i < 0) + { + writer.WriteUtf8String(data); + return; + } + + int lineLength = i; + if (data[i++] == '\r' && i < data.Length && data[i] == '\n') + { + i++; + } + + ReadOnlySpan nextLine = data.Slice(0, lineLength); + data = data.Slice(i); + + writer.WriteUtf8String(nextLine); + writer.WriteUtf8String(s_newLine); + } + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs index c4f966d62b779..b73e4aef46e5f 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -9,12 +9,22 @@ public readonly struct SseItem { /// The event's type. internal readonly string? _eventType; + /// The event's id. + private readonly string? _eventId; + /// The event's reconnection interval. + private readonly TimeSpan? _reconnectionInterval; /// Initializes the server-sent event. /// The event's payload. /// The event's type. - public SseItem(T data, string? eventType) + /// Thrown when contains a line break. + public SseItem(T data, string? eventType = null) { + if (eventType?.ContainsLineBreaks() is true) + { + ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(eventType)); + } + Data = data; _eventType = eventType; } @@ -24,5 +34,39 @@ public SseItem(T data, string? eventType) /// Gets the event's type. public string EventType => _eventType ?? SseParser.EventTypeDefault; + + /// Gets the event's id. + /// Thrown when the value contains a line break. + public string? EventId + { + get => _eventId; + init + { + if (value?.ContainsLineBreaks() is true) + { + ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(EventId)); + } + + _eventId = value; + } + } + + /// Gets the event's retry interval. + /// + /// When specified on an event, instructs the client to update its reconnection time to the specified value. + /// + public TimeSpan? ReconnectionInterval + { + get => _reconnectionInterval; + init + { + if (value < TimeSpan.Zero) + { + ThrowHelper.ThrowArgumentException_CannotBeNegative(nameof(ReconnectionInterval)); + } + + _reconnectionInterval = value; + } + } } } diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs index d25a10a4ec54f..733bfcc3978f9 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs @@ -34,10 +34,20 @@ public static SseParser Create(Stream sseStream) => /// The parser to use to transform each payload of bytes into a data element. /// The enumerable, which can be enumerated synchronously or asynchronously. /// or is null. - public static SseParser Create(Stream sseStream, SseItemParser itemParser) => - new SseParser( - sseStream ?? throw new ArgumentNullException(nameof(sseStream)), - itemParser ?? throw new ArgumentNullException(nameof(itemParser))); + public static SseParser Create(Stream sseStream, SseItemParser itemParser) + { + if (sseStream is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(sseStream)); + } + + if (itemParser is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(itemParser)); + } + + return new SseParser(sseStream, itemParser); + } /// Encoding.UTF8.GetString(bytes) internal static unsafe string Utf8GetString(ReadOnlySpan bytes) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs index 98beedb7048fa..bff2149a6b9f2 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs @@ -28,6 +28,9 @@ public sealed class SseParser /// Carriage Return Line Feed. private static ReadOnlySpan CRLF => "\r\n"u8; + /// The maximum number of milliseconds representible by . + private readonly long TimeSpan_MaxValueMilliseconds = (long)TimeSpan.MaxValue.TotalMilliseconds; + /// The default size of an ArrayPool buffer to rent. /// Larger size used by default to minimize number of reads. Smaller size used in debug to stress growth/shifting logic. private const int DefaultArrayPoolRentSize = @@ -71,7 +74,13 @@ public sealed class SseParser private bool _dataAppended; /// The event type for the next event. - private string _eventType = SseParser.EventTypeDefault; + private string? _eventType; + + /// The event id for the next event. + private string? _eventId; + + /// The reconnection interval for the next event. + private TimeSpan? _nextReconnectionInterval; /// Initialize the enumerable. /// The stream to parse. @@ -314,8 +323,11 @@ private bool ProcessLine(out SseItem sseItem, out int advance) if (_dataAppended) { - sseItem = new SseItem(_itemParser(_eventType, _dataBuffer.AsSpan(0, _dataLength)), _eventType); - _eventType = SseParser.EventTypeDefault; + T data = _itemParser(_eventType ?? SseParser.EventTypeDefault, _dataBuffer.AsSpan(0, _dataLength)); + sseItem = new SseItem(data, _eventType) { EventId = _eventId, ReconnectionInterval = _nextReconnectionInterval }; + _eventType = null; + _eventId = null; + _nextReconnectionInterval = null; _dataLength = 0; _dataAppended = false; return true; @@ -365,8 +377,11 @@ private bool ProcessLine(out SseItem sseItem, out int advance) (remainder[0] is LF || (remainder[0] is CR && remainder.Length > 1))) { advance = line.Length + newlineLength + (remainder.StartsWith(CRLF) ? 2 : 1); - sseItem = new SseItem(_itemParser(_eventType, fieldValue), _eventType); - _eventType = SseParser.EventTypeDefault; + T data = _itemParser(_eventType ?? SseParser.EventTypeDefault, fieldValue); + sseItem = new SseItem(data, _eventType) { EventId = _eventId, ReconnectionInterval = _nextReconnectionInterval }; + _eventType = null; + _eventId = null; + _nextReconnectionInterval = null; return true; } } @@ -398,7 +413,7 @@ private bool ProcessLine(out SseItem sseItem, out int advance) if (fieldValue.IndexOf((byte)'\0') < 0) { // Note that fieldValue might be empty, in which case LastEventId will naturally be reset to the empty string. This is per spec. - LastEventId = SseParser.Utf8GetString(fieldValue); + LastEventId = _eventId = SseParser.Utf8GetString(fieldValue); } } else if (fieldName.SequenceEqual("retry"u8)) @@ -411,9 +426,12 @@ private bool ProcessLine(out SseItem sseItem, out int advance) #else SseParser.Utf8GetString(fieldValue), #endif - NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds)) + NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds) && + 0 <= milliseconds && milliseconds <= TimeSpan_MaxValueMilliseconds) { - ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); + // Workaround for TimeSpan.FromMilliseconds not being able to roundtrip TimeSpan.MaxValue + TimeSpan timeSpan = milliseconds == TimeSpan_MaxValueMilliseconds ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(milliseconds); + _nextReconnectionInterval = ReconnectionInterval = timeSpan; } } else diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs new file mode 100644 index 0000000000000..cb6a4c58132bc --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs @@ -0,0 +1,26 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; + +namespace System.Net.ServerSentEvents +{ + internal static class ThrowHelper + { + [DoesNotReturn] + public static void ThrowArgumentNullException(string parameterName) + { + throw new ArgumentNullException(parameterName); + } + + public static void ThrowArgumentException_CannotContainLineBreaks(string parameterName) + { + throw new ArgumentException(SR.ArgumentException_CannotContainLineBreaks, parameterName); + } + + public static void ThrowArgumentException_CannotBeNegative(string parameterName) + { + throw new ArgumentException(SR.ArgumentException_CannotBeNegative, parameterName); + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs new file mode 100644 index 0000000000000..a21f74f8cacb2 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -0,0 +1,173 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.ServerSentEvents.Tests +{ + public static partial class SseFormatterTests + { + [Fact] + public static void WriteAsync_InvalidArguments_Throws() + { + AssertExtensions.Throws("source", () => SseFormatter.WriteAsync(source: null, new MemoryStream())); + AssertExtensions.Throws("source", () => SseFormatter.WriteAsync(source: null, new MemoryStream(), (_,_) => { })); + + AssertExtensions.Throws("destination", () => SseFormatter.WriteAsync(GetItemsAsync(), destination: null)); + AssertExtensions.Throws("destination", () => SseFormatter.WriteAsync(GetItemsAsync(), destination: null, (_,_) => { })); + + AssertExtensions.Throws("itemFormatter", () => SseFormatter.WriteAsync(GetItemsAsync(), new MemoryStream(), itemFormatter: null)); + } + + [Fact] + public static async Task WriteAsync_HasExpectedFormat() + { + // Arrange + string expectedFormat = + "event: eventType1\ndata: data1\n\n" + + "event: eventType2\ndata: data2\nretry: 300\n\n" + + "data: data3\n\n" + + "data: \n\n" + + "event: eventType4\ndata: data4\nid: id4\n\n" + + "event: eventType4\ndata: data\ndata: \ndata: with\ndata: multiple \ndata: line\ndata: breaks\n\n" + + "data: LF at end\ndata: \n\n" + + "data: CR at end\ndata: \n\n" + + "data: CRLF at end\ndata: \n\n" + + "data: LFCR at end\ndata: \ndata: \n\n"; + + using MemoryStream stream = new(); + + // Act + await SseFormatter.WriteAsync(GetItemsAsync(), stream); + + // Assert + string actualFormat = Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal(expectedFormat, actualFormat); + } + + [Fact] + public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() + { + // Arrange + string expectedFormat = + "event: eventType1\ndata: data1_suffix\n\n" + + "event: eventType2\ndata: data2_suffix\nretry: 300\n\n" + + "data: data3_suffix\n\n" + + "data: _suffix\n\n" + + "event: eventType4\ndata: data4_suffix\nid: id4\n\n" + + "event: eventType4\ndata: data\ndata: \ndata: with\ndata: multiple \ndata: line\ndata: breaks_suffix\n\n" + + "data: LF at end\ndata: _suffix\n\n" + + "data: CR at end\ndata: _suffix\n\n" + + "data: CRLF at end\ndata: _suffix\n\n" + + "data: LFCR at end\ndata: \ndata: _suffix\n\n"; + + using MemoryStream stream = new(); + + // Act + await SseFormatter.WriteAsync(GetItemsAsync(), stream, (item, writer) => writer.Write(Encoding.UTF8.GetBytes(item.Data + "_suffix"))); + + // Assert + string actualFormat = Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal(expectedFormat, actualFormat); + } + + private static async IAsyncEnumerable> GetItemsAsync() + { + yield return new SseItem("data1", "eventType1"); + yield return new SseItem("data2", "eventType2") { ReconnectionInterval = TimeSpan.FromMilliseconds(300) }; + await Task.Yield(); + yield return new SseItem("data3", null); + yield return new SseItem(data: null!, null); + yield return new SseItem("data4", "eventType4") { EventId = "id4" }; + await Task.Yield(); + yield return new SseItem("data\n\r with\nmultiple \rline\r\nbreaks", "eventType4"); + yield return new SseItem("LF at end\n", null); + yield return new SseItem("CR at end\r", null); + yield return new SseItem("CRLF at end\r\n", null); + yield return new SseItem("LFCR at end\n\r", null); + } + + [Fact] + public static async Task WriteAsync_HonorsCancellationToken() + { + CancellationToken token = new(canceled: true); + + await Assert.ThrowsAsync(() => SseFormatter.WriteAsync(GetItemsAsync(), new MemoryStream(), token)); + await Assert.ThrowsAsync(() => + SseFormatter.WriteAsync( + GetItemsAsync(), + new MemoryStream(), + (item, writer) => writer.Write(Encoding.UTF8.GetBytes(item.Data)), + token)); + + async IAsyncEnumerable> GetItemsAsync([EnumeratorCancellation] CancellationToken token = default) + { + yield return new SseItem("data"); + await Task.Delay(20); + token.ThrowIfCancellationRequested(); + } + } + + [Fact] + public static async Task WriteAsync_ParserCanRoundtripJsonEvents() + { + MemoryStream stream = new(); + await SseFormatter.WriteAsync(GetItemsAsync(), stream, FormatJson); + + stream.Position = 0; + SseParser parser = SseParser.Create(stream, ParseJson); + await ValidateParseResults(parser.EnumerateAsync()); + + async IAsyncEnumerable> GetItemsAsync() + { + for (int i = 0; i < 50; i++) + { + string? eventType = i % 2 == 0 ? null : "eventType"; + string? eventId = i % 3 == 2 ? i.ToString() : null; + TimeSpan? reconnectionInterval = i % 5 == 4 ? TimeSpan.FromSeconds(i) : null; + yield return new SseItem(new MyPoco(i), eventType) { EventId = eventId, ReconnectionInterval = reconnectionInterval }; + await Task.Yield(); + } + } + + async Task ValidateParseResults(IAsyncEnumerable> results) + { + int i = 0; + await foreach (SseItem item in results) + { + Assert.Equal(i % 2 == 0 ? "message" : "eventType", item.EventType); + Assert.Equal(i % 3 == 2 ? i.ToString() : null, item.EventId); + Assert.Equal(i % 5 == 4 ? TimeSpan.FromSeconds(i) : null, item.ReconnectionInterval); + Assert.Equal(i, item.Data.Value); + i++; + } + } + + static void FormatJson(SseItem item, IBufferWriter writer) + { + JsonWriterOptions writerOptions = new() { Indented = true }; + using Utf8JsonWriter jsonWriter = new(writer, writerOptions); + JsonSerializer.Serialize(jsonWriter, item.Data, JsonContext.Default.MyPoco); + } + + static MyPoco ParseJson(string eventType, ReadOnlySpan data) + { + return JsonSerializer.Deserialize(data, JsonContext.Default.MyPoco); + } + } + + public record MyPoco(int Value); + + [JsonSerializable(typeof(MyPoco))] + partial class JsonContext : JsonSerializerContext; + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs index cf5a0d06382b6..75b5bf1b8cec7 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs @@ -15,14 +15,37 @@ public void SseItem_Roundtrips() item = default; Assert.Null(item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); + Assert.Null(item.EventId); + Assert.Null(item.ReconnectionInterval); item = new SseItem("some data", null); Assert.Equal("some data", item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); + Assert.Null(item.EventId); + Assert.Null(item.ReconnectionInterval); - item = new SseItem("some data", "eventType"); + item = new SseItem("some data", "eventType") { EventId = "eventId", ReconnectionInterval = TimeSpan.FromSeconds(3) }; Assert.Equal("some data", item.Data); Assert.Equal("eventType", item.EventType); + Assert.Equal("eventId", item.EventId); + Assert.Equal(TimeSpan.FromSeconds(3), item.ReconnectionInterval); + } + + [Theory] + [InlineData("\n")] + [InlineData("Hello, World!\n")] + [InlineData("Hello, \r\nWorld!")] + [InlineData("Hello, \rWorld!")] + public void SseItem_MetadataWithLineBreak_ThrowsArgumentException(string metadataWithLineBreak) + { + Assert.Throws("eventType", () => new SseItem("data", eventType: metadataWithLineBreak)); + Assert.Throws("EventId", () => new SseItem("data", "eventType") { EventId = metadataWithLineBreak }); + } + + [Fact] + public void SseItem_ReconnectionInterval_NegativeTimeSpan_ThrowsArgumentException() + { + Assert.Throws("ReconnectionInterval", () => new SseItem("data") { ReconnectionInterval = TimeSpan.FromSeconds(-1) }); } } } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs index d0004ec47d476..13b2747af7905 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs @@ -121,9 +121,9 @@ await ReadAllEventsAsync(stream) : Assert.Equal(stream.Length, stream.Position); Assert.Equal(3, items.Count); - AssertSseItemEqual(new SseItem("1", "A"), items[0]); - AssertSseItemEqual(new SseItem("4", "B"), items[1]); - AssertSseItemEqual(new SseItem("7", "C"), items[2]); + AssertSseItemEqual(new SseItem("1", "A") { EventId = "2", ReconnectionInterval = TimeSpan.FromMilliseconds(300) }, items[0]); + AssertSseItemEqual(new SseItem("4", "B") { EventId = "5", ReconnectionInterval = TimeSpan.FromMilliseconds(600) }, items[1]); + AssertSseItemEqual(new SseItem("7", "C") { EventId = "8", ReconnectionInterval = TimeSpan.FromMilliseconds(900) }, items[2]); } [Theory] @@ -217,11 +217,11 @@ public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool use using IEnumerator> e = parser.Enumerate().GetEnumerator(); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + AssertSseItemEqual(new SseItem("second event", "message") { EventId = "" }, e.Current); Assert.Equal(string.Empty, parser.LastEventId); Assert.True(e.MoveNext()); @@ -235,11 +235,11 @@ public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool use await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + AssertSseItemEqual(new SseItem("second event", "message") { EventId = "" }, e.Current); Assert.Equal(string.Empty, parser.LastEventId); Assert.True(await e.MoveNextAsync()); @@ -273,7 +273,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric using IEnumerator> e = parser.Enumerate().GetEnumerator(); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); @@ -281,7 +281,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + AssertSseItemEqual(new SseItem(" third event", "message") { EventId = "42" }, e.Current); Assert.Equal("42", parser.LastEventId); } else @@ -291,7 +291,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); @@ -299,7 +299,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + AssertSseItemEqual(new SseItem(" third event", "message") { EventId = "42" }, e.Current); Assert.Equal("42", parser.LastEventId); } } @@ -430,20 +430,22 @@ public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, b Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + TimeSpan firstRetry = TimeSpan.FromMilliseconds(42); + AssertSseItemEqual(new SseItem("second event", "message") { ReconnectionInterval = firstRetry}, e.Current); + Assert.Equal(firstRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + TimeSpan secondRetry = TimeSpan.FromMilliseconds(12345678910); + AssertSseItemEqual(new SseItem(" third event", "message") { ReconnectionInterval = secondRetry }, e.Current); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); } else { @@ -456,23 +458,59 @@ public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, b AssertSseItemEqual(new SseItem("first event", "message"), e.Current); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + TimeSpan firstRetry = TimeSpan.FromMilliseconds(42); + AssertSseItemEqual(new SseItem("second event", "message") { ReconnectionInterval = firstRetry }, e.Current); + Assert.Equal(firstRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + TimeSpan secondRetry = TimeSpan.FromMilliseconds(12345678910); + AssertSseItemEqual(new SseItem(" third event", "message") { ReconnectionInterval = secondRetry }, e.Current); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); } } + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(922337203685477)] // TimeSpan.MaxValue.TotalMilliseconds + [InlineData(922337203685476)] + public async Task Retry_ValidRetryField_IsReturned(long retryValue) + { + // Workaround for TimeSpan.FromMillisecond not being able to roundtrip TimeSpan.MaxValue + TimeSpan expectedInterval = retryValue == TimeSpan.MaxValue.TotalMilliseconds ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(retryValue); + using Stream stream = GetStream($"data: test\nretry: {retryValue}\n\n", trickle: false); + + List> items = await ReadAllEventsAsync(stream); + + Assert.Equal(1, items.Count); + Assert.Equal(expectedInterval, items[0].ReconnectionInterval); + } + + [Theory] + [InlineData("")] + [InlineData("-1")] + [InlineData("-922337203685477")] // TimeSpan.MinValue.TotalMilliseconds + [InlineData("922337203685478")] // TimeSpan.MaxValue.TotalMilliseconds + 1 + [InlineData("9223372036854775807")] // long.MaxValue + [InlineData("invalidmilliseconds")] + public async Task Retry_InvalidRetryField_IsIgnored(string retryValue) + { + using Stream stream = GetStream($"data: test\nretry: {retryValue}\n\n", trickle: false); + + List> items = await ReadAllEventsAsync(stream); + + Assert.Equal(1, items.Count); + Assert.Null(items[0].ReconnectionInterval); + } + [Theory] [MemberData(nameof(NewlineTrickleAsyncData))] public async Task JsonContent_DelegateInvoked(string newline, bool trickle, bool useAsync) @@ -865,14 +903,9 @@ public async Task ArrayPoolRental_Closure(string newline, bool trickle, bool use private static void AssertSseItemEqual(SseItem left, SseItem right) { Assert.Equal(left.EventType, right.EventType); - if (left.Data is string leftData && right.Data is string rightData) - { - Assert.Equal($"{leftData.Length} {leftData}", $"{rightData.Length} {rightData}"); - } - else - { - Assert.Equal(left.Data, right.Data); - } + Assert.Equal(left.EventId, right.EventId); + Assert.Equal(left.ReconnectionInterval, right.ReconnectionInterval); + Assert.Equal(left.Data, right.Data); } public static IEnumerable NewlineTrickleAsyncData() => diff --git a/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj index d0f19860f75dc..56cf948fb7259 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj +++ b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj @@ -5,10 +5,15 @@ + + + + +