From 5b1139ee07422cb348bbc5d21f4128545118acf7 Mon Sep 17 00:00:00 2001 From: VIEWPOINT Date: Sat, 20 Feb 2016 12:13:44 +0900 Subject: [PATCH] Improved pooling of buffers when a buffer was released in other thread. Motivation: When buffer was released in other thread, this buffer couldn't be released. I think that need to release a buffer in other thread for memory effectiveness. Modifications: Implement WeakOrderQueue for releasing a buffer that other thread owned. Result: When buffer was released in othread thread, owned thread can use this buffer. --- src/DotNetty.Common/ThreadLocalPool.cs | 384 +++++++++++++++++- .../DotNetty.Common.Tests.csproj | 1 + .../ThreadLocalPoolTest.cs | 154 +++++++ 3 files changed, 525 insertions(+), 14 deletions(-) create mode 100644 test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs diff --git a/src/DotNetty.Common/ThreadLocalPool.cs b/src/DotNetty.Common/ThreadLocalPool.cs index 2a910ba4a..7983d430e 100644 --- a/src/DotNetty.Common/ThreadLocalPool.cs +++ b/src/DotNetty.Common/ThreadLocalPool.cs @@ -12,50 +12,375 @@ public class ThreadLocalPool { public sealed class Handle { + internal int lastRecycledId; + internal int recycleId; + public object Value; - internal readonly Stack Stack; + internal Stack Stack; internal Handle(Stack stack) { this.Stack = stack; } - public bool Release(T value) + public void Release(T value) where T : class { Contract.Requires(value == this.Value, "value differs from one backed by this handle."); Stack stack = this.Stack; - if (stack.Thread != Thread.CurrentThread) + Thread thread = Thread.CurrentThread; + if (stack.Thread == thread) + { + stack.Push(this); + return; + } + + Dictionary queueDictionary = DelayedPool.Value; + WeakOrderQueue queue; + if (!queueDictionary.TryGetValue(stack, out queue)) + { + var newQueue = new WeakOrderQueue(stack, thread); + queue = newQueue; + queueDictionary.Add(stack, queue); + } + queue.Add(this); + } + } + + internal sealed class WeakOrderQueue + { + const int LinkCapacity = 16; + + sealed class Link + { + private int readIndex; + private int writeIndex; + + internal readonly Handle[] elements; + internal Link next; + + internal int ReadIndex + { + get { return this.readIndex; } + set { this.readIndex = value; } + } + + internal int WriteIndex + { + get { return Volatile.Read(ref this.writeIndex); } + set { Volatile.Write(ref this.writeIndex, value); } + } + + internal Link() + { + this.elements = new Handle[LinkCapacity]; + } + } + + Link head, tail; + internal WeakOrderQueue next; + internal WeakReference ownerThread; + int id = Interlocked.Increment(ref idSource); + + internal bool IsEmpty + { + get { return this.tail.ReadIndex == this.tail.WriteIndex; } + } + + internal WeakOrderQueue(Stack stack, Thread thread) + { + Contract.Requires(stack != null); + + this.ownerThread = new WeakReference(thread); + this.head = this.tail = new Link(); + lock (stack) + { + this.next = stack.HeadQueue; + stack.HeadQueue = this; + } + } + + internal void Add(Handle handle) + { + Contract.Requires(handle != null); + + handle.lastRecycledId = this.id; + + Link tail = this.tail; + int writeIndex = tail.WriteIndex; + if (writeIndex == LinkCapacity) + { + this.tail = tail = tail.next = new Link(); + writeIndex = tail.WriteIndex; + } + tail.elements[writeIndex] = handle; + handle.Stack = null; + tail.WriteIndex = writeIndex + 1; + } + + internal bool Transfer(Stack dst) + { + // This method must be called by owner thread. + Contract.Requires(dst != null); + + Link head = this.head; + if (head == null) { return false; } - if (stack.Count == stack.Owner.MaxCapacity) + if (head.ReadIndex == LinkCapacity) + { + if (head.next == null) + { + return false; + } + this.head = head = head.next; + } + + int srcStart = head.ReadIndex; + int srcEnd = head.WriteIndex; + int srcSize = srcEnd - srcStart; + if (srcSize == 0) { return false; } - stack.Push(this); - return true; + int dstSize = dst.size; + int expectedCapacity = dstSize + srcSize; + + if (expectedCapacity > dst.elements.Length) + { + int actualCapacity = dst.IncreaseCapacity(expectedCapacity); + srcEnd = Math.Min(srcStart + actualCapacity - dstSize, srcEnd); + } + + if (srcStart != srcEnd) + { + Handle[] srcElems = head.elements; + Handle[] dstElems = dst.elements; + int newDstSize = dstSize; + for (int i = srcStart; i < srcEnd; i++) + { + Handle element = srcElems[i]; + if (element.recycleId == 0) + { + element.recycleId = element.lastRecycledId; + } + else if (element.recycleId != element.lastRecycledId) + { + throw new InvalidOperationException("recycled already"); + } + element.Stack = dst; + dstElems[newDstSize++] = element; + srcElems[i] = null; + } + dst.size = newDstSize; + + if (srcEnd == LinkCapacity && head.next != null) + { + this.head = head.next; + } + + head.ReadIndex = srcEnd; + return true; + } + else + { + // The destination stack is full already. + return false; + } } } - internal sealed class Stack : Stack + internal sealed class Stack { - public readonly ThreadLocalPool Owner; - public readonly Thread Thread; + internal readonly ThreadLocalPool Parent; + internal readonly Thread Thread; + + internal Handle[] elements; + + int maxCapacity; + internal int size; + + WeakOrderQueue headQueue; + WeakOrderQueue cursorQueue; + WeakOrderQueue prevQueue; + + internal WeakOrderQueue HeadQueue + { + get { return Volatile.Read(ref this.headQueue); } + set { Volatile.Write(ref this.headQueue, value); } + } + + internal int Size + { + get { return this.size; } + } - public Stack(int initialCapacity, ThreadLocalPool owner, Thread thread) - : base(initialCapacity) + internal Stack(int maxCapacity, ThreadLocalPool parent, Thread thread) { - this.Owner = owner; + this.maxCapacity = maxCapacity; + this.Parent = parent; this.Thread = thread; + + this.elements = new Handle[Math.Min(InitialCapacity, maxCapacity)]; + } + + internal int IncreaseCapacity(int expectedCapacity) + { + int newCapacity = this.elements.Length; + int maxCapacity = this.maxCapacity; + do + { + newCapacity <<= 1; + } + while (newCapacity < expectedCapacity && newCapacity < maxCapacity); + + newCapacity = Math.Min(newCapacity, maxCapacity); + if (newCapacity != this.elements.Length) + { + Array.Resize(ref this.elements, newCapacity); + } + + return newCapacity; + } + + internal void Push(Handle item) + { + Contract.Requires(item != null); + if ((item.recycleId | item.lastRecycledId) != 0) + { + throw new InvalidOperationException("released already"); + } + item.recycleId = item.lastRecycledId = ownThreadId; + + int size = this.size; + if (size >= this.maxCapacity) + { + // Hit the maximum capacity - drop the possibly youngest object. + return; + } + if (size == this.elements.Length) + { + Array.Resize(ref this.elements, Math.Min(size << 1, this.maxCapacity)); + } + + this.elements[size] = item; + this.size = size + 1; + } + + internal bool TryPop(out Handle item) + { + int size = this.size; + if (size == 0) + { + if (!this.Scavenge()) + { + item = null; + return false; + } + size = this.size; + } + size--; + Handle ret = this.elements[size]; + if (ret.lastRecycledId != ret.recycleId) + { + throw new InvalidOperationException("recycled multiple times"); + } + ret.recycleId = 0; + ret.lastRecycledId = 0; + item = ret; + this.size = size; + + return true; + } + + bool Scavenge() + { + // continue an existing scavenge, if any + if (this.ScavengeSome()) + { + return true; + } + + // reset our scavenge cursor + this.prevQueue = null; + this.cursorQueue = this.HeadQueue; + return false; + } + + bool ScavengeSome() + { + WeakOrderQueue cursor = this.cursorQueue; + if (cursor == null) + { + cursor = this.HeadQueue; + if (cursor == null) + { + return false; + } + } + + bool success = false; + WeakOrderQueue prev = this.prevQueue; + do + { + if (cursor.Transfer(this)) + { + success = true; + break; + } + + WeakOrderQueue next = cursor.next; + Thread ownerThread; + if (!cursor.ownerThread.TryGetTarget(out ownerThread)) + { + // If the thread associated with the queue is gone, unlink it, after + // performing a volatile read to confirm there is no data left to collect. + // We never unlink the first queue, as we don't want to synchronize on updating the head. + if (!cursor.IsEmpty) + { + for (;;) + { + if (cursor.Transfer(this)) + { + success = true; + } + else + { + break; + } + } + } + if (prev != null) + { + prev.next = next; + } + } + else + { + prev = cursor; + } + + cursor = next; + } + while (cursor != null && !success); + + this.prevQueue = prev; + this.cursorQueue = cursor; + return success; } } internal static readonly int DefaultMaxCapacity = 262144; internal static readonly int InitialCapacity = Math.Min(256, DefaultMaxCapacity); + static int idSource = int.MinValue; + static int ownThreadId = Interlocked.Increment(ref idSource); + + internal static readonly ThreadLocal> DelayedPool = + new ThreadLocal>(() => new Dictionary()); public ThreadLocalPool(int maxCapacity) { @@ -96,7 +421,7 @@ public ThreadLocalPool(Func valueFactory, int maxCapacity, bool preCr Stack InitializeStorage() { - var stack = new Stack(InitialCapacity, this, Thread.CurrentThread); + var stack = new Stack(this.MaxCapacity, this, Thread.CurrentThread); if (this.preCreate) { for (int i = 0; i < this.MaxCapacity; i++) @@ -110,7 +435,11 @@ Stack InitializeStorage() public T Take() { Stack stack = this.threadLocal.Value; - Handle handle = stack.Count == 0 ? this.CreateValue(stack) : stack.Pop(); + Handle handle; + if (!stack.TryPop(out handle)) + { + handle = this.CreateValue(stack); + } return (T)handle.Value; } @@ -121,5 +450,32 @@ Handle CreateValue(Stack stack) handle.Value = value; return handle; } + + public bool Release(T o, Handle handle) + { + if (handle.Stack.Parent != this) + { + return false; + } + + handle.Release(o); + return true; + } + + internal int ThreadLocalCapacity + { + get + { + return this.threadLocal.Value.elements.Length; + } + } + + internal int ThreadLocalSize + { + get + { + return this.threadLocal.Value.Size; + } + } } } \ No newline at end of file diff --git a/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj b/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj index eb0bd5418..876f20a50 100644 --- a/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj +++ b/test/DotNetty.Common.Tests/DotNetty.Common.Tests.csproj @@ -82,6 +82,7 @@ + diff --git a/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs b/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs new file mode 100644 index 000000000..62d1bcf28 --- /dev/null +++ b/test/DotNetty.Common.Tests/ThreadLocalPoolTest.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace DotNetty.Buffers.Tests +{ + using System; + using System.Threading.Tasks; + using DotNetty.Common; + using Xunit; + + public class ThreadLocalPoolTest + { + [Fact] + public void MultipleReleaseTest() + { + RecyclableObject obj = RecyclableObject.NewInstance(); + Assert.True(obj.Release()); + var exception = Assert.ThrowsAny(() => obj.Release()); + Assert.True(exception != null); + } + + [Fact] + public void ReleaseTest() + { + RecyclableObject obj = RecyclableObject.NewInstance(); + Assert.True(obj.Release()); + RecyclableObject obj2 = RecyclableObject.NewInstance(); + Assert.Same(obj, obj2); + Assert.True(obj2.Release()); + } + + [Fact] + public void RecycleAtDifferentThreadTest() + { + RecyclableObject obj = RecyclableObject.NewInstance(); + + RecyclableObject prevObject = obj; + Task.Run(() => { Assert.True(obj.Release()); }).Wait(); + obj = RecyclableObject.NewInstance(); + + Assert.True(obj == prevObject); + Assert.True(obj.Release()); + } + + class RecyclableObject + { + internal static readonly ThreadLocalPool pool = + new ThreadLocalPool(handle => + new RecyclableObject(handle), 1, true); + + readonly ThreadLocalPool.Handle handle; + + public RecyclableObject(ThreadLocalPool.Handle handle) + { + this.handle = handle; + } + + public static RecyclableObject NewInstance() + { + return pool.Take(); + } + + public bool Release() + { + return pool.Release(this, this.handle); + } + } + + class HandledObject + { + internal readonly ThreadLocalPool.Handle handle; + + internal HandledObject(ThreadLocalPool.Handle handle) + { + this.handle = handle; + } + + public void Release() + { + this.handle.Release(this); + } + } + + [Fact] + public void MaxCapacityTest() + { + this.MaxCapacityTest(300); + Random rand = new Random(); + for (int i = 0; i < 50; i++) + { + this.MaxCapacityTest(rand.Next((1000) + 256)); // 256 - 1256 + } + } + + void MaxCapacityTest(int maxCapacity) + { + var recycler = new ThreadLocalPool(handle => new HandledObject(handle), maxCapacity); + + var objects = new HandledObject[maxCapacity * 3]; + for (int i = 0; i < objects.Length; i++) + { + objects[i] = recycler.Take(); + } + + for (int i = 0; i < objects.Length; i++) + { + objects[i].Release(); + objects[i] = null; + } + Assert.Equal(maxCapacity, recycler.ThreadLocalCapacity); + } + + [Fact] + public void MaxCapacityWithRecycleAtDifferentThreadTest() + { + const int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY + var recycler = new ThreadLocalPool(handle => new HandledObject(handle), maxCapacity); + + // Borrow 2 * maxCapacity objects. + // Return the half from the same thread. + // Return the other half from the different thread. + + HandledObject[] array = new HandledObject[maxCapacity * 3]; + for (int i = 0; i < array.Length; i++) + { + array[i] = recycler.Take(); + } + + for (int i = 0; i < maxCapacity; i++) + { + array[i].Release(); + } + + Task.Run(() => + { + for (int i = maxCapacity; i < array.Length; i++) + { + array[i].Release(); + } + }).Wait(); + + Assert.Equal(recycler.ThreadLocalCapacity, maxCapacity); + Assert.Equal(recycler.ThreadLocalSize, maxCapacity); + + for (int i = 0; i < array.Length; i++) + { + recycler.Take(); + } + + Assert.Equal(maxCapacity, recycler.ThreadLocalCapacity); + Assert.Equal(0, recycler.ThreadLocalSize); + } + } +}