Skip to content

Commit

Permalink
Improved support, added session pooling option, added session testcases.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Jul 12, 2019
1 parent 2b7e24f commit 00dc474
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 81 deletions.
74 changes: 10 additions & 64 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define CALLOC

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -47,26 +48,15 @@ public unsafe class MallocFixedPageSize<T> : IDisposable

private CountdownEvent checkpointEvent;

private readonly LightEpoch epoch;
private readonly bool ownedEpoch;

private FastThreadLocal<Queue<FreeItem>> freeList;
private ConcurrentQueue<long> freeList;

/// <summary>
/// Create new instance
/// </summary>
/// <param name="returnPhysicalAddress"></param>
/// <param name="epoch"></param>
public MallocFixedPageSize(bool returnPhysicalAddress = false, LightEpoch epoch = null)
public MallocFixedPageSize(bool returnPhysicalAddress = false)
{
freeList = new FastThreadLocal<Queue<FreeItem>>();
if (epoch == null)
{
this.epoch = new LightEpoch();
ownedEpoch = true;
}
else
this.epoch = epoch;
freeList = new ConcurrentQueue<long>();

values[0] = new T[PageSize];

Expand Down Expand Up @@ -177,20 +167,15 @@ public void Set(long index, ref T value)
/// Free object
/// </summary>
/// <param name="pointer"></param>
/// <param name="removed_epoch"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void FreeAtEpoch(long pointer, int removed_epoch = -1)
public void Free(long pointer)
{
if (!ReturnPhysicalAddress)
{
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default(T);
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default;
}

freeList.InitializeThread();

if (freeList.Value == null)
freeList.Value = new Queue<FreeItem>();
freeList.Value.Enqueue(new FreeItem { removed_item = pointer, removal_epoch = removed_epoch });
freeList.Enqueue(pointer);
}

private const int kAllocateChunkSize = 16;
Expand Down Expand Up @@ -308,19 +293,8 @@ public long BulkAllocate()
/// <returns></returns>
public long Allocate()
{
freeList.InitializeThread();
if (freeList.Value == null)
{
freeList.Value = new Queue<FreeItem>();
}
if (freeList.Value.Count > 0)
{
if (freeList.Value.Peek().removal_epoch <= epoch.SafeToReclaimEpoch)
return freeList.Value.Dequeue().removed_item;

//if (freeList.Count % 64 == 0)
// LightEpoch.Instance.BumpCurrentEpoch();
}
if (freeList.TryDequeue(out long result))
return result;

// Determine insertion index.
// ReSharper disable once CSharpWarnings::CS0420
Expand Down Expand Up @@ -423,26 +397,6 @@ public long Allocate()
return index;
}

/// <summary>
/// Acquire thread
/// </summary>
public void Acquire()
{
if (ownedEpoch)
epoch.Acquire();
freeList.InitializeThread();
}

/// <summary>
/// Release thread
/// </summary>
public void Release()
{
if (ownedEpoch)
epoch.Release();
freeList.DisposeThread();
}

/// <summary>
/// Dispose
/// </summary>
Expand All @@ -458,9 +412,7 @@ public void Dispose()
values = null;
values0 = null;
count = 0;
if (ownedEpoch)
epoch.Dispose();
freeList.Dispose();
freeList = null;
}


Expand Down Expand Up @@ -645,10 +597,4 @@ private void AsyncPageReadCallback(
}
#endregion
}

internal struct FreeItem
{
public long removed_item;
public int removal_epoch;
}
}
47 changes: 45 additions & 2 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,12 @@ public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken)
public Session<Key, Value, Input, Output, Context, Functions> StartSharedSession()
{
StartSession();
return new Session<Key, Value, Input, Output, Context, Functions>
return Session<Key, Value, Input, Output, Context, Functions>.GetOrCreate
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

/// <summary>
/// Continue shared (not thread-specific) session with FASTER
/// Recover or continue shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public Session<Key, Value, Input, Output, Context, Functions> ContinueSharedSession(Guid guid)
Expand All @@ -293,13 +293,56 @@ public Session<Key, Value, Input, Output, Context, Functions> ContinueSharedSess
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

/// <summary>
/// Return shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public Session<Key, Value, Input, Output, Context, Functions> GetSharedSession()
{
return Session<Key, Value, Input, Output, Context, Functions>.GetOrCreate
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

internal void SetContext(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx, int epochEntry)
{
if (!this.prevThreadCtx.IsInitializedForThread) this.prevThreadCtx.InitializeThread();
if (!this.threadCtx.IsInitializedForThread) this.threadCtx.InitializeThread();
if (!epoch.ThreadEntry.IsInitializedForThread) epoch.ThreadEntry.InitializeThread();

this.prevThreadCtx.Value = prevThreadCtx;
this.threadCtx.Value = threadCtx;
epoch.ThreadEntry.Value = epochEntry;
}

/// <summary>
/// Suspend session with FASTER
/// </summary>
internal void SuspendSession(bool completePending = false)
{
if (completePending)
{
while (true)
{
bool done = true;
if (threadCtx.Value.retryRequests.Count != 0 || threadCtx.Value.ioPendingRequests.Count != 0)
done = false;

if (prevThreadCtx.Value != default(FasterExecutionContext))
{
if (prevThreadCtx.Value.retryRequests.Count != 0 || prevThreadCtx.Value.ioPendingRequests.Count != 0)
done = false;
}
if (threadCtx.Value.phase != Phase.REST)
done = false;

if (done) break;
Refresh();
}
}
epoch.Release();
}


/// <summary>
/// Start session with FASTER - call once per thread before using FASTER
/// </summary>
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public unsafe partial class FasterBase
public FasterBase()
{
epoch = new LightEpoch();
overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false, epoch);
overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false);
}

internal Status Free()
Expand Down Expand Up @@ -598,7 +598,7 @@ private bool FindTagOrFreeInternal(long hash, ushort tag, ref HashBucket* bucket
if (compare_word != result_word)
{
// Install failed, undo allocation; use the winner's entry
overflowBucketsAllocator.FreeAtEpoch(logicalBucketAddress, 0);
overflowBucketsAllocator.Free(logicalBucketAddress);
target_entry_word = result_word;
}
else
Expand Down
3 changes: 0 additions & 3 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
internal Guid InternalAcquire()
{
epoch.Acquire();
overflowBucketsAllocator.Acquire();
threadCtx.InitializeThread();
prevThreadCtx.InitializeThread();
Phase phase = _systemState.phase;
Expand All @@ -36,7 +35,6 @@ internal Guid InternalAcquire()
internal long InternalContinue(Guid guid)
{
epoch.Acquire();
overflowBucketsAllocator.Acquire();
threadCtx.InitializeThread();
prevThreadCtx.InitializeThread();
if (_recoveredSessions != null)
Expand Down Expand Up @@ -117,7 +115,6 @@ internal void InternalRelease()
threadCtx.DisposeThread();
prevThreadCtx.DisposeThread();
epoch.Release();
overflowBucketsAllocator.Release();
}

internal void InitLocalContext(Guid token)
Expand Down
106 changes: 100 additions & 6 deletions cs/src/core/Index/FASTER/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,32 @@ public class Session<Key, Value, Input, Output, Context, Functions> : IDisposabl
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private static Session<Key, Value, Input, Output, Context, Functions>[] _sessions
= new Session<Key, Value, Input, Output, Context, Functions>[LightEpoch.kTableSize];

private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx;
private int epochEntry;
private readonly int epochEntry;

internal static Session<Key, Value, Input, Output, Context, Functions> GetOrCreate(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
{
if (_sessions[epochEntry] == null)
{
_sessions[epochEntry] = new Session<Key, Value, Input, Output, Context, Functions>(fht, prevThreadCtx, threadCtx, epochEntry);
return _sessions[epochEntry];
}

var session = _sessions[epochEntry];
session.fht = fht;
session.prevThreadCtx = prevThreadCtx;
session.threadCtx = threadCtx;
return session;
}

internal Session(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
Expand All @@ -49,11 +71,7 @@ internal Session(
public void Dispose()
{
Resume();

fht.StopSession();
prevThreadCtx = threadCtx = null;
epochEntry = 0;
fht = null;
}

/// <summary>
Expand All @@ -63,5 +81,81 @@ public void Resume()
{
fht.SetContext(prevThreadCtx, threadCtx, epochEntry);
}

/// <summary>
/// Return shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public void Return(bool completePending = false)
{
fht.SuspendSession(completePending);
}

/// <summary>
/// Read operation
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="output"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
}

/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
}

/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public Status RMW(ref Key key, ref Input input, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
}

/// <summary>
///
/// </summary>
/// <param name="key"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Delete(ref key, userContext, monotonicSerialNum);
}

/// <summary>
/// Complete outstanding pending operations
/// </summary>
/// <param name="wait"></param>
/// <returns></returns>
public bool CompletePending(bool wait = false)
{
Resume();
return fht.CompletePending(wait);
}
}
}
Loading

0 comments on commit 00dc474

Please sign in to comment.