Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed May 13, 2021
2 parents 36de988 + 31db806 commit 2a511bb
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 53 deletions.
25 changes: 10 additions & 15 deletions cs/samples/MemOnlyCache/CacheSizeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace MemOnlyCache
public class CacheSizeTracker : IObserver<IFasterScanIterator<CacheKey, CacheValue>>
{
readonly FasterKV<CacheKey, CacheValue> store;
long storeSize;
long storeHeapSize;

/// <summary>
/// Target size request for FASTER
Expand All @@ -23,34 +23,29 @@ public class CacheSizeTracker : IObserver<IFasterScanIterator<CacheKey, CacheVal
/// <summary>
/// Total size (bytes) used by FASTER including index and log
/// </summary>
public long TotalSizeBytes => storeSize + store.OverflowBucketCount * 64;
public long TotalSizeBytes => storeHeapSize + store.IndexSize * 64 + store.Log.MemorySizeBytes + (store.ReadCache != null ? store.ReadCache.MemorySizeBytes : 0) + store.OverflowBucketCount * 64;

/// <summary>
/// Class to track and update cache size
/// </summary>
/// <param name="store">FASTER store instance</param>
/// <param name="memorySizeBits">Memory size (bits) used by FASTER log settings</param>
/// <param name="targetMemoryBytes">Target memory size of FASTER in bytes</param>
public CacheSizeTracker(FasterKV<CacheKey, CacheValue> store, int memorySizeBits, long targetMemoryBytes = long.MaxValue)
/// <param name="targetMemoryBytes">Initial target memory size of FASTER in bytes</param>
public CacheSizeTracker(FasterKV<CacheKey, CacheValue> store, long targetMemoryBytes = long.MaxValue)
{
this.store = store;
if (targetMemoryBytes < long.MaxValue)
{
Console.WriteLine("**** Setting initial target memory: {0,11:N2}KB", targetMemoryBytes / 1024.0);
this.TargetSizeBytes = targetMemoryBytes;
}
this.TargetSizeBytes = targetMemoryBytes;

storeSize = store.IndexSize * 64 + store.Log.MemorySizeBytes;
Console.WriteLine("Index size: {0}", store.IndexSize * 64);
Console.WriteLine("Total store size: {0}", TotalSizeBytes);

// Register subscriber to receive notifications of log evictions from memory
store.Log.SubscribeEvictions(this);

// Include the separate read cache, if enabled
if (store.ReadCache != null)
{
storeSize += store.ReadCache.MemorySizeBytes;
store.ReadCache.SubscribeEvictions(this);
}
}

/// <summary>
Expand All @@ -72,7 +67,7 @@ public void SetTargetSizeBytes(long newTargetSize)
/// Add to the tracked size of FASTER. This is called by IFunctions as well as the subscriber to evictions (OnNext)
/// </summary>
/// <param name="size"></param>
public void AddTrackedSize(int size) => Interlocked.Add(ref storeSize, size);
public void AddTrackedSize(int size) => Interlocked.Add(ref storeHeapSize, size);

/// <summary>
/// Subscriber to pages as they are getting evicted from main memory
Expand All @@ -90,9 +85,9 @@ public void OnNext(IFasterScanIterator<CacheKey, CacheValue> iter)
AddTrackedSize(-size);

// Adjust empty page count to drive towards desired memory utilization
if (TotalSizeBytes > TargetSizeBytes)
if (TotalSizeBytes > TargetSizeBytes && store.Log.AllocatableMemorySizeBytes >= store.Log.MemorySizeBytes)
store.Log.EmptyPageCount++;
else if (TotalSizeBytes < TargetSizeBytes)
else if (TotalSizeBytes < TargetSizeBytes && store.Log.AllocatableMemorySizeBytes <= store.Log.MemorySizeBytes)
store.Log.EmptyPageCount--;
}

Expand Down
2 changes: 1 addition & 1 deletion cs/samples/MemOnlyCache/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void Main()
var numBucketBits = (int)Math.Ceiling(Math.Log2(numRecords));

h = new FasterKV<CacheKey, CacheValue>(1L << numBucketBits, logSettings, comparer: new CacheKey());
sizeTracker = new CacheSizeTracker(h, logSettings.MemorySizeBits, targetSize);
sizeTracker = new CacheSizeTracker(h, targetSize);

// Initially populate store
PopulateStore(numRecords);
Expand Down
44 changes: 28 additions & 16 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// HeadOFfset lag address
/// </summary>
protected long HeadOffsetLagAddress;
internal long HeadOffsetLagAddress;

/// <summary>
/// Log mutable fraction
Expand Down Expand Up @@ -565,6 +565,9 @@ internal void WriteAsync<TContext>(IntPtr alignedSourceAddress, ulong alignedDes
/// <param name="page">Page number to be cleared</param>
/// <param name="offset">Offset to clear from (if partial clear)</param>
internal abstract void ClearPage(long page, int offset = 0);

internal abstract void FreePage(long page);

/// <summary>
/// Write page (async)
/// </summary>
Expand Down Expand Up @@ -729,6 +732,11 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
AlignedPageSizeBytes = ((PageSize + (sectorSize - 1)) & ~(sectorSize - 1));
}

/// <summary>
/// Number of extra overflow pages allocated
/// </summary>
internal abstract int OverflowPageCount { get; }

/// <summary>
/// Initialize allocator
/// </summary>
Expand Down Expand Up @@ -791,6 +799,10 @@ public virtual void Dispose()
OnEvictionObserver?.OnCompleted();
}

/// <summary>
/// Number of pages in circular buffer that are allocated
/// </summary>
public int AllocatedPageCount;

/// <summary>
/// How many pages do we leave empty in the in-memory buffer (between 0 and BufferSize-1)
Expand Down Expand Up @@ -818,7 +830,7 @@ public int EmptyPageCount
}

// Force eviction now if empty page count has increased
if (value > oldEPC)
if (value >= oldEPC)
{
bool prot = true;
if (!epoch.ThisInstanceProtected())
Expand Down Expand Up @@ -969,19 +981,21 @@ public long TryAllocate(int numSlots = 1)
#region HANDLE PAGE OVERFLOW
if (localTailPageOffset.Offset > PageSize)
{
int pageIndex = localTailPageOffset.Page + 1;

// All overflow threads try to shift addresses
long shiftAddress = ((long)(localTailPageOffset.Page + 1)) << LogPageSizeBits;
long shiftAddress = ((long)pageIndex) << LogPageSizeBits;
PageAlignedShiftReadOnlyAddress(shiftAddress);
PageAlignedShiftHeadAddress(shiftAddress);

if (offset > PageSize)
{
if (NeedToWait(localTailPageOffset.Page + 1))
if (NeedToWait(pageIndex))
return 0; // RETRY_LATER
return -1; // RETRY_NOW
}

if (NeedToWait(localTailPageOffset.Page + 1))
if (NeedToWait(pageIndex))
{
// Reset to end of page so that next attempt can retry
localTailPageOffset.Offset = PageSize;
Expand All @@ -990,20 +1004,21 @@ public long TryAllocate(int numSlots = 1)
}

// The thread that "makes" the offset incorrect should allocate next page and set new tail
if (CannotAllocate(localTailPageOffset.Page + 1))
if (CannotAllocate(pageIndex))
{
// Reset to end of page so that next attempt can retry
localTailPageOffset.Offset = PageSize;
Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset);
return -1; // RETRY_NOW
}

// Allocate this page, if needed
if (!IsAllocated(pageIndex % BufferSize))
AllocatePage(pageIndex % BufferSize);

// Allocate next page in advance, if needed
int nextPageIndex = (localTailPageOffset.Page + 2) % BufferSize;
if ((!IsAllocated(nextPageIndex)))
{
AllocatePage(nextPageIndex);
}
if (!IsAllocated((pageIndex + 1) % BufferSize))
AllocatePage((pageIndex + 1) % BufferSize);

localTailPageOffset.Page++;
localTailPageOffset.Offset = numSlots;
Expand Down Expand Up @@ -1069,7 +1084,7 @@ public long TryAllocateRetryNow(int numSlots = 1)
return logicalAddress;
}


private bool CannotAllocate(int page) => page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits);

private bool NeedToWait(int page) => page >= BufferSize + (FlushedUntilAddress >> LogPageSizeBits);
Expand Down Expand Up @@ -1236,10 +1251,7 @@ public void OnPagesClosed(long newSafeHeadAddress)
int closePage = (int)(closePageAddress >> LogPageSizeBits);
int closePageIndex = closePage % BufferSize;

if (!IsAllocated(closePageIndex))
AllocatePage(closePageIndex);
else
ClearPage(closePage);
FreePage(closePage);

Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, closePageAddress + PageSize, out _);
ShiftClosedUntilAddress();
Expand Down
34 changes: 34 additions & 0 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

private readonly OverflowPool<PageUnit> overflowPagePool;

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null, Action<CommitInfo> flushCallback = null)
: base(settings, comparer, evictCallback, epoch, flushCallback)
{
overflowPagePool = new OverflowPool<PageUnit>(4, p => p.handle.Free());

values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
pointers = new long[BufferSize];
Expand Down Expand Up @@ -107,6 +111,7 @@ public override void Dispose()
handles = null;
pointers = null;
values = null;
overflowPagePool.Dispose();
}

public override AddressInfo* GetKeyAddressInfo(long physicalAddress)
Expand All @@ -125,6 +130,16 @@ public override void Dispose()
/// <param name="index"></param>
internal override void AllocatePage(int index)
{
Interlocked.Increment(ref AllocatedPageCount);

if (overflowPagePool.TryGet(out var item))
{
handles[index] = item.handle;
pointers[index] = item.pointer;
values[index] = item.value;
return;
}

var adjustedSize = PageSize + 2 * sectorSize;
byte[] tmp = new byte[adjustedSize];
Array.Clear(tmp, 0, adjustedSize);
Expand All @@ -135,6 +150,8 @@ internal override void AllocatePage(int index)
values[index] = tmp;
}

internal override int OverflowPageCount => overflowPagePool.Count;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override long GetPhysicalAddress(long logicalAddress)
{
Expand Down Expand Up @@ -208,6 +225,23 @@ internal override void ClearPage(long page, int offset)
}
}

internal override void FreePage(long page)
{
ClearPage(page, 0);
if (EmptyPageCount > 0)
{
int index = (int)(page % BufferSize);
overflowPagePool.TryAdd(new PageUnit {
handle = handles[index],
pointer = pointers[index],
value = values[index] });
values[index] = null;
pointers[index] = 0;
handles[index] = default;
Interlocked.Decrement(ref AllocatedPageCount);
}
}

/// <summary>
/// Delete in-memory portion of the log
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Val
private readonly bool keyBlittable = Utility.IsBlittable<Key>();
private readonly bool valueBlittable = Utility.IsBlittable<Value>();

private readonly OverflowPool<Record<Key, Value>[]> overflowPagePool;

public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null, Action<CommitInfo> flushCallback = null)
: base(settings, comparer, evictCallback, epoch, flushCallback)
{
overflowPagePool = new OverflowPool<Record<Key, Value>[]>(4);

if (settings.ObjectLogDevice == null)
{
throw new FasterException("LogSettings.ObjectLogDevice needs to be specified (e.g., use Devices.CreateLogDevice, AzureStorageDevice, or NullDevice)");
Expand Down Expand Up @@ -75,6 +79,8 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
}
}

internal override int OverflowPageCount => overflowPagePool.Count;

public override void Initialize()
{
Initialize(recordSize);
Expand Down Expand Up @@ -188,6 +194,7 @@ public override void Dispose()
}
values = null;
}
overflowPagePool.Dispose();
base.Dispose();
}

Expand Down Expand Up @@ -224,6 +231,11 @@ internal override void AllocatePage(int index)

internal Record<Key, Value>[] AllocatePage()
{
Interlocked.Increment(ref AllocatedPageCount);

if (overflowPagePool.TryGet(out var item))
return item;

Record<Key, Value>[] tmp;
if (PageSize % recordSize == 0)
tmp = new Record<Key, Value>[PageSize / recordSize];
Expand Down Expand Up @@ -308,6 +320,17 @@ internal override void ClearPage(long page, int offset)
}
}

internal override void FreePage(long page)
{
ClearPage(page, 0);
if (EmptyPageCount > 0)
{
overflowPagePool.TryAdd(values[page % BufferSize]);
values[page % BufferSize] = default;
Interlocked.Decrement(ref AllocatedPageCount);
}
}

private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddress, uint numBytesToWrite,
DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult,
IDevice device, IDevice objlogDevice, long intendedDestinationPage = -1, long[] localSegmentOffsets = null)
Expand Down
18 changes: 18 additions & 0 deletions cs/src/core/Allocator/PageUnit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Runtime.InteropServices;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace FASTER.core
{
struct PageUnit
{
public byte[] value;
public GCHandle handle;
public long pointer;
}
}


Loading

0 comments on commit 2a511bb

Please sign in to comment.