Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SimpleBufferPool implementation #5326

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3239,6 +3239,15 @@ namespace Akka.IO.Buffers
System.ArraySegment<byte> Rent();
System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> Rent(int minimumSize);
}
public class SimpleBufferPool : Akka.IO.Buffers.IBufferPool
{
public SimpleBufferPool(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { }
public SimpleBufferPool(int bufferSize, int capacity) { }
public void Release(System.ArraySegment<byte> buf) { }
public void Release(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> bufs) { }
public System.ArraySegment<byte> Rent() { }
public System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> Rent(int minimumSize) { }
}
}
namespace Akka.IO
{
Expand Down
54 changes: 54 additions & 0 deletions src/core/Akka/Configuration/Pigeon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,24 @@ akka {
buffer-pool-limit = 1024
}

# Default implementation of `Akka.IO.Buffers.IBufferPool` interface.
# It pools and reuse allocated buffers that are released by requesters.
# This cuts down on the number of memory allocation and garbage collection
# for sequential socket connections.
simple-buffer-pool {

# Class implementing `Akka.IO.Buffers.IBufferPool` interface, which
# will be created with this configuration.
class = "Akka.IO.Buffers.SimpleBufferPool, Akka"

# Size of a single byte buffer in bytes.
buffer-size = 512

# Maximum number of byte buffer that can be pooled by this byte buffer pool
# instance.
buffer-pool-capacity = 512000
}

# Default implementation of `Akka.IO.Buffers.IBufferPool` interface.
# Instead of maintaining allocated buffers and reusing them
# between different SocketAsyncEventArgs instances, it allocates new
Expand Down Expand Up @@ -741,6 +759,24 @@ akka {
buffer-pool-limit = 1024
}

# Default implementation of `Akka.IO.Buffers.IBufferPool` interface.
# It pools and reuse allocated buffers that are released by requesters.
# This cuts down on the number of memory allocation and garbage collection
# for sequential socket connections.
simple-buffer-pool {

# Class implementing `Akka.IO.Buffers.IBufferPool` interface, which
# will be created with this configuration.
class = "Akka.IO.Buffers.SimpleBufferPool, Akka"

# Size of a single byte buffer in bytes.
buffer-size = 512

# Maximum number of byte buffer that can be pooled by this byte buffer pool
# instance.
buffer-pool-capacity = 512000
}

# A buffer pool used to acquire and release byte buffers from the managed
# heap. Once byte buffer is no longer needed is can be released, landing
# on the pool again, to be reused later. This way we can reduce a GC pressure
Expand Down Expand Up @@ -836,6 +872,24 @@ akka {
buffer-pool-limit = 1024
}

# Default implementation of `Akka.IO.Buffers.IBufferPool` interface.
# It pools and reuse allocated buffers that are released by requesters.
# This cuts down on the number of memory allocation and garbage collection
# for sequential socket connections.
simple-buffer-pool {

# Class implementing `Akka.IO.Buffers.IBufferPool` interface, which
# will be created with this configuration.
class = "Akka.IO.Buffers.SimpleBufferPool, Akka"

# Size of a single byte buffer in bytes.
buffer-size = 512

# Maximum number of byte buffer that can be pooled by this byte buffer pool
# instance.
buffer-pool-capacity = 512000
}

# A buffer pool used to acquire and release byte buffers from the managed
# heap. Once byte buffer is no longer needed is can be released, landing
# on the pool again, to be reused later. This way we can reduce a GC pressure
Expand Down
79 changes: 79 additions & 0 deletions src/core/Akka/IO/Buffers/SimpleBufferPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// //-----------------------------------------------------------------------
// // <copyright file="SimpleBufferPool.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Configuration;

namespace Akka.IO.Buffers
{
using ByteBuffer = ArraySegment<byte>;

public class SimpleBufferPool : IBufferPool
{
private readonly ConcurrentStack<ByteBuffer> _buffers = new ConcurrentStack<ByteBuffer>();
private readonly int _bufferSize;
private readonly int _capacity;

public SimpleBufferPool(ExtendedActorSystem system, Config config) : this(
bufferSize: config.GetInt("buffer-size", 256),
capacity: config.GetInt("buffer-pool-capacity", 250000))
{
}

public SimpleBufferPool(int bufferSize, int capacity)
{
if (bufferSize <= 0) throw new ArgumentException("Buffer size must be positive number", nameof(bufferSize));
if (capacity <= 0) throw new ArgumentException("Number of maximum pool capacity must be positive", nameof(capacity));

_bufferSize = bufferSize;
_capacity = capacity;
}

public ByteBuffer Rent()
{
return _buffers.TryPop(out var buffer)
? buffer
: new ByteBuffer(new byte[_bufferSize], 0, _bufferSize);
}

public IEnumerable<ByteBuffer> Rent(int minimumSize)
{
var buffersToGet = (int)Math.Ceiling((double) minimumSize / _bufferSize);
var result = new ByteBuffer[buffersToGet];
var received = 0;

while (received < buffersToGet)
{
result[received] = Rent();
received++;
}

return result;
}

public void Release(ByteBuffer buf)
{
if(_buffers.Count < _capacity)
_buffers.Push(buf);
// We don't care about pool overrun, we just let the runtime GC them
}

public void Release(IEnumerable<ByteBuffer> bufs)
{
foreach (var buf in bufs)
{
if(_buffers.Count < _capacity)
_buffers.Push(buf);
else
return; // We don't care about the rest, let the runtime GC them
}
}
}
}