From 360c0c179721004b4bf0ab7f38a662e18bea1be0 Mon Sep 17 00:00:00 2001 From: Andrew Jackson Date: Fri, 10 May 2024 16:02:35 +0100 Subject: [PATCH] Add support for netstandard2.0 Requires reverting the code to C# 7.3 and making the DisposeAsync conditional on the net6.0 build --- .../AwsCloudWatchConfigurationExtension.cs | 2 +- .../CloudWatchLogSink.cs | 9 +- ...iodicBatchingSinkImplementationCallback.cs | 583 +++++++++--------- .../Serilog.Sinks.AwsCloudWatch.csproj | 2 +- 4 files changed, 301 insertions(+), 295 deletions(-) diff --git a/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs b/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs index cfef9a1..c65d393 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs +++ b/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs @@ -42,7 +42,7 @@ public static LoggerConfiguration AmazonCloudWatch(this LoggerSinkConfiguration // the batched sink is var batchedSink = new PeriodicBatchingSinkImplementationCallback(cloudWatchClient, options); - var sink = new PeriodicBatchingSink(batchedSink, new() + var sink = new PeriodicBatchingSink(batchedSink, new PeriodicBatchingSinkOptions() { BatchSizeLimit = options.BatchSizeLimit, Period = options.Period, diff --git a/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs b/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs index 2b1b390..78463f6 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs +++ b/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs @@ -10,7 +10,10 @@ namespace Serilog.Sinks.AwsCloudWatch /// /// A Serilog log sink that publishes to AWS CloudWatch Logs /// - public class CloudWatchLogSink : ILogEventSink, IDisposable, IAsyncDisposable + public class CloudWatchLogSink : ILogEventSink, IDisposable +#if NET6_0_OR_GREATER + , IAsyncDisposable +#endif { private readonly PeriodicBatchingSink batchingSink; @@ -52,7 +55,7 @@ public class CloudWatchLogSink : ILogEventSink, IDisposable, IAsyncDisposable public CloudWatchLogSink(IAmazonCloudWatchLogs cloudWatchClient, ICloudWatchSinkOptions options) { var batchedSink = new PeriodicBatchingSinkImplementationCallback(cloudWatchClient, options); - batchingSink = new(batchedSink, new() { BatchSizeLimit = options.BatchSizeLimit, Period = options.Period, QueueLimit = options.QueueSizeLimit }); + batchingSink = new PeriodicBatchingSink(batchedSink, new PeriodicBatchingSinkOptions() { BatchSizeLimit = options.BatchSizeLimit, Period = options.Period, QueueLimit = options.QueueSizeLimit }); } /// @@ -67,10 +70,12 @@ public void Dispose() batchingSink.Dispose(); } +#if NET6_0_OR_GREATER /// public ValueTask DisposeAsync() { return batchingSink.DisposeAsync(); } +#endif } } \ No newline at end of file diff --git a/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs index effcc5e..014f261 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs +++ b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs @@ -11,373 +11,374 @@ using Serilog.Formatting; using Serilog.Sinks.PeriodicBatching; -namespace Serilog.Sinks.AwsCloudWatch; - -internal class PeriodicBatchingSinkImplementationCallback: IBatchedLogEventSink +namespace Serilog.Sinks.AwsCloudWatch { - private readonly IAmazonCloudWatchLogs cloudWatchClient; - private readonly ICloudWatchSinkOptions options; - private bool hasInit; - private 
string logStreamName; - private string nextSequenceToken; - private readonly ITextFormatter textFormatter; + internal class PeriodicBatchingSinkImplementationCallback: IBatchedLogEventSink + { + private readonly IAmazonCloudWatchLogs cloudWatchClient; + private readonly ICloudWatchSinkOptions options; + private bool hasInit; + private string logStreamName; + private string nextSequenceToken; + private readonly ITextFormatter textFormatter; - private readonly SemaphoreSlim syncObject = new SemaphoreSlim(1); + private readonly SemaphoreSlim syncObject = new SemaphoreSlim(1); - public PeriodicBatchingSinkImplementationCallback(IAmazonCloudWatchLogs cloudWatchClient, ICloudWatchSinkOptions options) - { - if (string.IsNullOrEmpty(options?.LogGroupName)) - { - throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.LogGroupName)} must be specified."); - } - if (options.BatchSizeLimit < 1) + public PeriodicBatchingSinkImplementationCallback(IAmazonCloudWatchLogs cloudWatchClient, ICloudWatchSinkOptions options) { - throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.BatchSizeLimit)} must be a value greater than 0."); - } - this.cloudWatchClient = cloudWatchClient; - this.options = options; + if (string.IsNullOrEmpty(options?.LogGroupName)) + { + throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.LogGroupName)} must be specified."); + } + if (options.BatchSizeLimit < 1) + { + throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.BatchSizeLimit)} must be a value greater than 0."); + } + this.cloudWatchClient = cloudWatchClient; + this.options = options; - if (options.TextFormatter == null) - { - throw new ArgumentException($"{nameof(options.TextFormatter)} is required"); - } + if (options.TextFormatter == null) + { + throw new ArgumentException($"{nameof(options.TextFormatter)} is required"); + } - textFormatter = options.TextFormatter; - } + textFormatter = options.TextFormatter; + } - /// - /// Ensures the component is initialized. - /// - private async Task EnsureInitializedAsync() - { - if (hasInit) + /// + /// Ensures the component is initialized. + /// + private async Task EnsureInitializedAsync() { - return; - } + if (hasInit) + { + return; + } - // create log group - await CreateLogGroupAsync(); + // create log group + await CreateLogGroupAsync(); - // create log stream - UpdateLogStreamName(); - await CreateLogStreamAsync(); + // create log stream + UpdateLogStreamName(); + await CreateLogStreamAsync(); - hasInit = true; - } + hasInit = true; + } - /// - /// Creates the log group. - /// - private async Task CreateLogGroupAsync() - { - if (options.CreateLogGroup) + /// + /// Creates the log group. 
+ /// + private async Task CreateLogGroupAsync() { - // see if the log group already exists - var describeRequest = new DescribeLogGroupsRequest + if (options.CreateLogGroup) { - LogGroupNamePrefix = options.LogGroupName - }; - - var logGroups = await cloudWatchClient - .DescribeLogGroupsAsync(describeRequest); + // see if the log group already exists + var describeRequest = new DescribeLogGroupsRequest + { + LogGroupNamePrefix = options.LogGroupName + }; - var logGroup = logGroups - .LogGroups - .FirstOrDefault(lg => string.Equals(lg.LogGroupName, options.LogGroupName, StringComparison.Ordinal)); + var logGroups = await cloudWatchClient + .DescribeLogGroupsAsync(describeRequest); - // create log group if it doesn't exist - if (logGroup == null) - { - var createRequest = new CreateLogGroupRequest(options.LogGroupName); - var createResponse = await cloudWatchClient.CreateLogGroupAsync(createRequest); + var logGroup = logGroups + .LogGroups + .FirstOrDefault(lg => string.Equals(lg.LogGroupName, options.LogGroupName, StringComparison.Ordinal)); - // update the retention policy if a specific period is defined - if (options.LogGroupRetentionPolicy != LogGroupRetentionPolicy.Indefinitely) + // create log group if it doesn't exist + if (logGroup == null) { - var putRetentionRequest = new PutRetentionPolicyRequest(options.LogGroupName, (int)options.LogGroupRetentionPolicy); - await cloudWatchClient.PutRetentionPolicyAsync(putRetentionRequest); + var createRequest = new CreateLogGroupRequest(options.LogGroupName); + var createResponse = await cloudWatchClient.CreateLogGroupAsync(createRequest); + + // update the retention policy if a specific period is defined + if (options.LogGroupRetentionPolicy != LogGroupRetentionPolicy.Indefinitely) + { + var putRetentionRequest = new PutRetentionPolicyRequest(options.LogGroupName, (int)options.LogGroupRetentionPolicy); + await cloudWatchClient.PutRetentionPolicyAsync(putRetentionRequest); + } } } } - } - - /// - /// Updates the name of the log stream. - /// - private void UpdateLogStreamName() - { - logStreamName = options.LogStreamNameProvider.GetLogStreamName(); - nextSequenceToken = null; // always reset on a new stream - } - - /// - /// Creates the log stream if needed. - /// - private async Task CreateLogStreamAsync() - { - // see if the log stream already exists - var logStream = await GetLogStreamAsync(); - // create log stream if it doesn't exist - if (logStream == null) + /// + /// Updates the name of the log stream. + /// + private void UpdateLogStreamName() { - var createLogStreamRequest = new CreateLogStreamRequest - { - LogGroupName = options.LogGroupName, - LogStreamName = logStreamName - }; - var createLogStreamResponse = await cloudWatchClient.CreateLogStreamAsync(createLogStreamRequest); + logStreamName = options.LogStreamNameProvider.GetLogStreamName(); + nextSequenceToken = null; // always reset on a new stream } - else - { - nextSequenceToken = logStream.UploadSequenceToken; - } - } - - /// - /// Updates the log stream sequence token. - /// - private async Task UpdateLogStreamSequenceTokenAsync() - { - var logStream = await GetLogStreamAsync(); - nextSequenceToken = logStream?.UploadSequenceToken; - } - - /// - /// Attempts to get the log stream defined by . - /// - /// The matching log stream or null if no match can be found. 
- private async Task GetLogStreamAsync() - { - var describeLogStreamsRequest = new DescribeLogStreamsRequest - { - LogGroupName = options.LogGroupName, - LogStreamNamePrefix = logStreamName - }; - - var describeLogStreamsResponse = await cloudWatchClient - .DescribeLogStreamsAsync(describeLogStreamsRequest); - - return describeLogStreamsResponse - .LogStreams - .SingleOrDefault(ls => string.Equals(ls.LogStreamName, logStreamName, StringComparison.Ordinal)); - } - - /// - /// Creates a batch of events. - /// - /// The entire set of log events. - /// A batch of events meeting defined restrictions. - private List CreateBatch(Queue logEvents) - { - DateTime? first = null; - var batchSize = 0; - var batch = new List(); - while (batch.Count < CloudWatchLogSink.MaxLogEventBatchCount && logEvents.Count > 0) // ensure < max batch count + /// + /// Creates the log stream if needed. + /// + private async Task CreateLogStreamAsync() { - var @event = logEvents.Peek(); + // see if the log stream already exists + var logStream = await GetLogStreamAsync(); - if (!first.HasValue) - { - first = @event.Timestamp; - } - else if (@event.Timestamp.Subtract(first.Value) > CloudWatchLogSink.MaxBatchEventSpan) // ensure batch spans no more than 24 hours + // create log stream if it doesn't exist + if (logStream == null) { - break; - } - - var proposedBatchSize = batchSize + System.Text.Encoding.UTF8.GetByteCount(@event.Message) + CloudWatchLogSink.MessageBufferSize; - if (proposedBatchSize < CloudWatchLogSink.MaxLogEventBatchSize) // ensure < max batch size - { - batchSize = proposedBatchSize; - batch.Add(@event); - logEvents.Dequeue(); + var createLogStreamRequest = new CreateLogStreamRequest + { + LogGroupName = options.LogGroupName, + LogStreamName = logStreamName + }; + var createLogStreamResponse = await cloudWatchClient.CreateLogStreamAsync(createLogStreamRequest); } else { - break; + nextSequenceToken = logStream.UploadSequenceToken; } } - return batch; - } - - /// - /// Publish the batch of log events to AWS CloudWatch Logs. - /// - /// The request. - private async Task PublishBatchAsync(List batch) - { - if (batch?.Count == 0) + /// + /// Updates the log stream sequence token. + /// + private async Task UpdateLogStreamSequenceTokenAsync() { - return; + var logStream = await GetLogStreamAsync(); + nextSequenceToken = logStream?.UploadSequenceToken; } - var success = false; - var attemptIndex = 0; - while (!success && attemptIndex <= options.RetryAttempts) + /// + /// Attempts to get the log stream defined by . + /// + /// The matching log stream or null if no match can be found. 
+ private async Task GetLogStreamAsync() { - try + var describeLogStreamsRequest = new DescribeLogStreamsRequest { - // creates the request to upload a new event to CloudWatch - var putLogEventsRequest = new PutLogEventsRequest - { - LogGroupName = options.LogGroupName, - LogStreamName = logStreamName, - SequenceToken = nextSequenceToken, - LogEvents = batch - }; + LogGroupName = options.LogGroupName, + LogStreamNamePrefix = logStreamName + }; - // actually upload the event to CloudWatch - var putLogEventsResponse = await cloudWatchClient.PutLogEventsAsync(putLogEventsRequest); + var describeLogStreamsResponse = await cloudWatchClient + .DescribeLogStreamsAsync(describeLogStreamsRequest); - // remember the next sequence token, which is required - nextSequenceToken = putLogEventsResponse.NextSequenceToken; + return describeLogStreamsResponse + .LogStreams + .SingleOrDefault(ls => string.Equals(ls.LogStreamName, logStreamName, StringComparison.Ordinal)); + } - success = true; - } - catch (ServiceUnavailableException e) - { - // retry with back-off - Debugging.SelfLog.WriteLine("Service unavailable. Attempt: {0} Error: {1}", attemptIndex, e); - await Task.Delay(CloudWatchLogSink.ErrorBackoffStartingInterval.Milliseconds * (int)Math.Pow(2, attemptIndex)); - attemptIndex++; - } - catch (ResourceNotFoundException e) - { - // no retry with back-off because.. - // if one of these fails, we get out of the loop. - // if they're both successful, we don't hit this case again. - Debugging.SelfLog.WriteLine("Resource was not found. Error: {0}", e); - await CreateLogGroupAsync(); - await CreateLogStreamAsync(); - } - catch (DataAlreadyAcceptedException e) + /// + /// Creates a batch of events. + /// + /// The entire set of log events. + /// A batch of events meeting defined restrictions. + private List CreateBatch(Queue logEvents) + { + DateTime? first = null; + var batchSize = 0; + var batch = new List(); + + while (batch.Count < CloudWatchLogSink.MaxLogEventBatchCount && logEvents.Count > 0) // ensure < max batch count { - Debugging.SelfLog.WriteLine("Data already accepted. Attempt: {0} Error: {1}", attemptIndex, e); - try + var @event = logEvents.Peek(); + + if (!first.HasValue) { - await UpdateLogStreamSequenceTokenAsync(); + first = @event.Timestamp; } - catch (Exception ex) + else if (@event.Timestamp.Subtract(first.Value) > CloudWatchLogSink.MaxBatchEventSpan) // ensure batch spans no more than 24 hours { - Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); - - // try again with a different log stream - UpdateLogStreamName(); - await CreateLogStreamAsync(); + break; } - attemptIndex++; - } - catch (InvalidSequenceTokenException e) - { - Debugging.SelfLog.WriteLine("Invalid sequence token. Attempt: {0} Error: {1}", attemptIndex, e); - try + + var proposedBatchSize = batchSize + System.Text.Encoding.UTF8.GetByteCount(@event.Message) + CloudWatchLogSink.MessageBufferSize; + if (proposedBatchSize < CloudWatchLogSink.MaxLogEventBatchSize) // ensure < max batch size { - await UpdateLogStreamSequenceTokenAsync(); + batchSize = proposedBatchSize; + batch.Add(@event); + logEvents.Dequeue(); } - catch (Exception ex) + else { - Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); - - // try again with a different log stream - UpdateLogStreamName(); - await CreateLogStreamAsync(); + break; } - attemptIndex++; - } - catch (Exception e) - { - Debugging.SelfLog.WriteLine("Unhandled exception. 
Error: {0}", e); - break; } + + return batch; } - } - /// - /// Emit a batch of log events, running asynchronously. - /// - /// The events to emit. - public async Task EmitBatchAsync(IEnumerable events) - { - try + /// + /// Publish the batch of log events to AWS CloudWatch Logs. + /// + /// The request. + private async Task PublishBatchAsync(List batch) { - await syncObject.WaitAsync(); - - if (events?.Count() == 0) + if (batch?.Count == 0) { return; } - try - { - await EnsureInitializedAsync(); - } - catch (Exception ex) + var success = false; + var attemptIndex = 0; + while (!success && attemptIndex <= options.RetryAttempts) { - Debugging.SelfLog.WriteLine("Error initializing log stream. No logs will be sent to AWS CloudWatch. Exception was {0}.", ex); - return; + try + { + // creates the request to upload a new event to CloudWatch + var putLogEventsRequest = new PutLogEventsRequest + { + LogGroupName = options.LogGroupName, + LogStreamName = logStreamName, + SequenceToken = nextSequenceToken, + LogEvents = batch + }; + + // actually upload the event to CloudWatch + var putLogEventsResponse = await cloudWatchClient.PutLogEventsAsync(putLogEventsRequest); + + // remember the next sequence token, which is required + nextSequenceToken = putLogEventsResponse.NextSequenceToken; + + success = true; + } + catch (ServiceUnavailableException e) + { + // retry with back-off + Debugging.SelfLog.WriteLine("Service unavailable. Attempt: {0} Error: {1}", attemptIndex, e); + await Task.Delay(CloudWatchLogSink.ErrorBackoffStartingInterval.Milliseconds * (int)Math.Pow(2, attemptIndex)); + attemptIndex++; + } + catch (ResourceNotFoundException e) + { + // no retry with back-off because.. + // if one of these fails, we get out of the loop. + // if they're both successful, we don't hit this case again. + Debugging.SelfLog.WriteLine("Resource was not found. Error: {0}", e); + await CreateLogGroupAsync(); + await CreateLogStreamAsync(); + } + catch (DataAlreadyAcceptedException e) + { + Debugging.SelfLog.WriteLine("Data already accepted. Attempt: {0} Error: {1}", attemptIndex, e); + try + { + await UpdateLogStreamSequenceTokenAsync(); + } + catch (Exception ex) + { + Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); + + // try again with a different log stream + UpdateLogStreamName(); + await CreateLogStreamAsync(); + } + attemptIndex++; + } + catch (InvalidSequenceTokenException e) + { + Debugging.SelfLog.WriteLine("Invalid sequence token. Attempt: {0} Error: {1}", attemptIndex, e); + try + { + await UpdateLogStreamSequenceTokenAsync(); + } + catch (Exception ex) + { + Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex); + + // try again with a different log stream + UpdateLogStreamName(); + await CreateLogStreamAsync(); + } + attemptIndex++; + } + catch (Exception e) + { + Debugging.SelfLog.WriteLine("Unhandled exception. Error: {0}", e); + break; + } } + } + /// + /// Emit a batch of log events, running asynchronously. + /// + /// The events to emit. 
+ public async Task EmitBatchAsync(IEnumerable events) + { try { - var logEvents = - new Queue(events - .OrderBy(e => e.Timestamp) // log events need to be ordered by timestamp within a single bulk upload to CloudWatch - .Select( // transform - @event => - { - string message = null; - using (var writer = new StringWriter()) - { - textFormatter.Format(@event, writer); - writer.Flush(); - message = writer.ToString(); - } - var messageLength = Encoding.UTF8.GetByteCount(message); - if (messageLength > CloudWatchLogSink.MaxLogEventSize) - { - // truncate event message - Debugging.SelfLog.WriteLine("Truncating log event with length of {0}", messageLength); - var buffer = Encoding.UTF8.GetBytes(message); - message = Encoding.UTF8.GetString(buffer, 0, CloudWatchLogSink.MaxLogEventSize); - } - return new InputLogEvent - { - Message = message, - Timestamp = @event.Timestamp.UtcDateTime - }; - })); + await syncObject.WaitAsync(); - while (logEvents.Count > 0) + if (events?.Count() == 0) { - var batch = CreateBatch(logEvents); + return; + } - await PublishBatchAsync(batch); + try + { + await EnsureInitializedAsync(); } - } - catch (Exception ex) - { + catch (Exception ex) + { + Debugging.SelfLog.WriteLine("Error initializing log stream. No logs will be sent to AWS CloudWatch. Exception was {0}.", ex); + return; + } + try { - Debugging.SelfLog.WriteLine("Error sending logs. No logs will be sent to AWS CloudWatch. Error was {0}", ex); + var logEvents = + new Queue(events + .OrderBy(e => e.Timestamp) // log events need to be ordered by timestamp within a single bulk upload to CloudWatch + .Select( // transform + @event => + { + string message = null; + using (var writer = new StringWriter()) + { + textFormatter.Format(@event, writer); + writer.Flush(); + message = writer.ToString(); + } + var messageLength = Encoding.UTF8.GetByteCount(message); + if (messageLength > CloudWatchLogSink.MaxLogEventSize) + { + // truncate event message + Debugging.SelfLog.WriteLine("Truncating log event with length of {0}", messageLength); + var buffer = Encoding.UTF8.GetBytes(message); + message = Encoding.UTF8.GetString(buffer, 0, CloudWatchLogSink.MaxLogEventSize); + } + return new InputLogEvent + { + Message = message, + Timestamp = @event.Timestamp.UtcDateTime + }; + })); + + while (logEvents.Count > 0) + { + var batch = CreateBatch(logEvents); + + await PublishBatchAsync(batch); + } } - catch + catch (Exception ex) { - // we even failed to log to the trace logger - giving up trying to put something out + try + { + Debugging.SelfLog.WriteLine("Error sending logs. No logs will be sent to AWS CloudWatch. Error was {0}", ex); + } + catch + { + // we even failed to log to the trace logger - giving up trying to put something out + } } } + finally + { + syncObject.Release(); + } } - finally + + /// + public Task OnEmptyBatchAsync() { - syncObject.Release(); + return Task.CompletedTask; } } - - /// - public Task OnEmptyBatchAsync() - { - return Task.CompletedTask; - } } \ No newline at end of file diff --git a/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj b/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj index 7f206d7..f5b5fae 100644 --- a/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj +++ b/src/Serilog.Sinks.AwsCloudWatch/Serilog.Sinks.AwsCloudWatch.csproj @@ -4,7 +4,7 @@ AWS Cloud Watch Serilog Sink 0.0.1 thoean;wparad - net6.0 + netstandard2.0;net6.0 Serilog.Sinks.AwsCloudWatch Serilog.Sinks.AwsCloudWatch serilog;logging;dnx;coreclr;AWS;CloudWatch
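Note (editorial, not part of the patch): the diff reverts two C# features that are unavailable at the netstandard2.0 default language version of 7.3: the file-scoped namespace in PeriodicBatchingSinkImplementationCallback.cs becomes a block-scoped namespace, and the target-typed "new()" expressions become explicit "new PeriodicBatchingSinkOptions()" calls. The IAsyncDisposable implementation is kept only for the net6.0 target via the NET6_0_OR_GREATER symbol. Below is a minimal consumer sketch of how callers can shut the sink down on either target; the helper type and method names are hypothetical and only illustrate the conditional-compilation pattern the patch uses.

using System;
using System.Threading.Tasks;

internal static class SinkShutdownExample
{
    // Shuts the sink down on either target framework. On the net6.0 build the
    // sink also implements IAsyncDisposable, so the asynchronous path is
    // preferred; on netstandard2.0 only the synchronous Dispose() exists.
    public static async Task ShutdownAsync(IDisposable sink)
    {
#if NET6_0_OR_GREATER
        if (sink is IAsyncDisposable asyncDisposable)
        {
            await asyncDisposable.DisposeAsync();
            return;
        }
#endif
        sink.Dispose();
        await Task.CompletedTask; // keeps the method awaitable on the netstandard2.0 path
    }
}

Callers targeting net6.0 can dispose the sink with "await using"; callers on the netstandard2.0 build wrap it in a plain "using" block instead.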