Skip to content

Commit

Permalink
Updates to session support
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Jul 13, 2019
1 parent 00dc474 commit 533652b
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void Free(long pointer)
{
if (!ReturnPhysicalAddress)
{
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default;
values[pointer >> PageSizeBits][pointer & PageSizeMask] = default(T);
}

freeList.Enqueue(pointer);
Expand Down
37 changes: 28 additions & 9 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,18 @@ public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken)
InternalRecover(indexCheckpointToken, hybridLogCheckpointToken);
}

#region Shared Sessions
private Session<Key, Value, Input, Output, Context, Functions>[] _sessions = new Session<Key, Value, Input, Output, Context, Functions>[LightEpoch.kTableSize];

/// <summary>
/// Start shared (not thread-specific) session with FASTER
/// Start new shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public Session<Key, Value, Input, Output, Context, Functions> StartSharedSession()
{
StartSession();
return Session<Key, Value, Input, Output, Context, Functions>.GetOrCreate
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
return Session<Key, Value, Input, Output, Context, Functions>.Create
(_sessions, this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

/// <summary>
Expand All @@ -289,18 +291,35 @@ public Session<Key, Value, Input, Output, Context, Functions> StartSharedSession
public Session<Key, Value, Input, Output, Context, Functions> ContinueSharedSession(Guid guid)
{
ContinueSession(guid);
return new Session<Key, Value, Input, Output, Context, Functions>
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
return Session<Key, Value, Input, Output, Context, Functions>.Create
(_sessions, this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

/// <summary>
/// Return shared (not thread-specific) session with FASTER
/// Return existing (or start new) shared session with FASTER
/// </summary>
/// <returns></returns>
public Session<Key, Value, Input, Output, Context, Functions> GetSharedSession()
{
return Session<Key, Value, Input, Output, Context, Functions>.GetOrCreate
(this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
epoch.Acquire();
threadCtx.InitializeThread();
prevThreadCtx.InitializeThread();
Phase phase = _systemState.phase;
if (phase != Phase.REST)
{
throw new Exception("Can acquire only in REST phase!");
}

var session = _sessions[epoch.ThreadEntry.Value];

if (session != null)
return session;

Guid guid = Guid.NewGuid();
InitLocalContext(guid);
prevThreadCtx.Value = null;
return Session<Key, Value, Input, Output, Context, Functions>.Create
(_sessions, this, prevThreadCtx.Value, threadCtx.Value, epoch.ThreadEntry.Value);
}

internal void SetContext(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx, int epochEntry)
Expand Down Expand Up @@ -341,7 +360,7 @@ internal void SuspendSession(bool completePending = false)
}
epoch.Release();
}

#endregion

/// <summary>
/// Start session with FASTER - call once per thread before using FASTER
Expand Down
10 changes: 4 additions & 6 deletions cs/src/core/Index/FASTER/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ public class Session<Key, Value, Input, Output, Context, Functions> : IDisposabl
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
private static Session<Key, Value, Input, Output, Context, Functions>[] _sessions
= new Session<Key, Value, Input, Output, Context, Functions>[LightEpoch.kTableSize];

private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx;
private readonly int epochEntry;

internal static Session<Key, Value, Input, Output, Context, Functions> GetOrCreate(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
internal static Session<Key, Value, Input, Output, Context, Functions> Create(
Session<Key, Value, Input, Output, Context, Functions>[] _sessions,
FasterKV <Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
Expand All @@ -48,7 +46,7 @@ internal static Session<Key, Value, Input, Output, Context, Functions> GetOrCrea
return session;
}

internal Session(
private Session(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
Expand Down
85 changes: 75 additions & 10 deletions cs/test/SessionFASTERTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void SessionTest1()
{
using (var session = fht.StartSharedSession())
{
InputStruct input = default;
OutputStruct output = default;
InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 };
var value = new ValueStruct { vfield1 = 23, vfield2 = 24 };
Expand Down Expand Up @@ -73,8 +73,8 @@ public void SessionTest2()
using (var session1 = fht.StartSharedSession())
using (var session2 = fht.StartSharedSession())
{
InputStruct input = default;
OutputStruct output = default;
InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key1 = new KeyStruct { kfield1 = 14, kfield2 = 15 };
var value1 = new ValueStruct { vfield1 = 24, vfield2 = 25 };
Expand Down Expand Up @@ -121,8 +121,8 @@ public void SessionTest3()
{
Task.CompletedTask.ContinueWith((t) =>
{
InputStruct input = default;
OutputStruct output = default;
InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 };
var value = new ValueStruct { vfield1 = 23, vfield2 = 24 };
Expand Down Expand Up @@ -153,8 +153,8 @@ public void SessionTest4()
{
var t1 = Task.CompletedTask.ContinueWith((t) =>
{
InputStruct input = default;
OutputStruct output = default;
InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key1 = new KeyStruct { kfield1 = 14, kfield2 = 15 };
var value1 = new ValueStruct { vfield1 = 24, vfield2 = 25 };
Expand All @@ -177,8 +177,8 @@ public void SessionTest4()

var t2 = Task.CompletedTask.ContinueWith((t) =>
{
InputStruct input = default;
OutputStruct output = default;
InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key2 = new KeyStruct { kfield1 = 15, kfield2 = 16 };
var value2 = new ValueStruct { vfield1 = 25, vfield2 = 26 };
Expand All @@ -204,5 +204,70 @@ public void SessionTest4()
t2.Wait();
}
}

[Test]
public void SessionTest5()
{
var session = fht.GetSharedSession();
var id = session.ID;

InputStruct input = default(InputStruct);
OutputStruct output = default(OutputStruct);

var key1 = new KeyStruct { kfield1 = 16, kfield2 = 17 };
var value1 = new ValueStruct { vfield1 = 26, vfield2 = 27 };

session.Upsert(ref key1, ref value1, Empty.Default, 0);
var status = session.Read(ref key1, ref input, ref output, Empty.Default, 0);

if (status == Status.PENDING)
{
session.CompletePending(true);
}
else
{
Assert.IsTrue(status == Status.OK);
}

Assert.IsTrue(output.value.vfield1 == value1.vfield1);
Assert.IsTrue(output.value.vfield2 == value1.vfield2);

session.Return();

session = fht.GetSharedSession();

// Make sure we get back the older suspended session
Assert.IsTrue(id == session.ID);

var key2 = new KeyStruct { kfield1 = 17, kfield2 = 18 };
var value2 = new ValueStruct { vfield1 = 27, vfield2 = 28 };

session.Upsert(ref key2, ref value2, Empty.Default, 0);

status = session.Read(ref key2, ref input, ref output, Empty.Default, 0);

if (status == Status.PENDING)
{
session.CompletePending(true);
}
else
{
Assert.IsTrue(status == Status.OK);
}

status = session.Read(ref key2, ref input, ref output, Empty.Default, 0);

if (status == Status.PENDING)
{
session.CompletePending(true);
}
else
{
Assert.IsTrue(status == Status.OK);
}

Assert.IsTrue(output.value.vfield1 == value2.vfield1);
Assert.IsTrue(output.value.vfield2 == value2.vfield2);
}
}
}

0 comments on commit 533652b

Please sign in to comment.