diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b929746..5152301 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,7 +30,7 @@ jobs: env: NUGET_KEY: ${{ secrets.NUGET_KEY }} run: | - buildNumber="4.2.${GITHUB_RUN_NUMBER}" + buildNumber="4.3.${GITHUB_RUN_NUMBER}" sed "s/0.0.1/${buildNumber}/g" src/Serilog.Sinks.AwsCloudWatch/*.csproj -i dotnet pack -c Release -o artifacts || exit 1 dotnet nuget push artifacts/Serilog.Sinks.AwsCloudWatch.${buildNumber}.nupkg -s "https://api.nuget.org/v3/index.json" -k "$NUGET_KEY" diff --git a/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs index caa05d2..0db005e 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs +++ b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using Amazon.CloudWatchLogs; using Amazon.CloudWatchLogs.Model; -using Serilog.Events; using Serilog.Formatting; using Serilog.Sinks.PeriodicBatching; using LogEvent = Serilog.Events.LogEvent; @@ -20,7 +19,6 @@ internal class PeriodicBatchingSinkImplementationCallback: IBatchedLogEventSink private readonly ICloudWatchSinkOptions options; private bool hasInit; private string logStreamName; - private string nextSequenceToken; private readonly ITextFormatter textFormatter; private readonly SemaphoreSlim syncObject = new SemaphoreSlim(1); @@ -108,7 +106,6 @@ private async Task CreateLogGroupAsync() private void UpdateLogStreamName() { logStreamName = options.LogStreamNameProvider.GetLogStreamName(); - nextSequenceToken = null; // always reset on a new stream } /// @@ -129,19 +126,6 @@ private async Task CreateLogStreamAsync() }; var createLogStreamResponse = await cloudWatchClient.CreateLogStreamAsync(createLogStreamRequest); } - else - { - nextSequenceToken = logStream.UploadSequenceToken; - } - } - - /// - /// Updates the log stream sequence token. - /// - private async Task UpdateLogStreamSequenceTokenAsync() - { - var logStream = await GetLogStreamAsync(); - nextSequenceToken = logStream?.UploadSequenceToken; } /// @@ -226,16 +210,12 @@ private async Task PublishBatchAsync(List batch) { LogGroupName = options.LogGroupName, LogStreamName = logStreamName, - SequenceToken = nextSequenceToken, LogEvents = batch }; // actually upload the event to CloudWatch var putLogEventsResponse = await cloudWatchClient.PutLogEventsAsync(putLogEventsRequest); - // remember the next sequence token, which is required - nextSequenceToken = putLogEventsResponse.NextSequenceToken; - success = true; } catch (ServiceUnavailableException e) @@ -254,40 +234,6 @@ private async Task PublishBatchAsync(List batch) await CreateLogGroupAsync(); await CreateLogStreamAsync(); } - catch (DataAlreadyAcceptedException e) - { - Debugging.SelfLog.WriteLine("Data already accepted. Attempt: {0} Error: {1}", attemptIndex, e); - try - { - await UpdateLogStreamSequenceTokenAsync(); - } - catch (Exception ex) - { - Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); - - // try again with a different log stream - UpdateLogStreamName(); - await CreateLogStreamAsync(); - } - attemptIndex++; - } - catch (InvalidSequenceTokenException e) - { - Debugging.SelfLog.WriteLine("Invalid sequence token. Attempt: {0} Error: {1}", attemptIndex, e); - try - { - await UpdateLogStreamSequenceTokenAsync(); - } - catch (Exception ex) - { - Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); - - // try again with a different log stream - UpdateLogStreamName(); - await CreateLogStreamAsync(); - } - attemptIndex++; - } catch (Exception e) { Debugging.SelfLog.WriteLine("Unhandled exception. Error: {0}", e); diff --git a/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj b/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj index c66c8a8..69a5ad7 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj +++ b/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj @@ -1,4 +1,4 @@ - + A Serilog sink that logs to AWS CloudWatch AWS Cloud Watch Serilog Sink @@ -28,7 +28,7 @@ - + diff --git a/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs b/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs index 73cb583..6fcf316 100644 --- a/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs +++ b/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs @@ -66,7 +66,6 @@ public async Task SingleBatch() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -75,7 +74,6 @@ public async Task SingleBatch() var request = putLogEventsCalls.First().Request; Assert.Equal(options.LogGroupName, request.LogGroupName); - Assert.Null(request.SequenceToken); Assert.Equal(10, request.LogEvents.Count); for (var i = 0; i < events.Length; i++) { @@ -130,7 +128,6 @@ public async Task SingleBatch_LogGroupExists() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -139,7 +136,6 @@ public async Task SingleBatch_LogGroupExists() var request = putLogEventsCalls.First().Request; Assert.Equal(options.LogGroupName, request.LogGroupName); - Assert.Null(request.SequenceToken); Assert.Equal(10, request.LogEvents.Count); for (var i = 0; i < events.Length; i++) { @@ -181,7 +177,6 @@ public async Task SingleBatch_WithoutCreatingLogGroup() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -190,7 +185,6 @@ public async Task SingleBatch_WithoutCreatingLogGroup() var request = putLogEventsCalls.First().Request; Assert.Equal(options.LogGroupName, request.LogGroupName); - Assert.Null(request.SequenceToken); Assert.Equal(10, request.LogEvents.Count); for (var i = 0; i < events.Length; i++) { @@ -252,7 +246,6 @@ public async Task SingleBatch_LogStreamExists() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -262,7 +255,6 @@ public async Task SingleBatch_LogStreamExists() var request = putLogEventsCalls.First().Request; Assert.Equal(options.LogGroupName, request.LogGroupName); Assert.Equal(options.LogStreamNameProvider.GetLogStreamName(), request.LogStreamName); - Assert.Null(request.SequenceToken); Assert.Equal(10, request.LogEvents.Count); for (var i = 0; i < events.Length; i++) { @@ -311,7 +303,6 @@ public async Task LargeMessage() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -320,7 +311,6 @@ public async Task LargeMessage() var request = putLogEventsCalls.First().Request; Assert.Equal(options.LogGroupName, request.LogGroupName); - Assert.Null(request.SequenceToken); Assert.Single(request.LogEvents); Assert.Equal(largeEventMessage.Substring(0, CloudWatchLogSink.MaxLogEventSize), request.LogEvents.First().Message); @@ -365,7 +355,6 @@ public async Task MultipleDays() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -385,15 +374,6 @@ public async Task MultipleDays() { Assert.True(call.Request.LogEvents.ElementAt(index).Timestamp >= call.Request.LogEvents.ElementAt(index - 1).Timestamp); } - - if (i == 0) // first call - { - Assert.Null(request.SequenceToken); - } - else - { - Assert.NotNull(request.SequenceToken); - } } client.VerifyAll(); @@ -437,7 +417,6 @@ public async Task MoreThanMaxBatchCount() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -453,12 +432,10 @@ public async Task MoreThanMaxBatchCount() if (i == 0) // first call { - Assert.Null(request.SequenceToken); Assert.Equal(CloudWatchLogSink.MaxLogEventBatchCount, request.LogEvents.Count); } else { - Assert.NotNull(request.SequenceToken); Assert.Single(request.LogEvents); } } @@ -504,7 +481,6 @@ public async Task MoreThanMaxBatchSize() .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, - NextSequenceToken = Guid.NewGuid().ToString() }); await sink.EmitBatchAsync(events); @@ -520,12 +496,10 @@ public async Task MoreThanMaxBatchSize() if (i == 0) // first call { - Assert.Null(request.SequenceToken); Assert.Equal(203, request.LogEvents.Count); // expect 203 of the 256 messages in the first batch } else { - Assert.NotNull(request.SequenceToken); Assert.Equal(53, request.LogEvents.Count); // expect 53 of the 256 messages in the second batch } } @@ -620,7 +594,7 @@ public async Task ServiceUnavailable_WithEventualSuccess() client.SetupSequence(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny())) .ThrowsAsync(new ServiceUnavailableException("unavailable")) - .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextSequenceToken = Guid.NewGuid().ToString() }); + .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); await sink.EmitBatchAsync(events); @@ -664,7 +638,7 @@ public async Task ResourceNotFound() client.SetupSequence(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny())) .ThrowsAsync(new ResourceNotFoundException("no resource")) - .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextSequenceToken = Guid.NewGuid().ToString() }); + .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); await sink.EmitBatchAsync(events); @@ -768,159 +742,6 @@ public async Task InvalidParameter() client.VerifyAll(); } - [Fact(DisplayName = "EmitBatchAsync - Invalid sequence token")] - public async Task InvalidSequenceToken() - { - // expect update of sequence token and successful retry - - var client = new Mock(MockBehavior.Strict); - var textFormatterMock = new Mock(MockBehavior.Strict); - textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t)); - var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" }; - var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options); - var events = Enumerable.Range(0, 10) - .Select(_ => // create 10 events with message length of 12 - new LogEvent( - DateTimeOffset.UtcNow, - LogEventLevel.Information, - null, - new MessageTemplateParser().Parse(CreateMessage(12)), - Enumerable.Empty())) - .ToArray(); - - client.Setup(mock => mock.DescribeLogGroupsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogGroupsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.CreateLogGroupAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new CreateLogGroupResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.DescribeLogStreamsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogStreamsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextToken = Guid.NewGuid().ToString() }); - - List createLogStreamRequests = new List(); - client.Setup(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny())) - .Callback((createLogStreamRequest, cancellationToken) => createLogStreamRequests.Add(createLogStreamRequest)) - .ReturnsAsync(new CreateLogStreamResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.SetupSequence(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(new InvalidSequenceTokenException("invalid sequence")) - .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextSequenceToken = Guid.NewGuid().ToString() }); - - await sink.EmitBatchAsync(events); - - client.Verify(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.DescribeLogStreamsAsync(It.Is(req => req.LogGroupName == options.LogGroupName && req.LogStreamNamePrefix == createLogStreamRequests.First().LogStreamName), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny()), Times.Once); - - client.VerifyAll(); - } - - [Fact(DisplayName = "EmitBatchAsync - Invalid sequence token with new log stream")] - public async Task InvalidSequenceToken_CannotUpdateSequenceToken() - { - // expect update of sequence token and success on a new log stream - - var logStreamNameProvider = new Mock(); - logStreamNameProvider.SetupSequence(mock => mock.GetLogStreamName()) - .Returns("a") - .Returns("b"); - - var client = new Mock(MockBehavior.Strict); - var textFormatterMock = new Mock(MockBehavior.Strict); - textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t)); - var options = new CloudWatchSinkOptions { LogStreamNameProvider = logStreamNameProvider.Object, TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" }; - var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options); - var events = Enumerable.Range(0, 10) - .Select(_ => // create 10 events with message length of 12 - new LogEvent( - DateTimeOffset.UtcNow, - LogEventLevel.Information, - null, - new MessageTemplateParser().Parse(CreateMessage(12)), - Enumerable.Empty())) - .ToArray(); - - client.Setup(mock => mock.DescribeLogGroupsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogGroupsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.CreateLogGroupAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new CreateLogGroupResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.SetupSequence(mock => mock.DescribeLogStreamsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogStreamsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }) - .ThrowsAsync(new Exception("no describe log stream")) - .ReturnsAsync(new DescribeLogStreamsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - List createLogStreamRequests = new List(); - client.Setup(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny())) - .Callback((createLogStreamRequest, cancellationToken) => createLogStreamRequests.Add(createLogStreamRequest)) - .ReturnsAsync(new CreateLogStreamResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.PutLogEventsAsync(It.Is(req => req.LogStreamName == "a"), It.IsAny())) - .ThrowsAsync(new InvalidSequenceTokenException("invalid sequence")); - - client.Setup(mock => mock.PutLogEventsAsync(It.Is(req => req.LogStreamName == "b"), It.IsAny())) - .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextSequenceToken = Guid.NewGuid().ToString() }); - - await sink.EmitBatchAsync(events); - - client.Verify(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.DescribeLogStreamsAsync(It.Is(req => req.LogGroupName == options.LogGroupName && req.LogStreamNamePrefix == createLogStreamRequests.First().LogStreamName), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); - - Assert.Equal(2, createLogStreamRequests.Count); - Assert.NotEqual(createLogStreamRequests.ElementAt(0).LogStreamName, createLogStreamRequests.ElementAt(1).LogStreamName); - - client.VerifyAll(); - } - - [Fact(DisplayName = "EmitBatchAsync - Data already accepted")] - public async Task DataAlreadyAccepted() - { - // expect update of sequence token and successful retry - - var client = new Mock(MockBehavior.Strict); - var textFormatterMock = new Mock(MockBehavior.Strict); - textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t)); - var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" }; - var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options); - var events = Enumerable.Range(0, 10) - .Select(_ => // create 10 events with message length of 12 - new LogEvent( - DateTimeOffset.UtcNow, - LogEventLevel.Information, - null, - new MessageTemplateParser().Parse(CreateMessage(12)), - Enumerable.Empty())) - .ToArray(); - - client.Setup(mock => mock.DescribeLogGroupsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogGroupsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.CreateLogGroupAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new CreateLogGroupResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.Setup(mock => mock.DescribeLogStreamsAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(new DescribeLogStreamsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextToken = Guid.NewGuid().ToString() }); - - List createLogStreamRequests = new List(); - client.Setup(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny())) - .Callback((createLogStreamRequest, cancellationToken) => createLogStreamRequests.Add(createLogStreamRequest)) - .ReturnsAsync(new CreateLogStreamResponse { HttpStatusCode = System.Net.HttpStatusCode.OK }); - - client.SetupSequence(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(new DataAlreadyAcceptedException("data already accepted")) - .ReturnsAsync(new PutLogEventsResponse { HttpStatusCode = System.Net.HttpStatusCode.OK, NextSequenceToken = Guid.NewGuid().ToString() }); - - await sink.EmitBatchAsync(events); - - client.Verify(mock => mock.PutLogEventsAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.DescribeLogStreamsAsync(It.Is(req => req.LogGroupName == options.LogGroupName && req.LogStreamNamePrefix == createLogStreamRequests.First().LogStreamName), It.IsAny()), Times.Exactly(2)); - client.Verify(mock => mock.CreateLogStreamAsync(It.IsAny(), It.IsAny()), Times.Once); - - client.VerifyAll(); - } - /// /// Creates a message of random characters of the given size. ///