From b762a599e20848019d1b96e386db618bbd24618b Mon Sep 17 00:00:00 2001 From: Anthony James Date: Fri, 21 Jul 2017 17:09:31 -0500 Subject: [PATCH] EventCollectorSink inherits PeriodicBatchingSink. The EventCollectorSink now inherits from PeriodicBatchingSink and allows the base class to handle the batching of log events. I've also removed the RepeatAction class as it was only used by the EventCollectorSink and is no longer necessary. --- .../Serilog.Sinks.Splunk.csproj | 1 + .../Sinks/Splunk/EventCollectorSink.cs | 111 +++--------------- .../Sinks/Splunk/RepeatAction.cs | 47 -------- 3 files changed, 17 insertions(+), 142 deletions(-) delete mode 100644 src/Serilog.Sinks.Splunk/Sinks/Splunk/RepeatAction.cs diff --git a/src/Serilog.Sinks.Splunk/Serilog.Sinks.Splunk.csproj b/src/Serilog.Sinks.Splunk/Serilog.Sinks.Splunk.csproj index a73a5aa..e2071e7 100644 --- a/src/Serilog.Sinks.Splunk/Serilog.Sinks.Splunk.csproj +++ b/src/Serilog.Sinks.Splunk/Serilog.Sinks.Splunk.csproj @@ -36,6 +36,7 @@ + diff --git a/src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorSink.cs b/src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorSink.cs index 647fe6b..cdfe05f 100644 --- a/src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorSink.cs +++ b/src/Serilog.Sinks.Splunk/Sinks/Splunk/EventCollectorSink.cs @@ -13,33 +13,30 @@ // limitations under the License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Net.Http; -using System.Threading; using System.Threading.Tasks; -using Serilog.Core; using Serilog.Debugging; using Serilog.Events; using Serilog.Formatting; +using Serilog.Sinks.PeriodicBatching; namespace Serilog.Sinks.Splunk { /// /// A sink to log to the Event Collector available in Splunk 6.3 /// - public class EventCollectorSink : ILogEventSink, IDisposable + public class EventCollectorSink : PeriodicBatchingSink { private readonly string _splunkHost; private readonly string _uriPath; - private readonly int _batchSizeLimitLimit; private readonly ITextFormatter _jsonFormatter; - private readonly ConcurrentQueue _queue; private readonly EventCollectorClient _httpClient; + /// /// Taken from Splunk.Logging.Common /// @@ -115,6 +112,7 @@ public EventCollectorSink( messageHandler) { } + /// /// Creates a new instance of the sink with Customfields /// @@ -152,12 +150,11 @@ public EventCollectorSink( uriPath, batchIntervalInSeconds, batchSizeLimit, - new SplunkJsonFormatter(renderTemplate, formatProvider, source, sourceType, host, index,fields), + new SplunkJsonFormatter(renderTemplate, formatProvider, source, sourceType, host, index, fields), messageHandler) { } - /// /// Creates a new instance of the sink /// @@ -176,67 +173,25 @@ public EventCollectorSink( int batchSizeLimit, ITextFormatter jsonFormatter, HttpMessageHandler messageHandler = null) + : base(batchSizeLimit, TimeSpan.FromSeconds(batchIntervalInSeconds)) { _uriPath = uriPath; _splunkHost = splunkHost; - _queue = new ConcurrentQueue(); _jsonFormatter = jsonFormatter; - _batchSizeLimitLimit = batchSizeLimit; - var batchInterval = TimeSpan.FromSeconds(batchIntervalInSeconds); _httpClient = messageHandler != null ? new EventCollectorClient(eventCollectorToken, messageHandler) : new EventCollectorClient(eventCollectorToken); - - var cancellationToken = new CancellationToken(); - - RepeatAction.OnInterval( - batchInterval, - async () => await ProcessQueue(), - cancellationToken); } /// - /// Emits the provided log event from a sink + /// Emit a batch of log events, running asynchronously. /// - /// - public void Emit(LogEvent logEvent) - { - if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); - - _queue.Enqueue(logEvent); - } - - private async Task ProcessQueue() - { - try - { - do - { - var count = 0; - var events = new Queue(); - LogEvent next; - - while (count < _batchSizeLimitLimit && _queue.TryDequeue(out next)) - { - count++; - events.Enqueue(next); - } - - if (events.Count == 0) - return; - - await Send(events); - - } while (true); - } - catch (Exception ex) - { - SelfLog.WriteLine("Exception while emitting batch from {0}: {1}", this, ex); - } - } - - private async Task Send(IEnumerable events) + /// The events to emit. + /// + /// Override either or , not both. + /// + protected override async Task EmitBatchAsync(IEnumerable events) { var allEvents = new StringWriter(); @@ -248,56 +203,22 @@ private async Task Send(IEnumerable events) var request = new EventCollectorRequest(_splunkHost, allEvents.ToString(), _uriPath); var response = await _httpClient.SendAsync(request).ConfigureAwait(false); - if (response.IsSuccessStatusCode) - { - //Do Nothing? - } - else + if (!response.IsSuccessStatusCode) { //Application Errors sent via HTTP Event Collector if (HttpEventCollectorApplicationErrors.Any(x => x == response.StatusCode)) { + // By not throwing an exception here the PeriodicBatchingSink will assume the batch succeeded and not send it again. SelfLog.WriteLine( "A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue.", response.StatusCode.ToString(), _splunkHost); } else { - //Put the item back in the queue & retry on next go - SelfLog.WriteLine( - "A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue", - response.StatusCode.ToString(), _splunkHost); - - foreach (var logEvent in events) - { - _queue.Enqueue(logEvent); - } + // EnsureSuccessStatusCode will throw an exception and the PeriodicBatchingSink will catch/log the exception and retry the batch. + response.EnsureSuccessStatusCode(); } } } - - /// - public void Dispose() - { - Dispose(true); - } - - /// - protected virtual void Dispose(bool disposing) - { - if (!disposing) return; - - var remainingEvents = new List(); - - while (!_queue.IsEmpty) - { - LogEvent next; - _queue.TryDequeue(out next); - remainingEvents.Add(next); - } - - Send(remainingEvents).Wait(); - _httpClient.Dispose(); - } } } diff --git a/src/Serilog.Sinks.Splunk/Sinks/Splunk/RepeatAction.cs b/src/Serilog.Sinks.Splunk/Sinks/Splunk/RepeatAction.cs deleted file mode 100644 index ee39d6f..0000000 --- a/src/Serilog.Sinks.Splunk/Sinks/Splunk/RepeatAction.cs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2016 Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Serilog.Sinks.Splunk -{ - internal static class RepeatAction - { - public static Task OnInterval(TimeSpan pollInterval, Action action, CancellationToken token, - TaskCreationOptions taskCreationOptions, TaskScheduler taskScheduler) - { - return Task.Factory.StartNew(() => - { - for (;;) - { - if (token.WaitCancellationRequested(pollInterval)) - break; - action(); - } - }, token, taskCreationOptions, taskScheduler); - } - - public static Task OnInterval(TimeSpan pollInterval, Action action, CancellationToken token) - { - return OnInterval(pollInterval, action, token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - } - - public static bool WaitCancellationRequested(this CancellationToken token, TimeSpan timeout) - { - return token.WaitHandle.WaitOne(timeout); - } - } -} \ No newline at end of file