Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to support use as a wrapper sink; add sink options #39

Merged
merged 3 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,6 @@ _Pvt_Extensions
.fake/

BenchmarkDotNet.Artifacts/
.idea
*.orig

54 changes: 49 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,53 @@
# Serilog.Sinks.PeriodicBatching
# Serilog.Sinks.PeriodicBatching [![Build status](https://ci.appveyor.com/api/projects/status/w2agqyd8rn0jur9y?svg=true)](https://ci.appveyor.com/project/serilog/serilog-sinks-periodicbatching) [![NuGet Version](http://img.shields.io/nuget/v/Serilog.Sinks.periodicbatching.svg?style=flat)](https://www.nuget.org/packages/Serilog.Sinks.periodicbatching/)

A base for Serilog sinks that batch and asynchronously send events to a slow/remote target.
A wrapper for Serilog sinks that asynchronously emits events in batches, useful when logging to a slow and/or remote target.

[![Build status](https://ci.appveyor.com/api/projects/status/w2agqyd8rn0jur9y?svg=true)](https://ci.appveyor.com/project/serilog/serilog-sinks-periodicbatching) [![NuGet Version](http://img.shields.io/nuget/v/Serilog.Sinks.periodicbatching.svg?style=flat)](https://www.nuget.org/packages/Serilog.Sinks.periodicbatching/)
### Getting started

* [Documentation](https://github.com/serilog/serilog/wiki)
Sinks that, for performance reasons, need to emit events in batches, can be implemented using `PeriodicBatchingSink`
from this package.

Copyright © 2016 Serilog Contributors - Provided under the [Apache License, Version 2.0](http://apache.org/licenses/LICENSE-2.0.html).
First, install the package into your Sink project:

```
dotnet add package Serilog.Sinks.PeriodicBatching
```

Then, instead of implementing Serilog's `ILogEventSink`, implement `IBatchedLogEventSink` in your sink class:

```csharp
class ExampleBatchedSink : IBatchedLogEventSink
{
public async Task EmitBatchAsync(IEnumerable<LogEvent> batch)
{
foreach (var logEvent in batch)
Console.WriteLine(logEvent);
}

public Task OnEmptyBatchAsync() { }
}
```

Finally, in your sink's configuration method, construct a `PeriodicBatchingSink` that wraps your batched sink:

```csharp
public static class LoggerSinkExampleConfiguration
{
public static LoggerConfiguration Example(this LoggerSinkConfiguration loggerSinkConfiguration)
{
var exampleSink = new ExampleBatchedSink();

var batchingOptions = new PeriodicBatchingSinkOptions
{
BatchSize = 100,
Period = TimeSpan.FromSeconds(2),
EagerlyEmitFirstEvent = true,
QueueSizeLimit = 10000
};

var batchingSink = new PeriodicBatchingSink(exampleSink, batchingOptions);

return loggerSinkConfiguration.Sink(batchingSink);
}
}
```
3 changes: 1 addition & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
version: '{build}'
skip_tags: true
image: Visual Studio 2017
configuration: Release
image: Visual Studio 2019
install:
- ps: mkdir -Force ".\build\" | Out-Null
- ps: Invoke-WebRequest "https://raw.githubusercontent.com/dotnet/cli/rel/1.0.0/scripts/obtain/dotnet-install.ps1" -OutFile ".\build\installcli.ps1"
Expand Down
4 changes: 4 additions & 0 deletions serilog-sinks-periodicbatching.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=liveness/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Serilog/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=sink_0027s/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<Description>The periodic batching sink for Serilog</Description>
<VersionPrefix>2.2.1</VersionPrefix>
<VersionPrefix>2.3.0</VersionPrefix>
<Authors>Serilog Contributors</Authors>
<TargetFrameworks>net45;netstandard1.1;netstandard1.2;netstandard2.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2016 Serilog Contributors
// Copyright 2013-2020 Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2013-2020 Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Concurrent;
using System.Threading;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2013-2020 Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;
using System.Threading.Tasks;
using Serilog.Events;

namespace Serilog.Sinks.PeriodicBatching
{
/// <summary>
/// Interface for targets that accept events in batches.
/// </summary>
public interface IBatchedLogEventSink
{
/// <summary>
/// Emit a batch of log events, running asynchronously.
/// </summary>
/// <param name="batch">The batch of events to emit.</param>
Task EmitBatchAsync(IEnumerable<LogEvent> batch);

/// <summary>
/// Allows sinks to perform periodic work without requiring additional threads
/// or timers (thus avoiding additional flush/shut-down complexity).
/// </summary>
Task OnEmptyBatchAsync();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2016 Serilog Contributors
// Copyright 2013-2020 Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


using System;
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -21,6 +20,8 @@
using Serilog.Events;
using System.Threading;

// ReSharper disable MemberCanBePrivate.Global, UnusedMember.Global, VirtualMemberNeverOverridden.Global

namespace Serilog.Sinks.PeriodicBatching
{
/// <summary>
Expand All @@ -33,14 +34,16 @@ namespace Serilog.Sinks.PeriodicBatching
/// that want to change this behavior need to either implement from scratch, or
/// embed retry logic in the batch emitting functions.
/// </remarks>
public abstract class PeriodicBatchingSink : ILogEventSink, IDisposable
public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEventSink
{
/// <summary>
/// Constant used to indicate that the internal queue shouldn't be limited.
/// </summary>
public const int NoQueueLimit = BoundedConcurrentQueue<LogEvent>.NonBounded;

readonly IBatchedLogEventSink _batchedLogEventSink;
readonly int _batchSizeLimit;
readonly bool _eagerlyEmitFirstEvent;
readonly BoundedConcurrentQueue<LogEvent> _queue;
readonly BatchedConnectionStatus _status;
readonly Queue<LogEvent> _waitingBatch = new Queue<LogEvent>();
Expand All @@ -53,43 +56,71 @@ public abstract class PeriodicBatchingSink : ILogEventSink, IDisposable
bool _started;

/// <summary>
/// Construct a sink posting to the specified database.
/// Construct a <see cref="PeriodicBatchingSink"/>.
/// </summary>
/// <param name="batchedSink">A <see cref="IBatchedLogEventSink"/> to send log event batches to. Batches and empty
/// batch notifications will not be sent concurrently. When the <see cref="PeriodicBatchingSink"/> is disposed,
/// it will dispose this object if possible.</param>
/// <param name="options">Options controlling behavior of the sink.</param>
public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSinkOptions options)
: this(options)
{
_batchedLogEventSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
}

/// <summary>
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period)
: this(batchSizeLimit, period, null)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true
})
{
_batchedLogEventSink = this;
}

/// <summary>
/// Construct a sink posting to the specified database.
/// Construct a <see cref="PeriodicBatchingSink"/>. New code should avoid subclassing
/// <see cref="PeriodicBatchingSink"/> and use
/// <see cref="PeriodicBatchingSink(Serilog.Sinks.PeriodicBatching.IBatchedLogEventSink,Serilog.Sinks.PeriodicBatching.PeriodicBatchingSinkOptions)"/>
/// instead.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
/// <param name="queueLimit">Maximum number of events in the queue - use <see cref="NoQueueLimit"/> for an unbounded queue.</param>
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLimit)
: this(batchSizeLimit, period, (int?)queueLimit)
: this(new PeriodicBatchingSinkOptions
{
BatchSizeLimit = batchSizeLimit,
Period = period,
EagerlyEmitFirstEvent = true,
QueueLimit = queueLimit
})
{
_batchedLogEventSink = this;
}

/// <summary>
/// Construct a sink posting to the specified database.
/// </summary>
/// <param name="batchSizeLimit">The maximum number of events to include in a single batch.</param>
/// <param name="period">The time to wait between checking for event batches.</param>
/// <param name="queueLimit">Maximum number of events in the queue.</param>
protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int? queueLimit)
PeriodicBatchingSink(PeriodicBatchingSinkOptions options)
{
if (batchSizeLimit <= 0)
throw new ArgumentOutOfRangeException(nameof(batchSizeLimit), "The batch size limit must be greater than zero.");
if (period <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(period), "The period must be greater than zero.");
if (options == null) throw new ArgumentNullException(nameof(options));

_batchSizeLimit = batchSizeLimit;
_queue = new BoundedConcurrentQueue<LogEvent>(queueLimit);
_status = new BatchedConnectionStatus(period);
if (options.BatchSizeLimit <= 0)
throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero.");
if (options.Period <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero.");

_batchSizeLimit = options.BatchSizeLimit;
_queue = new BoundedConcurrentQueue<LogEvent>(options.QueueLimit);
_status = new BatchedConnectionStatus(options.Period);
_eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent;
_timer = new PortableTimer(cancel => OnTick());
}

Expand All @@ -108,9 +139,12 @@ void CloseAndFlush()
// This is the place where SynchronizationContext.Current is unknown and can be != null
// so we prevent possible deadlocks here for sync-over-async downstream implementations
ResetSyncContextAndWait(OnTick);

if (_batchedLogEventSink != this)
(_batchedLogEventSink as IDisposable)?.Dispose();
}

void ResetSyncContextAndWait(Func<Task> taskFactory)
static void ResetSyncContextAndWait(Func<Task> taskFactory)
{
var prevContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(null);
Expand Down Expand Up @@ -174,21 +208,20 @@ async Task OnTick()
bool batchWasFull;
do
{
LogEvent next;
while (_waitingBatch.Count < _batchSizeLimit &&
_queue.TryDequeue(out next))
_queue.TryDequeue(out var next))
{
if (CanInclude(next))
_waitingBatch.Enqueue(next);
}

if (_waitingBatch.Count == 0)
{
await OnEmptyBatchAsync();
await _batchedLogEventSink.OnEmptyBatchAsync();
return;
}

await EmitBatchAsync(_waitingBatch);
await _batchedLogEventSink.EmitBatchAsync(_waitingBatch);

batchWasFull = _waitingBatch.Count >= _batchSizeLimit;
_waitingBatch.Clear();
Expand All @@ -208,8 +241,7 @@ async Task OnTick()

if (_status.ShouldDropQueue)
{
LogEvent evt;
while (_queue.TryDequeue(out evt)) { }
while (_queue.TryDequeue(out _)) { }
}

lock (_stateLock)
Expand Down Expand Up @@ -250,11 +282,20 @@ public void Emit(LogEvent logEvent)
if (_unloading) return;
if (!_started)
{
// Special handling to try to get the first event across as quickly
// as possible to show we're alive!
_queue.TryEnqueue(logEvent);
_started = true;
SetTimer(TimeSpan.Zero);

if (_eagerlyEmitFirstEvent)
{
// Special handling to try to get the first event across as quickly
// as possible to show we're alive!
SetTimer(TimeSpan.Zero);
}
else
{
SetTimer(_status.NextInterval);
}

return;
}
}
Expand All @@ -267,9 +308,10 @@ public void Emit(LogEvent logEvent)
/// Determine whether a queued log event should be included in the batch. If
/// an override returns false, the event will be dropped.
/// </summary>
/// <param name="evt"></param>
/// <returns></returns>
protected virtual bool CanInclude(LogEvent evt)
/// <param name="logEvent">An event to test for inclusion.</param>
/// <returns>True if the event should be included in the batch; otherwise, false.</returns>
// ReSharper disable once UnusedParameter.Global
protected virtual bool CanInclude(LogEvent logEvent)
{
return true;
}
Expand All @@ -296,5 +338,8 @@ protected virtual async Task OnEmptyBatchAsync()
{
OnEmptyBatch();
}

Task IBatchedLogEventSink.EmitBatchAsync(IEnumerable<LogEvent> batch) => EmitBatchAsync(batch);
Task IBatchedLogEventSink.OnEmptyBatchAsync() => OnEmptyBatchAsync();
}
}
Loading