Skip to content

Commit

Permalink
[C#] Compaction via session, update EntryCount, remove ShallowCopy (#359
Browse files Browse the repository at this point in the history
)

* Remove ShallowCopy since we now have serialize
* Provide default implementation for log compaction function for Memory<T> where T : unmanaged
* Moved Compact to session so that user does not need to implement Copy/InPlaceCopy functions.
* Updated EntryCount and DumpDistribution to reflect only hash entries >= BeginAddress.
* Improved testcase
* Marked Log.Compact as obsolete, provided default implementation for Copy/CopyInPlace
* Cleanup of code
  • Loading branch information
badrishc authored Oct 25, 2020
1 parent d879a88 commit 5d24bb8
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 196 deletions.
3 changes: 2 additions & 1 deletion cs/samples/StoreLogCompaction/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ static void Main()
}

s.CompletePending(true);

Console.WriteLine("Compacting log");
h.Log.Compact(h.Log.HeadAddress, true);
s.Compact(h.Log.HeadAddress, true);

Console.WriteLine("Log begin address: {0}", h.Log.BeginAddress);
Console.WriteLine("Log tail address: {0}", h.Log.TailAddress);
Expand Down
7 changes: 7 additions & 0 deletions cs/samples/StoreVarLenTypes/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ static void SpanByteSample()
s.Upsert(key, value);
}

using (IFasterScanIterator<SpanByte, SpanByte> iterator = store.Log.Scan(store.Log.BeginAddress, store.Log.TailAddress))
{
while (iterator.GetNext(out RecordInfo recordInfo, out SpanByte keyObj, out SpanByte valueObj))
{
Console.WriteLine("Key: " + keyObj.ToByteArray());
}
}
bool success = true;

r = new Random(100);
Expand Down
20 changes: 0 additions & 20 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,16 +1603,6 @@ private void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, objec
catch when (disposed) { }
}

/// <summary>
/// Shallow copy: assigns the source key directly to the destination
/// (reference assignment for object keys, bitwise copy for value types).
/// </summary>
/// <param name="src">Source key</param>
/// <param name="dst">Destination key; receives a direct assignment of src</param>
public virtual void ShallowCopy(ref Key src, ref Key dst)
{
dst = src;
}

/// <summary>
/// Serialize to log
/// </summary>
Expand All @@ -1633,16 +1623,6 @@ public virtual void Serialize(ref Value src, long physicalAddress)
GetValue(physicalAddress) = src;
}

/// <summary>
/// Shallow copy: assigns the source value directly to the destination
/// (reference assignment for object values, bitwise copy for value types).
/// </summary>
/// <param name="src">Source value</param>
/// <param name="dst">Destination value; receives a direct assignment of src</param>
public virtual void ShallowCopy(ref Value src, ref Value dst)
{
dst = src;
}

internal string PrettyPrint(long address)
{
return $"{GetPage(address)}:{GetOffsetInPage(address)}";
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int num
/// <returns>Whether the full record was retrieved (unconditionally true here)</returns>
protected override bool RetrievedFullRecord(byte* record, ref AsyncIOContext<Key, Value> ctx)
{
// Blittable key/value data is stored inline in the record, so direct
// assignment from the in-memory record copies the complete data;
// no object deserialization step is required.
ctx.key = GetKey((long)record);
ctx.value = GetValue((long)record);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -849,11 +849,11 @@ protected override bool RetrievedFullRecord(byte* record, ref AsyncIOContext<Key
{
if (!KeyHasObjects())
{
ShallowCopy(ref Unsafe.AsRef<Record<Key, Value>>(record).key, ref ctx.key);
ctx.key = Unsafe.AsRef<Record<Key, Value>>(record).key;
}
if (!ValueHasObjects())
{
ShallowCopy(ref Unsafe.AsRef<Record<Key, Value>>(record).value, ref ctx.value);
ctx.value = Unsafe.AsRef<Record<Key, Value>>(record).value;
}

if (!(KeyHasObjects() || ValueHasObjects()))
Expand Down
10 changes: 0 additions & 10 deletions cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,6 @@ public override void Serialize(ref Value src, long physicalAddress)
ValueLength.Serialize(ref src, (byte*)ValueOffset(physicalAddress));
}

/// <summary>
/// Shallow copy of keys is not implemented for this allocator.
/// NOTE(review): NotSupportedException would be more idiomatic for an
/// operation that is unsupported by design — confirm before changing,
/// since callers may catch NotImplementedException.
/// </summary>
public override void ShallowCopy(ref Key src, ref Key dst)
{
throw new NotImplementedException();
}

/// <summary>
/// Shallow copy of values is not implemented for this allocator.
/// NOTE(review): NotSupportedException would be more idiomatic for an
/// operation that is unsupported by design — confirm before changing,
/// since callers may catch NotImplementedException.
/// </summary>
public override void ShallowCopy(ref Value src, ref Value dst)
{
throw new NotImplementedException();
}

/// <summary>
/// Dispose memory allocator
/// </summary>
Expand Down
42 changes: 40 additions & 2 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162

using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -577,6 +575,46 @@ public async ValueTask WaitForCommitAsync(CancellationToken token = default)
}
}

/// <summary>
/// Compact the log until the specified address using the current session,
/// moving active records to the tail of the log. Uses the default
/// compaction functions.
/// </summary>
/// <param name="untilAddress">Compact log until this address</param>
/// <param name="shiftBeginAddress">Whether to shift begin address to untilAddress after compaction. To avoid
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
public void Compact(long untilAddress, bool shiftBeginAddress)
    => Compact(untilAddress, shiftBeginAddress, default(DefaultCompactionFunctions<Key, Value>));

/// <summary>
/// Compact the log until specified address using current session, moving active records to the tail of the log.
/// </summary>
/// <param name="untilAddress">Compact log until this address</param>
/// <param name="shiftBeginAddress">Whether to shift begin address to untilAddress after compaction. To avoid
/// data loss on failure, set this to false, and shift begin address only after taking a checkpoint. This
/// ensures that records written to the tail during compaction are first made stable.</param>
/// <param name="compactionFunctions">User provided compaction functions (see <see cref="ICompactionFunctions{Key, Value}"/>).</param>
public void Compact<CompactionFunctions>(long untilAddress, bool shiftBeginAddress, CompactionFunctions compactionFunctions)
    where CompactionFunctions : ICompactionFunctions<Key, Value>
{
    // Compaction is explicitly rejected on thread-affinitized sessions.
    if (!SupportAsync)
        throw new FasterException("Do not perform compaction using a threadAffinitized session");

    // For variable-length blittable allocators, propagate the key/value
    // length settings so compaction can parse variable-sized records;
    // otherwise null (fixed-length) settings are used.
    VariableLengthStructSettings<Key, Value> vl = null;
    if (fht.hlog is VariableLengthBlittableAllocator<Key, Value> varLen)
    {
        vl = new VariableLengthStructSettings<Key, Value>
        {
            keyLength = varLen.KeyLength,
            valueLength = varLen.ValueLength,
        };
    }

    fht.Log.Compact(this, functions, compactionFunctions, untilAddress, vl, shiftBeginAddress);
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
Expand Down
89 changes: 87 additions & 2 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma warning disable 0162

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -36,7 +36,7 @@ public partial class FasterKV<Key, Value> : FasterBase,
internal void UseRelaxedCPR() => RelaxedCPR = true;

/// <summary>
/// Number of used entries in hash index
/// Number of active entries in hash index (does not correspond to total records, due to hash collisions)
/// </summary>
public long EntryCount => GetEntryCount();

Expand Down Expand Up @@ -641,5 +641,90 @@ private void UpdateVarLen(ref VariableLengthStructSettings<Key, Value> variableL
}
}
}

/// <summary>
/// Total number of valid entries in hash table. Only counts slots whose
/// address is at or beyond the log's begin address, so entries pointing
/// into truncated (pre-begin) regions of the log are excluded.
/// </summary>
/// <returns>Count of valid hash-table entries across all buckets and overflow chains</returns>
private unsafe long GetEntryCount()
{
// Snapshot the current table version; a concurrent resize could change it.
var version = resizeInfo.version;
var table_size_ = state[version].size;
var ptable_ = state[version].tableAligned;
long total_entry_count = 0;
long beginAddress = hlog.BeginAddress;

for (long bucket = 0; bucket < table_size_; ++bucket)
{
HashBucket b = *(ptable_ + bucket);
while (true)
{
// Slots [0, kOverflowBucketIndex) hold entries; only those at or
// beyond beginAddress are considered live.
for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry)
if (b.bucket_entries[bucket_entry] >= beginAddress)
++total_entry_count;
// The last slot links to an overflow bucket; 0 terminates the chain.
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
}
}
return total_entry_count;
}

/// <summary>
/// Builds a human-readable report of the hash-table entry distribution:
/// bucket count, total entries (at or beyond the log's begin address),
/// average entries per bucket, and a histogram of entries-per-bucket.
/// Throws if a duplicate non-tentative tag is found within one bucket chain.
/// </summary>
/// <param name="version">Hash-table version (index into state[]) to dump</param>
private unsafe string DumpDistributionInternal(int version)
{
var table_size_ = state[version].size;
var ptable_ = state[version].tableAligned;
long total_record_count = 0;
long beginAddress = hlog.BeginAddress;
// Maps (#entries in a bucket chain) -> (number of buckets with that count).
Dictionary<int, long> histogram = new Dictionary<int, long>();

for (long bucket = 0; bucket < table_size_; ++bucket)
{
// Tags seen in this bucket's chain, used to detect index corruption.
List<int> tags = new List<int>();
int cnt = 0;
HashBucket b = *(ptable_ + bucket);
while (true)
{
for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry)
{
// Only entries at or beyond beginAddress are live; others point
// into truncated log regions and are skipped.
if (b.bucket_entries[bucket_entry] >= beginAddress)
{
var x = default(HashBucketEntry);
x.word = b.bucket_entries[bucket_entry];
// A repeated non-tentative tag in one chain indicates a corrupt index.
if (tags.Contains(x.Tag) && !x.Tentative)
throw new FasterException("Duplicate tag found in index");
tags.Add(x.Tag);
++cnt;
++total_record_count;
}
}
// Last slot links to an overflow bucket; 0 terminates the chain.
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
}

if (!histogram.ContainsKey(cnt)) histogram[cnt] = 0;
histogram[cnt]++;
}

var distribution =
$"Number of hash buckets: {{{table_size_}}}\n" +
$"Total distinct hash-table entry count: {{{total_record_count}}}\n" +
$"Average #entries per hash bucket: {{{total_record_count / (double)table_size_:0.00}}}\n" +
$"Histogram of #entries per bucket:\n";

foreach (var kvp in histogram.OrderBy(e => e.Key))
{
distribution += $"  {kvp.Key} : {kvp.Value}\n";
}

return distribution;
}

/// <summary>
/// Dumps the distribution of each non-empty bucket in the hash table,
/// using the current table version.
/// </summary>
public string DumpDistribution() => DumpDistributionInternal(resizeInfo.version);
}
}
90 changes: 0 additions & 90 deletions cs/src/core/Index/FASTER/FASTERBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -692,93 +690,5 @@ internal bool UpdateSlot(HashBucket* bucket, int entrySlot, long expected, long

return (found == expected);
}

/// <summary>
/// Total number of used entries in the hash table. Counts every nonzero
/// slot, including entries whose address may lie below the log's begin
/// address (i.e., entries pointing into truncated log regions).
/// </summary>
/// <returns>Count of nonzero hash-table entries across all buckets and overflow chains</returns>
protected virtual long GetEntryCount()
{
// Snapshot the current table version; a concurrent resize could change it.
var version = resizeInfo.version;
var table_size_ = state[version].size;
var ptable_ = state[version].tableAligned;
long total_entry_count = 0;

for (long bucket = 0; bucket < table_size_; ++bucket)
{
HashBucket b = *(ptable_ + bucket);
while (true)
{
// Slots [0, kOverflowBucketIndex) hold entries; any nonzero slot counts.
for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry)
if (0 != b.bucket_entries[bucket_entry])
++total_entry_count;
// The last slot links to an overflow bucket; 0 terminates the chain.
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
}
}
return total_entry_count;
}

/// <summary>
/// Builds a human-readable report of the hash-table entry distribution:
/// bucket count, total nonzero entries (regardless of log begin address),
/// average entries per bucket, and a histogram of entries-per-bucket.
/// Throws if a duplicate non-tentative tag is found within one bucket chain.
/// </summary>
/// <param name="version">Hash-table version (index into state[]) to dump</param>
protected virtual string DumpDistributionInternal(int version)
{
var table_size_ = state[version].size;
var ptable_ = state[version].tableAligned;
long total_record_count = 0;
// Maps (#entries in a bucket chain) -> (number of buckets with that count).
Dictionary<int, long> histogram = new Dictionary<int, long>();

for (long bucket = 0; bucket < table_size_; ++bucket)
{
// Tags seen in this bucket's chain, used to detect index corruption.
List<int> tags = new List<int>();
int cnt = 0;
HashBucket b = *(ptable_ + bucket);
while (true)
{
for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry)
{
// Any nonzero slot is counted as used.
if (0 != b.bucket_entries[bucket_entry])
{
var x = default(HashBucketEntry);
x.word = b.bucket_entries[bucket_entry];
// A repeated non-tentative tag in one chain indicates a corrupt index.
if (tags.Contains(x.Tag) && !x.Tentative)
throw new FasterException("Duplicate tag found in index");
tags.Add(x.Tag);
++cnt;
++total_record_count;
}
}
// Last slot links to an overflow bucket; 0 terminates the chain.
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
}

if (!histogram.ContainsKey(cnt)) histogram[cnt] = 0;
histogram[cnt]++;
}

var distribution =
$"Number of hash buckets: {{{table_size_}}}\n" +
$"Total distinct hash-table entry count: {{{total_record_count}}}\n" +
$"Average #entries per hash bucket: {{{total_record_count / (double)table_size_:0.00}}}\n" +
$"Histogram of #entries per bucket:\n";
foreach (var kvp in histogram.OrderBy(e => e.Key))
{
distribution += $"  {kvp.Key} : {kvp.Value}\n";
}

return distribution;
}

/// <summary>
/// Dumps the distribution of each non-empty bucket in the hash table,
/// using the current table version.
/// </summary>
public string DumpDistribution() => DumpDistributionInternal(resizeInfo.version);

}

}
3 changes: 1 addition & 2 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162
#define CPR

using System;
Expand Down Expand Up @@ -1024,7 +1023,7 @@ internal OperationStatus InternalDelete<Input, Output, Context, FasterSession>(
sessionCtx.version,
true, true, false,
latestLogicalAddress);
hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress));
hlog.Serialize(ref key, newPhysicalAddress);

var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
Expand Down
Loading

0 comments on commit 5d24bb8

Please sign in to comment.