Skip to content

Commit

Permalink
[C#] Allow log compaction to non-record-boundary addresses (#361)
Browse files Browse the repository at this point in the history
* [C#] Allow log compaction to non-record-boundary addresses
* Add option to ShiftBeginAddress, to snap given address to nearest earlier page start
  • Loading branch information
badrishc authored Oct 26, 2020
1 parent a08b95b commit 075a10a
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 42 deletions.
3 changes: 0 additions & 3 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -751,10 +751,7 @@ public long TryAllocate(int numSlots = 1)
return 0;

// Determine insertion index.
// ReSharper disable once CSharpWarnings::CS0420
#pragma warning disable 420
localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots);
#pragma warning restore 420

int page = localTailPageOffset.Page;
int offset = localTailPageOffset.Offset - numSlots;
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key,
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Allocator/GenericScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, V
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Allocator/IFasterScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,10 @@ public interface IFasterScanIterator<Key, Value> : IDisposable
/// Current address
/// </summary>
long CurrentAddress { get; }

/// <summary>
/// Next address
/// </summary>
long NextAddress { get; }
}
}
5 changes: 5 additions & 0 deletions cs/src/core/Allocator/VarLenBlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public sealed class VariableLengthBlittableScanIterator<Key, Value> : IFasterSca
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand Down
10 changes: 6 additions & 4 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,10 @@ public async ValueTask WaitForCommitAsync(CancellationToken token = default)
/// <param name="shiftBeginAddress">Whether to shift begin address to untilAddress after compaction. To avoid
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
public void Compact(long untilAddress, bool shiftBeginAddress)
/// <returns>Address until which compaction was done</returns>
public long Compact(long untilAddress, bool shiftBeginAddress)
{
Compact(untilAddress, shiftBeginAddress, default(DefaultCompactionFunctions<Key, Value>));
return Compact(untilAddress, shiftBeginAddress, default(DefaultCompactionFunctions<Key, Value>));
}

/// <summary>
Expand All @@ -595,7 +596,8 @@ public void Compact(long untilAddress, bool shiftBeginAddress)
/// <param name="compactionFunctions">User provided compaction functions (see <see cref="ICompactionFunctions{Key, Value}"/>).</param>
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
public void Compact<CompactionFunctions>(long untilAddress, bool shiftBeginAddress, CompactionFunctions compactionFunctions)
/// <returns>Address until which compaction was done</returns>
public long Compact<CompactionFunctions>(long untilAddress, bool shiftBeginAddress, CompactionFunctions compactionFunctions)
where CompactionFunctions : ICompactionFunctions<Key, Value>
{
if (!SupportAsync)
Expand All @@ -612,7 +614,7 @@ public void Compact<CompactionFunctions>(long untilAddress, bool shiftBeginAddre
};
}

fht.Log.Compact(this, functions, compactionFunctions, untilAddress, vl, shiftBeginAddress);
return fht.Log.Compact(this, functions, compactionFunctions, untilAddress, vl, shiftBeginAddress);
}

/// <summary>
Expand Down
5 changes: 2 additions & 3 deletions cs/src/core/Index/FASTER/FASTERIterator.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma warning disable 0162

using System;

namespace FASTER.core
{
Expand Down Expand Up @@ -99,6 +96,8 @@ public FasterKVIterator(FasterKV<Key, Value> fht, Functions functions, Compactio

public long CurrentAddress => enumerationPhase == 0 ? iter1.CurrentAddress : iter2.CurrentAddress;

public long NextAddress => enumerationPhase == 0 ? iter1.NextAddress : iter2.NextAddress;

public void Dispose()
{
iter1.Dispose();
Expand Down
50 changes: 31 additions & 19 deletions cs/src/core/Index/FASTER/LogAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ public LogAccessor(FasterKV<Key, Value> fht, AllocatorBase<Key, Value> allocator
public long BeginAddress => allocator.BeginAddress;

/// <summary>
/// Truncate the log until, but not including, untilAddress
/// Truncate the log until, but not including, untilAddress. Make sure address corresponds to record boundary if snapToPageStart is set to false.
/// </summary>
/// <param name="untilAddress"></param>
public void ShiftBeginAddress(long untilAddress)
/// <param name="untilAddress">Address to shift begin address until</param>
/// <param name="snapToPageStart">Whether given address should be snapped to nearest earlier page start address</param>
public void ShiftBeginAddress(long untilAddress, bool snapToPageStart = false)
{
if (snapToPageStart)
untilAddress &= ~allocator.PageSizeMask;

allocator.ShiftBeginAddress(untilAddress);
}

Expand Down Expand Up @@ -146,7 +150,7 @@ public void ShiftReadOnlyAddress(long newReadOnlyAddress, bool wait)
}

/// <summary>
/// Scan the log given address range
/// Scan the log given address range, returns all records with address less than endAddress
/// </summary>
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
Expand Down Expand Up @@ -197,8 +201,9 @@ public void DisposeFromMemory()
/// <param name="shiftBeginAddress">Whether to shift begin address to untilAddress after compaction. To avoid
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
/// <returns>Address until which compaction was done</returns>
[Obsolete("Invoke Compact() on a client session (ClientSession) instead")]
public void Compact(long untilAddress, bool shiftBeginAddress)
public long Compact(long untilAddress, bool shiftBeginAddress)
{
if (allocator is VariableLengthBlittableAllocator<Key, Value> varLen)
{
Expand All @@ -207,14 +212,14 @@ public void Compact(long untilAddress, bool shiftBeginAddress)
{
MethodInfo method = GetType().GetMethod("CompactReadOnly", BindingFlags.NonPublic | BindingFlags.Instance);
MethodInfo generic = method.MakeGenericMethod(typeof(Key).GetGenericArguments()[0]);
generic.Invoke(this, new object[] { untilAddress, shiftBeginAddress });
return (long)generic.Invoke(this, new object[] { untilAddress, shiftBeginAddress });
}
else if (typeof(Key).IsGenericType && (typeof(Key).GetGenericTypeDefinition() == typeof(Memory<>)) && Utility.IsBlittableType(typeof(Key).GetGenericArguments()[0])
&& typeof(Value).IsGenericType && (typeof(Value).GetGenericTypeDefinition() == typeof(Memory<>)) && Utility.IsBlittableType(typeof(Value).GetGenericArguments()[0]))
{
MethodInfo method = GetType().GetMethod("CompactMemory", BindingFlags.NonPublic | BindingFlags.Instance);
MethodInfo generic = method.MakeGenericMethod(typeof(Key).GetGenericArguments()[0]);
generic.Invoke(this, new object[] { untilAddress, shiftBeginAddress });
return (long)generic.Invoke(this, new object[] { untilAddress, shiftBeginAddress });
}
else
{
Expand All @@ -225,12 +230,12 @@ public void Compact(long untilAddress, bool shiftBeginAddress)
valueLength = varLen.ValueLength,
};

Compact(functions, default(DefaultVariableCompactionFunctions<Key, Value>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
return Compact(functions, default(DefaultVariableCompactionFunctions<Key, Value>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
}
}
else
{
Compact(new LogCompactFunctions<Key, Value, DefaultCompactionFunctions<Key, Value>>(default), default(DefaultCompactionFunctions<Key, Value>), untilAddress, null, shiftBeginAddress);
return Compact(new LogCompactFunctions<Key, Value, DefaultCompactionFunctions<Key, Value>>(default), default(DefaultCompactionFunctions<Key, Value>), untilAddress, null, shiftBeginAddress);
}
}

Expand All @@ -242,8 +247,9 @@ public void Compact(long untilAddress, bool shiftBeginAddress)
/// <param name="shiftBeginAddress">Whether to shift begin address to untilAddress after compaction. To avoid
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
/// <returns>Address until which compaction was done</returns>
[Obsolete("Invoke Compact() on a client session (ClientSession) instead")]
public void Compact<CompactionFunctions>(CompactionFunctions compactionFunctions, long untilAddress, bool shiftBeginAddress)
public long Compact<CompactionFunctions>(CompactionFunctions compactionFunctions, long untilAddress, bool shiftBeginAddress)
where CompactionFunctions : ICompactionFunctions<Key, Value>
{
if (allocator is VariableLengthBlittableAllocator<Key, Value> varLen)
Expand All @@ -255,23 +261,23 @@ public void Compact<CompactionFunctions>(CompactionFunctions compactionFunctions
valueLength = varLen.ValueLength,
};

Compact(functions, compactionFunctions, untilAddress, variableLengthStructSettings, shiftBeginAddress);
return Compact(functions, compactionFunctions, untilAddress, variableLengthStructSettings, shiftBeginAddress);
}
else
{
Compact(new LogCompactFunctions<Key, Value, CompactionFunctions>(compactionFunctions), compactionFunctions, untilAddress, null, shiftBeginAddress);
return Compact(new LogCompactFunctions<Key, Value, CompactionFunctions>(compactionFunctions), compactionFunctions, untilAddress, null, shiftBeginAddress);
}
}

private unsafe void Compact<Functions, CompactionFunctions>(Functions functions, CompactionFunctions cf, long untilAddress, VariableLengthStructSettings<Key, Value> variableLengthStructSettings, bool shiftBeginAddress)
private unsafe long Compact<Functions, CompactionFunctions>(Functions functions, CompactionFunctions cf, long untilAddress, VariableLengthStructSettings<Key, Value> variableLengthStructSettings, bool shiftBeginAddress)
where Functions : IFunctions<Key, Value, Empty, Empty, Empty>
where CompactionFunctions : ICompactionFunctions<Key, Value>
{
using var fhtSession = fht.NewSession<Empty, Empty, Empty, Functions>(functions);
Compact(fhtSession, functions, cf, untilAddress, variableLengthStructSettings, shiftBeginAddress);
return Compact(fhtSession, functions, cf, untilAddress, variableLengthStructSettings, shiftBeginAddress);
}

internal unsafe void Compact<Input, Output, Context, Functions, CompactionFunctions>(
internal unsafe long Compact<Input, Output, Context, Functions, CompactionFunctions>(
ClientSession<Key, Value, Input, Output, Context, Functions> fhtSession,
Functions functions, CompactionFunctions cf, long untilAddress, VariableLengthStructSettings<Key, Value> variableLengthStructSettings, bool shiftBeginAddress)
where Functions : IFunctions<Key, Value, Input, Output, Context>
Expand All @@ -294,6 +300,8 @@ internal unsafe void Compact<Input, Output, Context, Functions, CompactionFuncti
else
tempKvSession.Upsert(ref key, ref value, default, 0);
}
// Ensure address is at record boundary
untilAddress = originalUntilAddress = iter1.NextAddress;
}

// TODO: Scan until SafeReadOnlyAddress
Expand Down Expand Up @@ -338,6 +346,8 @@ internal unsafe void Compact<Input, Output, Context, Functions, CompactionFuncti

if (shiftBeginAddress)
ShiftBeginAddress(originalUntilAddress);

return originalUntilAddress;
}

private void LogScanForValidity<Input, Output, Context, Functions>(ref long untilAddress, ref long scanUntil, ClientSession<Key, Value, Input, Output, Context, Functions> tempKvSession)
Expand All @@ -359,7 +369,7 @@ private void LogScanForValidity<Input, Output, Context, Functions>(ref long unti
}

#pragma warning disable IDE0051 // Remove unused private members
private void CompactReadOnly<T>(long untilAddress, bool shiftBeginAddress) where T : unmanaged
private long CompactReadOnly<T>(long untilAddress, bool shiftBeginAddress) where T : unmanaged
{
if (allocator is VariableLengthBlittableAllocator<ReadOnlyMemory<T>, Memory<T>> varLen)
{
Expand All @@ -370,11 +380,12 @@ private void CompactReadOnly<T>(long untilAddress, bool shiftBeginAddress) where
valueLength = varLen.ValueLength,
};

(this as LogAccessor<ReadOnlyMemory<T>, Memory<T>>).Compact(functions, default(DefaultReadOnlyMemoryCompactionFunctions<T>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
return (this as LogAccessor<ReadOnlyMemory<T>, Memory<T>>).Compact(functions, default(DefaultReadOnlyMemoryCompactionFunctions<T>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
}
throw new FasterException("Unexpected condition during log compaction");
}

private void CompactMemory<T>(long untilAddress, bool shiftBeginAddress)
private long CompactMemory<T>(long untilAddress, bool shiftBeginAddress)
where T : unmanaged
{
if (allocator is VariableLengthBlittableAllocator<Memory<T>, Memory<T>> varLen)
Expand All @@ -386,8 +397,9 @@ private void CompactMemory<T>(long untilAddress, bool shiftBeginAddress)
valueLength = varLen.ValueLength,
};

(this as LogAccessor<Memory<T>, Memory<T>>).Compact(functions, default(DefaultMemoryCompactionFunctions<T>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
return (this as LogAccessor<Memory<T>, Memory<T>>).Compact(functions, default(DefaultMemoryCompactionFunctions<T>), untilAddress, variableLengthStructSettings, shiftBeginAddress);
}
throw new FasterException("Unexpected condition during log compaction");
}
#pragma warning restore IDE0051 // Remove unused private members
}
Expand Down
8 changes: 4 additions & 4 deletions cs/test/BlittableLogCompactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void BlittableLogCompactionTest1()
session.Upsert(ref key1, ref value, 0, 0);
}

session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down Expand Up @@ -112,7 +112,7 @@ public void BlittableLogCompactionTest2()
fht.Log.Flush(true);

var tail = fht.Log.TailAddress;
session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);
Assert.IsTrue(fht.Log.TailAddress == tail);

Expand Down Expand Up @@ -165,7 +165,7 @@ public void BlittableLogCompactionTest3()
}

var tail = fht.Log.TailAddress;
session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down Expand Up @@ -220,7 +220,7 @@ public void BlittableLogCompactionCustomFunctionsTest1()
var tail = fht.Log.TailAddress;

// Only leave records with even vfield1
session.Compact(compactUntil, true, default(EvenCompactionFunctions));
compactUntil = session.Compact(compactUntil, true, default(EvenCompactionFunctions));
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down
8 changes: 4 additions & 4 deletions cs/test/GenericLogCompactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void GenericLogCompactionTest1()
session.Upsert(ref key1, ref value, 0, 0);
}

session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down Expand Up @@ -116,7 +116,7 @@ public void GenericLogCompactionTest2()
fht.Log.Flush(true);

var tail = fht.Log.TailAddress;
session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);
Assert.IsTrue(fht.Log.TailAddress == tail);

Expand Down Expand Up @@ -163,7 +163,7 @@ public void GenericLogCompactionTest3()
}
}

session.Compact(compactUntil, true);
compactUntil = session.Compact(compactUntil, true);
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down Expand Up @@ -211,7 +211,7 @@ public void GenericLogCompactionCustomFunctionsTest1()
session.Upsert(ref key1, ref value, 0, 0);
}

session.Compact(compactUntil, true, default(EvenCompactionFunctions));
compactUntil = session.Compact(compactUntil, true, default(EvenCompactionFunctions));
Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

// Read 2000 keys - all should be present
Expand Down
8 changes: 3 additions & 5 deletions cs/test/MemoryLogCompactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ public void MemoryLogCompactionTest1()

const int totalRecords = 2000;
var start = fht.Log.TailAddress;
long compactUntil = 0;

for (int i = 0; i < totalRecords; i++)
{
if (i == 1000)
compactUntil = fht.Log.TailAddress;

key.Span.Fill(i);
value.Span.Fill(i);
session.Upsert(key, value);
Expand All @@ -63,7 +59,9 @@ public void MemoryLogCompactionTest1()
session.Delete(key); // tombstone inserted
}

session.Compact(compactUntil, true);
// Compact 20% of log:
var compactUntil = fht.Log.BeginAddress + (fht.Log.TailAddress - fht.Log.BeginAddress) / 5;
compactUntil = session.Compact(compactUntil, true);

Assert.IsTrue(fht.Log.BeginAddress == compactUntil);

Expand Down

0 comments on commit 075a10a

Please sign in to comment.