Skip to content

Commit

Permalink
Added custom compaction functions (microsoft#272).
Browse files Browse the repository at this point in the history
Fixed a bug where compaction would use a stale value if multiple records with different lengths (for varlen structs) were encountered during compaction.
  • Loading branch information
marius-klimantavicius committed Jun 6, 2020
1 parent 08124d4 commit bbdd47f
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 68 deletions.
4 changes: 2 additions & 2 deletions cs/src/core/Index/Common/AddressInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static string ToString(AddressInfo* info)

public long Size
{
get
readonly get
{
int multiplier = (int)((((long)word & kMultiplierMaskInWord) >> (kAddressBits + kSizeBits)) & kMultiplierMaskInInteger);
return (multiplier == 0 ? 512 : 1<<20)*((((long)word & kSizeMaskInWord) >> kAddressBits) & kSizeMaskInInteger);
Expand Down Expand Up @@ -74,7 +74,7 @@ public long Size

public long Address
{
get
readonly get
{
return (long)word & kAddressMask;
}
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public byte[] ToByteArray()
/// <summary>
/// Print checkpoint info for debugging purposes
/// </summary>
public void DebugPrint()
public readonly void DebugPrint()
{
Debug.WriteLine("******** HybridLog Checkpoint Info for {0} ********", guid);
Debug.WriteLine("Version: {0}", version);
Expand Down Expand Up @@ -453,7 +453,7 @@ public void Recover(Guid guid, ICheckpointManager checkpointManager)
Initialize(s);
}

public byte[] ToByteArray()
public readonly byte[] ToByteArray()
{
using (MemoryStream ms = new MemoryStream())
{
Expand All @@ -472,7 +472,7 @@ public byte[] ToByteArray()
}
}

public void DebugPrint()
public readonly void DebugPrint()
{
Debug.WriteLine("******** Index Checkpoint Info for {0} ********", token);
Debug.WriteLine("Table Size: {0}", table_size);
Expand Down
219 changes: 157 additions & 62 deletions cs/src/core/Index/FASTER/LogAccessor.cs

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions cs/src/core/Index/Interfaces/ICompactionFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
namespace FASTER.core
{
/// <summary>
/// Optional functions to be called during compaction.
/// </summary>
/// <typeparam name="Key">Type of the record keys passed to these functions.</typeparam>
/// <typeparam name="Value">Type of the record values passed to these functions.</typeparam>
public interface ICompactionFunctions<Key, Value>
{
/// <summary>
/// Checks whether a record in the FASTER log is logically deleted.
/// If the record was deleted via <see cref="ClientSession{Key, Value, Input, Output, Context, Functions}.Delete(ref Key, Context, long)"/>
/// or <see cref="ClientSession{Key, Value, Input, Output, Context, Functions}.DeleteAsync(ref Key, Context, bool, System.Threading.CancellationToken)"/>
/// then this function is not called for such a record.
/// </summary>
/// <remarks>
/// <para>
/// One possible scenario is if FASTER is used to store reference counted records.
/// Once the record count reaches zero it can be considered to be no longer relevant and
/// compaction can skip the record.
/// </para>
/// <para>
/// Compaction might be implemented by scanning the log, thus it is possible that multiple
/// records with the same key and/or different value might be provided. Only the last record will be persisted.
/// </para>
/// <para>
/// This method can be called concurrently with methods in <see cref="IFunctions{Key, Value, Input, Output, Context}"/>. It is the responsibility
/// of the implementer to correctly manage concurrency.
/// </para>
/// </remarks>
/// <param name="key">Key of the record being examined.</param>
/// <param name="value">Value of the record being examined.</param>
/// <returns>
/// <c>true</c> if the record should be treated as logically deleted; otherwise <c>false</c>.
/// </returns>
bool IsDeleted(in Key key, in Value value);

/// <summary>
/// Copies a value from <paramref name="src"/> to <paramref name="dst"/>.
/// It is possible that the value at <paramref name="src"/> might be modified during the copy operation; to prevent torn writes
/// this method is provided to allow the implementer to correctly handle concurrency.
/// This method is the counterpart to <see cref="IFunctions{Key, Value, Input, Output, Context}.SingleWriter(ref Key, ref Value, ref Value)"/>.
/// </summary>
/// <param name="src">Managed pointer to value at source</param>
/// <param name="dst">Managed pointer to uninitialized value at destination</param>
/// <param name="valueLength">[Can be null] Variable length struct functions</param>
void Copy(ref Value src, ref Value dst, IVariableLengthStruct<Value> valueLength);

/// <summary>
/// Copies a value from <paramref name="src"/> to <paramref name="dst"/>.
/// It is possible that the value at <paramref name="src"/> might be modified during the copy operation; to prevent torn writes
/// this method is provided to allow the implementer to correctly handle concurrency.
/// This method is the counterpart to <see cref="IFunctions{Key, Value, Input, Output, Context}.ConcurrentWriter(ref Key, ref Value, ref Value)"/>.
/// </summary>
/// <param name="src">Managed pointer to value at source</param>
/// <param name="dst">Managed pointer to existing value at destination</param>
/// <param name="valueLength">[Can be null] Variable length struct functions</param>
/// <returns>
/// True - if <paramref name="src"/> can be safely copied to <paramref name="dst"/> (see <see cref="IFunctions{Key, Value, Input, Output, Context}.ConcurrentWriter(ref Key, ref Value, ref Value)"/>).
/// False - if a new record needs to be allocated. In this case <see cref="ICompactionFunctions{Key, Value}.Copy(ref Value, ref Value, IVariableLengthStruct{Value})"/> will be called with
/// managed pointer to new record.
/// </returns>
bool CopyInPlace(ref Value src, ref Value dst, IVariableLengthStruct<Value> valueLength);
}
}
136 changes: 136 additions & 0 deletions cs/test/BlittableLogCompactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,5 +196,141 @@ public void BlittableLogCompactionTest3()
}
}

[Test]
public void BlittableLogCompactionCustomFunctionsTest1()
{
    // Upserts totalRecords records, compacts the first half of the log with
    // EvenCompactionFunctions, then checks which keys are still readable.

    using var session = fht.NewSession();

    InputStruct input = default;

    const int totalRecords = 2000;
    var compactUntil = 0L;

    for (var i = 0; i < totalRecords; i++)
    {
        // Capture the tail address at the midpoint; compaction runs up to this address.
        if (i == totalRecords / 2)
        {
            compactUntil = fht.Log.TailAddress;
        }

        var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 };
        var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 };
        session.Upsert(ref key1, ref value, 0, 0);
    }

    // Only leave records with even vfield1
    fht.Log.Compact(default(EvenCompactionFunctions), compactUntil);
    Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

    // Read every key: odd keys written before the midpoint are expected to be
    // NOTFOUND, all other keys must still return their original value.
    for (var i = 0; i < totalRecords; i++)
    {
        OutputStruct output = default;
        var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 };
        var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 };

        // ctx == 1 marks a key expected to be missing; ctx == 0 marks a key expected to be present.
        var ctx = (i < (totalRecords / 2) && (i % 2 != 0)) ? 1 : 0;

        var status = session.Read(ref key1, ref input, ref output, ctx, 0);
        if (status == Status.PENDING)
        {
            // NOTE(review): pending reads are only drained here, not asserted; presumably the
            // session's read-completion callback validates them using ctx - confirm.
            session.CompletePending(true);
        }
        else
        {
            if (ctx == 0)
            {
                Assert.IsTrue(status == Status.OK);
                Assert.IsTrue(output.value.vfield1 == value.vfield1);
                Assert.IsTrue(output.value.vfield2 == value.vfield2);
            }
            else
            {
                Assert.IsTrue(status == Status.NOTFOUND);
            }
        }
    }
}

[Test]
public void BlittableLogCompactionCustomFunctionsTest2()
{
    // Verifies that Copy is invoked when CopyInPlace reports false.

    using var session = fht.NewSession();

    var key = new KeyStruct { kfield1 = 100, kfield2 = 101 };

    // First version of the record, flushed before it is overwritten.
    var value = new ValueStruct { vfield1 = 10, vfield2 = 20 };
    session.Upsert(ref key, ref value, 0, 0);
    fht.Log.Flush(true);

    // Second version of the same key; this is the value expected after compaction.
    value = new ValueStruct { vfield1 = 11, vfield2 = 21 };
    session.Upsert(ref key, ref value, 0, 0);
    fht.Log.Flush(true);

    var functions = new Test2CompactionFunctions();
    fht.Log.Compact(functions, fht.Log.TailAddress);

    Assert.IsTrue(functions.CopyCalled);

    InputStruct input = default;
    OutputStruct output = default;
    var readStatus = session.Read(ref key, ref input, ref output, 0, 0);

    if (readStatus == Status.PENDING)
    {
        // A pending read is only drained; nothing is asserted on this path.
        session.CompletePending(true);
        return;
    }

    Assert.IsTrue(readStatus == Status.OK);
    Assert.IsTrue(output.value.vfield1 == value.vfield1);
    Assert.IsTrue(output.value.vfield2 == value.vfield2);
}

/// <summary>
/// Compaction functions that never copy in place and never delete, and that record
/// whether <see cref="Copy"/> was invoked with the value (vfield1 = 11, vfield2 = 21).
/// </summary>
private class Test2CompactionFunctions : ICompactionFunctions<KeyStruct, ValueStruct>
{
    /// <summary>Set to true once <see cref="Copy"/> has seen the value (11, 21).</summary>
    public bool CopyCalled;

    /// <summary>Flags the expected source value, then copies it to the destination.</summary>
    public void Copy(ref ValueStruct src, ref ValueStruct dst, IVariableLengthStruct<ValueStruct> valueLength)
    {
        var isExpectedValue = src.vfield1 == 11 && src.vfield2 == 21;
        if (isExpectedValue)
        {
            CopyCalled = true;
        }

        dst = src;
    }

    /// <summary>Always declines the in-place copy.</summary>
    public bool CopyInPlace(ref ValueStruct src, ref ValueStruct dst, IVariableLengthStruct<ValueStruct> valueLength) => false;

    /// <summary>Treats no record as deleted.</summary>
    public bool IsDeleted(in KeyStruct key, in ValueStruct value) => false;
}

/// <summary>
/// Compaction functions that report records with an odd <c>vfield1</c> as deleted
/// and copy values by plain assignment.
/// </summary>
private struct EvenCompactionFunctions : ICompactionFunctions<KeyStruct, ValueStruct>
{
    /// <summary>Assigns the source value to the destination.</summary>
    public void Copy(ref ValueStruct src, ref ValueStruct dst, IVariableLengthStruct<ValueStruct> valueLength) => dst = src;

    /// <summary>Assigns the source value to the destination and reports success.</summary>
    public bool CopyInPlace(ref ValueStruct src, ref ValueStruct dst, IVariableLengthStruct<ValueStruct> valueLength)
    {
        Copy(ref src, ref dst, valueLength);
        return true;
    }

    /// <summary>Returns true when <c>vfield1</c> is odd.</summary>
    public bool IsDeleted(in KeyStruct key, in ValueStruct value) => value.vfield1 % 2 != 0;
}
}
}
131 changes: 130 additions & 1 deletion cs/test/GenericLogCompactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public void GenericLogCompactionTest2()
}
}


[Test]
public void GenericLogCompactionTest3()
{
Expand Down Expand Up @@ -193,5 +192,135 @@ public void GenericLogCompactionTest3()
}
}
}

[Test]
public void GenericLogCompactionCustomFunctionsTest1()
{
    // Upserts totalRecords records, compacts the first half of the log with
    // EvenCompactionFunctions, then checks which keys are still readable.

    var input = new MyInput();

    const int totalRecords = 2000;
    const int midpoint = totalRecords / 2;
    var compactUntil = 0L;

    for (var n = 0; n < totalRecords; n++)
    {
        // Capture the tail address at the midpoint; compaction runs up to this address.
        if (n == midpoint)
        {
            compactUntil = fht.Log.TailAddress;
        }

        var upsertKey = new MyKey { key = n };
        var upsertValue = new MyValue { value = n };
        session.Upsert(ref upsertKey, ref upsertValue, 0, 0);
    }

    fht.Log.Compact(default(EvenCompactionFunctions), compactUntil);
    Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

    // Read every key: odd keys written before the midpoint are expected to be
    // NOTFOUND, all other keys must still return their original value.
    for (var n = 0; n < totalRecords; n++)
    {
        var output = new MyOutput();
        var readKey = new MyKey { key = n };
        var expected = new MyValue { value = n };

        // ctx == 1 marks a key expected to be missing; ctx == 0 marks a key expected to be present.
        var expectMissing = n < midpoint && n % 2 != 0;
        var ctx = expectMissing ? 1 : 0;

        var status = session.Read(ref readKey, ref input, ref output, ctx, 0);
        if (status == Status.PENDING)
        {
            // A pending read is only drained; nothing is asserted on this path.
            session.CompletePending(true);
            continue;
        }

        if (expectMissing)
        {
            Assert.IsTrue(status == Status.NOTFOUND);
        }
        else
        {
            Assert.IsTrue(status == Status.OK);
            Assert.IsTrue(output.value.value == expected.value);
        }
    }
}

[Test]
public void GenericLogCompactionCustomFunctionsTest2()
{
    // Verifies that Copy is invoked when CopyInPlace reports false.

    using var session = fht.NewSession();

    var key = new MyKey { key = 100 };

    // First version of the record, flushed before it is overwritten.
    var value = new MyValue { value = 20 };
    session.Upsert(ref key, ref value, 0, 0);
    fht.Log.Flush(true);

    // Second version of the same key; this is the value expected after compaction.
    value = new MyValue { value = 21 };
    session.Upsert(ref key, ref value, 0, 0);
    fht.Log.Flush(true);

    var functions = new Test2CompactionFunctions();
    fht.Log.Compact(functions, fht.Log.TailAddress);

    Assert.IsTrue(functions.CopyCalled);

    MyInput input = default;
    MyOutput output = default;
    var readStatus = session.Read(ref key, ref input, ref output, 0, 0);

    if (readStatus == Status.PENDING)
    {
        // A pending read is only drained; nothing is asserted on this path.
        session.CompletePending(true);
        return;
    }

    Assert.IsTrue(readStatus == Status.OK);
    Assert.IsTrue(output.value.value == value.value);
}

/// <summary>
/// Compaction functions that never copy in place and never delete, and that record
/// whether <see cref="Copy"/> was invoked with a source value of 21.
/// </summary>
private class Test2CompactionFunctions : ICompactionFunctions<MyKey, MyValue>
{
    /// <summary>Set to true once <see cref="Copy"/> has seen a source value of 21.</summary>
    public bool CopyCalled;

    /// <summary>Flags the expected source value, then assigns the source to the destination.</summary>
    public void Copy(ref MyValue src, ref MyValue dst, IVariableLengthStruct<MyValue> valueLength)
    {
        var isExpectedValue = src.value == 21;
        if (isExpectedValue)
        {
            CopyCalled = true;
        }

        dst = src;
    }

    /// <summary>Always declines the in-place copy.</summary>
    public bool CopyInPlace(ref MyValue src, ref MyValue dst, IVariableLengthStruct<MyValue> valueLength) => false;

    /// <summary>Treats no record as deleted.</summary>
    public bool IsDeleted(in MyKey key, in MyValue value) => false;
}

/// <summary>
/// Compaction functions that report records with an odd <c>value</c> as deleted
/// and copy values by plain assignment.
/// </summary>
private struct EvenCompactionFunctions : ICompactionFunctions<MyKey, MyValue>
{
    /// <summary>Assigns the source value to the destination.</summary>
    public void Copy(ref MyValue src, ref MyValue dst, IVariableLengthStruct<MyValue> valueLength) => dst = src;

    /// <summary>Assigns the source value to the destination and reports success.</summary>
    public bool CopyInPlace(ref MyValue src, ref MyValue dst, IVariableLengthStruct<MyValue> valueLength)
    {
        Copy(ref src, ref dst, valueLength);
        return true;
    }

    /// <summary>Returns true when <c>value</c> is odd.</summary>
    public bool IsDeleted(in MyKey key, in MyValue value) => value.value % 2 != 0;
}

}
}
Loading

0 comments on commit bbdd47f

Please sign in to comment.