Skip to content

Commit

Permalink
Buffer pools are not shared across instances.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Apr 3, 2019
1 parent 8ea803c commit f41f1e8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 59 deletions.
2 changes: 0 additions & 2 deletions cs/playground/SumStore/RecoveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ class RecoveryTest
const long completePendingInterval = 1 << 12;
const int checkpointInterval = 10 * 1000;
readonly int threadCount;
readonly int numActiveThreads;
FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;

BlockingCollection<Input[]> inputArrays;

public RecoveryTest(int threadCount)
{
this.threadCount = threadCount;
numActiveThreads = 0;

// Create FASTER index
var log = Devices.CreateLogDevice("logs\\hlog");
Expand Down
9 changes: 5 additions & 4 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
#endregion

/// <summary>
/// Read buffer pool
/// Buffer pool
/// </summary>
protected SectorAlignedBufferPool readBufferPool;
protected SectorAlignedBufferPool bufferPool;

/// <summary>
/// Read cache
Expand Down Expand Up @@ -492,7 +492,7 @@ protected void Initialize(long firstValidAddress)
Debug.Assert(firstValidAddress <= PageSize);
Debug.Assert(PageSize >= GetRecordSize(0));

readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
bufferPool = new SectorAlignedBufferPool(1, sectorSize);

long tailPage = firstValidAddress >> LogPageSizeBits;
int tailPageIndex = (int)(tailPage % BufferSize);
Expand Down Expand Up @@ -551,6 +551,7 @@ public virtual void Dispose()

if (ownedEpoch)
epoch.Dispose();
bufferPool.Free();
}

/// <summary>
Expand Down Expand Up @@ -1152,7 +1153,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand Down
14 changes: 5 additions & 9 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Val
private const int ObjectBlockSize = 100 * (1 << 20);
// Tail offsets per segment, in object log
public readonly long[] segmentOffsets;
// Buffer pool for object log related work
SectorAlignedBufferPool ioBufferPool;
// Record sizes
private static readonly int recordSize = Utility.GetSize(default(Record<Key, Value>));
private readonly SerializerSettings<Key, Value> SerializerSettings;
Expand Down Expand Up @@ -66,8 +64,6 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
if (objectLogDevice == null)
throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance");
}

ioBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
}

public override void Initialize()
Expand Down Expand Up @@ -294,7 +290,7 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
if (localSegmentOffsets == null) localSegmentOffsets = segmentOffsets;

var src = values[flushPage % BufferSize];
var buffer = ioBufferPool.Get((int)numBytesToWrite);
var buffer = bufferPool.Get((int)numBytesToWrite);

if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
{
Expand Down Expand Up @@ -388,7 +384,7 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
valueSerializer.BeginSerialize(ms);
}

var _objBuffer = ioBufferPool.Get(_s.Length);
var _objBuffer = bufferPool.Get(_s.Length);

asyncResult.done = new AutoResetEvent(false);

Expand Down Expand Up @@ -459,7 +455,7 @@ protected override void ReadAsync<TContext>(
ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
IOCompletionCallback callback, PageAsyncReadResult<TContext> asyncResult, IDevice device, IDevice objlogDevice)
{
asyncResult.freeBuffer1 = readBufferPool.Get((int)aligned_read_length);
asyncResult.freeBuffer1 = bufferPool.Get((int)aligned_read_length);
asyncResult.freeBuffer1.required_bytes = (int)aligned_read_length;

if (!(KeyHasObjects() || ValueHasObjects()))
Expand Down Expand Up @@ -584,7 +580,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
if (size > int.MaxValue)
throw new Exception("Unable to read object page, total size greater than 2GB: " + size);

var objBuffer = ioBufferPool.Get((int)size);
var objBuffer = bufferPool.Get((int)size);
result.freeBuffer2 = objBuffer;
var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);

Expand Down Expand Up @@ -612,7 +608,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Device/ManagedLocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ManagedLocalStorageDevice : StorageDeviceBase
public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false)
: base(filename, GetSectorSize(filename))
{
pool = SectorAlignedBufferPool.GetPool(1, 1);
pool = new SectorAlignedBufferPool(1, 1);

this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
Expand Down Expand Up @@ -162,6 +162,7 @@ public override void Close()
{
foreach (var logHandle in logHandles.Values)
logHandle.Dispose();
pool.Free();
}


Expand Down
52 changes: 9 additions & 43 deletions cs/src/core/Utilities/BufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,51 +97,16 @@ public override string ToString()
/// </summary>
public class SectorAlignedBufferPool
{
/// <summary>
/// Disable buffer pool
/// </summary>
public static bool Disabled = false;

private const int levels = 32;
private readonly int recordSize;
private readonly int sectorSize;
private readonly ConcurrentQueue<SectorAlignedMemory>[] queue;

private static SafeConcurrentDictionary<Tuple<int, int>, SectorAlignedBufferPool> _instances
= new SafeConcurrentDictionary<Tuple<int, int>, SectorAlignedBufferPool>();

/// <summary>
/// Clear buffer pool
/// </summary>
public static void Clear()
{
foreach (var pool in _instances.Values)
{
pool.Free();
}
_instances.Clear();
}

/// <summary>
/// Print contents of buffer pool
/// </summary>
public static void PrintAll()
{
foreach (var kvp in _instances)
{
Console.WriteLine("Pool Key: {0}", kvp.Key);
kvp.Value.Print();
}
}

/// <summary>
/// Get cached instance of buffer pool for specified params
/// </summary>
/// <param name="recordSize">Record size</param>
/// <param name="sectorSize">Sector size</param>
/// <returns></returns>
public static SectorAlignedBufferPool GetPool(int recordSize, int sectorSize)
{
return
_instances.GetOrAdd(new Tuple<int, int>(recordSize, sectorSize),
t => new SectorAlignedBufferPool(t.Item1, t.Item2));
}

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -159,14 +124,15 @@ public SectorAlignedBufferPool(int recordSize, int sectorSize)
/// </summary>
/// <param name="page"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe void Return(SectorAlignedMemory page)
public void Return(SectorAlignedMemory page)
{
Debug.Assert(queue[page.level] != null);
page.available_bytes = 0;
page.required_bytes = 0;
page.valid_offset = 0;
Array.Clear(page.buffer, 0, page.buffer.Length);
queue[page.level].Enqueue(page);
if (!Disabled)
queue[page.level].Enqueue(page);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -201,7 +167,7 @@ public unsafe SectorAlignedMemory Get(int numRecords)
Interlocked.CompareExchange(ref queue[index], localPool, null);
}

if (queue[index].TryDequeue(out SectorAlignedMemory page))
if (!Disabled && queue[index].TryDequeue(out SectorAlignedMemory page))
{
return page;
}
Expand Down

0 comments on commit f41f1e8

Please sign in to comment.