Skip to content

Commit

Permalink
Minor fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 19, 2019
1 parent b31631b commit ecbf696
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 58 deletions.
22 changes: 6 additions & 16 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,12 @@ public class MallocFixedPageSize<T> : IDisposable
private const int PageSizeMask = PageSize - 1;
private const int LevelSizeBits = 12;
private const int LevelSize = 1 << LevelSizeBits;
private const int LevelSizeMask = LevelSize - 1;

private T[][] values = new T[LevelSize][];
private GCHandle[] handles = new GCHandle[LevelSize];
private IntPtr[] pointers = new IntPtr[LevelSize];

private T[] values0;
private readonly GCHandle handles0;
private readonly IntPtr pointers0;
private readonly int RecordSize;
private readonly int AlignedPageSize;

private volatile int writeCacheLevel;

Expand Down Expand Up @@ -89,10 +84,7 @@ public MallocFixedPageSize(bool returnPhysicalAddress = false)
{
handles[0] = GCHandle.Alloc(values[0], GCHandleType.Pinned);
pointers[0] = handles[0].AddrOfPinnedObject();
handles0 = handles[0];
pointers0 = pointers[0];
RecordSize = Marshal.SizeOf(values[0][0]);
AlignedPageSize = RecordSize * PageSize;
}
catch (Exception)
{
Expand All @@ -102,7 +94,6 @@ public MallocFixedPageSize(bool returnPhysicalAddress = false)
}
}

values0 = values[0];
writeCacheLevel = -1;
Interlocked.MemoryBarrier();

Expand Down Expand Up @@ -174,7 +165,7 @@ public void Free(long pointer)
{
if (!ReturnPhysicalAddress)
{
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default(T);
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default;
}

freeList.Enqueue(pointer);
Expand Down Expand Up @@ -220,7 +211,7 @@ public long BulkAllocate()

// Return location.
if (ReturnPhysicalAddress)
return (((long)pointers0) + index * RecordSize);
return (((long)pointers[0]) + index * RecordSize);
else
return index;
}
Expand Down Expand Up @@ -330,7 +321,7 @@ public long Allocate()

// Return location.
if (ReturnPhysicalAddress)
return ((long)pointers0) + index * RecordSize;
return ((long)pointers[0]) + index * RecordSize;
else
return index;
}
Expand Down Expand Up @@ -412,7 +403,6 @@ public void Dispose()
handles = null;
pointers = null;
values = null;
values0 = null;
count = 0;
freeList = null;
}
Expand Down Expand Up @@ -466,7 +456,7 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num
numBytesWritten = 0;
for (int i = 0; i < numLevels; i++)
{
OverflowPagesFlushAsyncResult result = default(OverflowPagesFlushAsyncResult);
OverflowPagesFlushAsyncResult result = default;
uint writeSize = (uint)((i == numCompleteLevels) ? (lastLevelSize + (sectorSize - 1)) & ~(sectorSize - 1) : alignedPageSize);

device.WriteAsync(pointers[i], offset + numBytesWritten, writeSize, AsyncFlushCallback, result);
Expand Down Expand Up @@ -526,7 +516,7 @@ public int GetPageSize()
/// <param name="offset"></param>
public void Recover(IDevice device, ulong offset, int buckets, ulong numBytes)
{
BeginRecovery(device, offset, buckets, numBytes, out ulong numBytesRead);
BeginRecovery(device, offset, buckets, numBytes, out _);
}

/// <summary>
Expand Down Expand Up @@ -576,7 +566,7 @@ internal unsafe void BeginRecovery(IDevice device,
{
//read a full page
uint length = (uint)PageSize * (uint)RecordSize; ;
OverflowPagesReadAsyncResult result = default(OverflowPagesReadAsyncResult);
OverflowPagesReadAsyncResult result = default;
device.ReadAsync(offset + numBytesRead, pointers[i], length, AsyncPageReadCallback, result);
numBytesRead += (i == numCompleteLevels) ? lastLevelSize : alignedPageSize;
}
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public IEnumerable<long> GetPendingRequests()
}

/// <summary>
/// Refresh session, handling checkpointing if needed
/// Refresh session epoch and handle checkpointing phases. Used only
/// in case of thread-affinitized sessions (async support is disabled).
/// </summary>
public void Refresh()
{
Expand Down
48 changes: 25 additions & 23 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,58 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
private Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>> _activeSessions;

/// <summary>
/// Start new client session (not thread-specific) with FASTER.
/// Session starts in dormant state.
/// Start a new client session with FASTER.
/// </summary>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionName = null, bool supportAsync = true)
/// <param name="sessionId">ID/name of session (auto-generated if not provided)</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionId = null, bool threadAffinitized = false)
{
if (supportAsync)
if (!threadAffinitized)
UseRelaxedCPR();

if (sessionName == null)
sessionName = Guid.NewGuid().ToString();
if (sessionId == null)
sessionId = Guid.NewGuid().ToString();
var ctx = new FasterExecutionContext();
InitContext(ctx, sessionName);
InitContext(ctx, sessionId);
var prevCtx = new FasterExecutionContext();
InitContext(prevCtx, sessionName);
InitContext(prevCtx, sessionId);
prevCtx.version--;

ctx.prevCtx = prevCtx;

if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>>(), null);

var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, supportAsync);
var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);
lock (_activeSessions)
_activeSessions.Add(sessionName, session);
_activeSessions.Add(sessionId, session);
return session;
}

/// <summary>
/// Continue session with FASTER
/// Resume (continue) prior client session with FASTER, used during
/// recovery from failure.
/// </summary>
/// <param name="sessionName"></param>
/// <param name="cp"></param>
/// <param name="supportAsync"></param>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionName, out CommitPoint cp, bool supportAsync = true)
/// <param name="sessionId">ID/name of previous session to resume</param>
/// <param name="commitPoint">Prior commit point of durability for session</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionId, out CommitPoint commitPoint, bool threadAffinitized = false)
{
if (supportAsync)
if (!threadAffinitized)
UseRelaxedCPR();

cp = InternalContinue(sessionName, out FasterExecutionContext ctx);
if (cp.UntilSerialNo == -1)
throw new Exception($"Unable to find session {sessionName} to recover");
commitPoint = InternalContinue(sessionId, out FasterExecutionContext ctx);
if (commitPoint.UntilSerialNo == -1)
throw new Exception($"Unable to find session {sessionId} to recover");

var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, supportAsync);
var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);

if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>>(), null);
lock (_activeSessions)
_activeSessions.Add(sessionName, session);
_activeSessions.Add(sessionId, session);
return session;
}

Expand Down
37 changes: 21 additions & 16 deletions cs/src/core/Index/Interfaces/IFasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/*
#region Session Operations (Deprecated)

/// <summary>
/// Start a session with FASTER. FASTER sessions correspond to threads issuing
/// operations to FASTER.
/// </summary>
/// <returns>Session identifier</returns>
[Obsolete("Use NewSession() instead.")]
Guid StartSession();

/// <summary>
Expand All @@ -31,23 +31,24 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
/// </summary>
/// <param name="guid"></param>
/// <returns>Sequence number for resuming operations</returns>
[Obsolete("Use ResumeSession() instead.")]
CommitPoint ContinueSession(Guid guid);

/// <summary>
/// Stop a session and de-register the thread from FASTER.
/// </summary>
[Obsolete("Use and dispose NewSession() instead.")]
void StopSession();

/// <summary>
/// Refresh the session epoch. The caller is required to invoke Refresh periodically
/// in order to guarantee system liveness.
/// </summary>
[Obsolete("Use NewSession(), where Refresh() is not required by default.")]
void Refresh();

#endregion
*/

/*
#region Core Index Operations (Deprecated)

/// <summary>
Expand All @@ -59,6 +60,7 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
/// <param name="context">User context to identify operation in asynchronous callback</param>
/// <param name="serialNo">Increasing sequence number of operation (used for recovery)</param>
/// <returns>Status of operation</returns>
[Obsolete("Use NewSession() and invoke Read() on the session.")]
Status Read(ref Key key, ref Input input, ref Output output, Context context, long serialNo);

/// <summary>
Expand All @@ -69,6 +71,7 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
/// <param name="context">User context to identify operation in asynchronous callback</param>
/// <param name="serialNo">Increasing sequence number of operation (used for recovery)</param>
/// <returns>Status of operation</returns>
[Obsolete("Use NewSession() and invoke Upsert() on the session.")]
Status Upsert(ref Key key, ref Value value, Context context, long serialNo);

/// <summary>
Expand All @@ -79,6 +82,7 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
/// <param name="context">User context to identify operation in asynchronous callback</param>
/// <param name="serialNo">Increasing sequence number of operation (used for recovery)</param>
/// <returns>Status of operation</returns>
[Obsolete("Use NewSession() and invoke RMW() on the session.")]
Status RMW(ref Key key, ref Input input, Context context, long serialNo);

/// <summary>
Expand All @@ -91,37 +95,38 @@ public interface IFasterKV<Key, Value, Input, Output, Context, Functions> : IDis
/// <param name="context">User context to identify operation in asynchronous callback</param>
/// <param name="serialNo">Increasing sequence number of operation (used for recovery)</param>
/// <returns>Status of operation</returns>
[Obsolete("Use NewSession() and invoke Delete() on the session.")]
Status Delete(ref Key key, Context context, long serialNo);

/// <summary>
/// Complete all pending operations issued by this session
/// </summary>
/// <param name="wait">Whether we spin-wait for pending operations to complete</param>
/// <returns>Whether all pending operations have completed</returns>
[Obsolete("Use NewSession() and invoke CompletePending() on the session.")]
bool CompletePending(bool wait);

#endregion
*/

#region New Session Operations

/// <summary>
/// Start new client session (not thread-specific) with FASTER.
/// Session starts in dormant state.
/// Start a new client session with FASTER.
/// </summary>
/// <param name="sessionId"></param>
/// <param name="supportAsync"></param>
/// <returns></returns>
ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionId = null, bool supportAsync = true);
/// <param name="sessionId">ID/name of session (auto-generated if not provided)</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionId = null, bool threadAffinitized = false);

/// <summary>
/// Continue prior client session with FASTER, used during
/// Resume (continue) prior client session with FASTER, used during
/// recovery from failure.
/// </summary>
/// <param name="sessionId"></param>
/// <param name="cp"></param>
/// <param name="supportAsync"></param>
/// <returns></returns>
ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionId, out CommitPoint cp, bool supportAsync = true);
/// <param name="sessionId">ID/name of previous session to resume</param>
/// <param name="commitPoint">Prior commit point of durability for session</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionId, out CommitPoint commitPoint, bool threadAffinitized = false);

#endregion

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Recovery/IndexCheckpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private unsafe void BeginMainIndexCheckpoint(
for (int index = 0; index < numChunks; index++)
{
long chunkStartBucket = (long)start + (index * chunkSize);
HashIndexPageAsyncFlushResult result = default(HashIndexPageAsyncFlushResult);
HashIndexPageAsyncFlushResult result = default;
result.chunkIndex = index;
device.WriteAsync((IntPtr)chunkStartBucket, numBytesWritten, chunkSize, AsyncPageFlushCallback, result);
numBytesWritten += chunkSize;
Expand Down Expand Up @@ -119,7 +119,7 @@ private unsafe void AsyncPageFlushCallback(
NativeOverlapped* overlap)
{
//Set the page status to flushed
var result = (HashIndexPageAsyncFlushResult)Overlapped.Unpack(overlap).AsyncResult;
_ = (HashIndexPageAsyncFlushResult)Overlapped.Unpack(overlap).AsyncResult;

try
{
Expand Down

0 comments on commit ecbf696

Please sign in to comment.