Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Remove CheckpointType from CheckpointSettings (v2) #614

Merged
merged 3 commits into from
Dec 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cs/benchmark/FasterSpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys
if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV<SpanByte, SpanByte>
(testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 22, SegmentSizeBits = 26, MemorySizeBits = 26 },
new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath });
new CheckpointSettings { CheckpointDir = testLoader.BackupPath });
else
store = new FasterKV<SpanByte, SpanByte>
(testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, MemorySizeBits = 35 },
new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath });
new CheckpointSettings { CheckpointDir = testLoader.BackupPath });
}

internal void Dispose()
Expand Down
4 changes: 2 additions & 2 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade
if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV<Key, Value>
(testLoader.MaxKey / 4, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 25, SegmentSizeBits = 30, MemorySizeBits = 28 },
new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath });
new CheckpointSettings { CheckpointDir = testLoader.BackupPath });
else
store = new FasterKV<Key, Value>
(testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true },
new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath });
new CheckpointSettings { CheckpointDir = testLoader.BackupPath });
}

internal void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassRecoveryDurability/Storedb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public bool InitAndRecover()

public Guid Checkpoint()
{
db.TakeFullCheckpoint(out Guid token);
db.TakeFullCheckpoint(out Guid token, CheckpointType.Snapshot);
db.CompleteCheckpointAsync().GetAwaiter().GetResult();
return token;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/SumStore/RecoveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private void PeriodicCheckpoints()
{
Thread.Sleep(checkpointInterval);

fht.TakeFullCheckpoint(out Guid token);
fht.TakeFullCheckpoint(out Guid token, CheckpointType.Snapshot);

fht.CompleteCheckpointAsync().GetAwaiter().GetResult();

Expand Down
1 change: 0 additions & 1 deletion cs/remote/src/FASTER.server/Servers/ServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void GetSettings(out LogSettings logSettings, out CheckpointSettings chec

checkpointSettings = new CheckpointSettings
{
CheckPointType = CheckpointType.Snapshot,
CheckpointDir = CheckpointDir ?? (LogDir + "/Store/checkpoints"),
RemoveOutdated = true,
};
Expand Down
3 changes: 1 addition & 2 deletions cs/samples/StoreAsyncApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ static void Main()
var objlog = Devices.CreateLogDevice(path + "hlog.obj.log", deleteOnClose: true);

var logSettings = new LogSettings { LogDevice = log, ObjectLogDevice = objlog };
var checkpointSettings = new CheckpointSettings { CheckpointDir = path, CheckPointType = CheckpointType.FoldOver };
var serializerSettings = new SerializerSettings<CacheKey, CacheValue> { keySerializer = () => new CacheKeySerializer(), valueSerializer = () => new CacheValueSerializer() };

faster = new FasterKV<CacheKey, CacheValue>(1L << 20, logSettings, checkpointSettings, serializerSettings);
faster = new FasterKV<CacheKey, CacheValue>(1L << 20, logSettings, serializerSettings: serializerSettings);

const int NumParallelTasks = 1;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
Expand Down
5 changes: 0 additions & 5 deletions cs/src/core/Index/Common/CheckpointSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public class CheckpointSettings
/// </summary>
public ICheckpointManager CheckpointManager = null;

/// <summary>
/// Type of checkpoint
/// </summary>
public CheckpointType CheckPointType = CheckpointType.Snapshot;

/// <summary>
/// Use specified directory for storing and retrieving checkpoints
/// using local storage device.
Expand Down
32 changes: 0 additions & 32 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public partial class FasterKV<Key, Value> : FasterBase,

internal readonly bool UseReadCache;
private readonly CopyReadsToTail CopyReadsToTail;
private readonly bool UseFoldOverCheckpoint;
internal readonly int sectorSize;
private readonly bool WriteDefaultOnDelete;
internal bool RelaxedCPR;
Expand Down Expand Up @@ -149,7 +148,6 @@ public FasterKV(long size, LogSettings logSettings,
if (checkpointSettings.CheckpointManager == null)
disposeCheckpointManager = true;

UseFoldOverCheckpoint = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
CopyReadsToTail = logSettings.CopyReadsToTail;

if (logSettings.ReadCacheSettings != null)
Expand Down Expand Up @@ -231,23 +229,6 @@ public FasterKV(long size, LogSettings logSettings,
systemState = SystemState.Make(Phase.REST, 1);
}

/// <summary>
/// Initiate full checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>
/// Whether we successfully initiated the checkpoint (initiation may
/// fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
/// </returns>
public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1)
=> TakeFullCheckpoint(out token, this.UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion);

/// <summary>
/// Initiate full checkpoint
/// </summary>
Expand Down Expand Up @@ -344,19 +325,6 @@ public bool TakeIndexCheckpoint(out Guid token)
return (success, token);
}

/// <summary>
/// Initiate log-only checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.</returns>
public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1)
=> TakeHybridLogCheckpoint(out token, UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, tryIncremental: false, targetVersion);

/// <summary>
/// Initiate log-only checkpoint
/// </summary>
Expand Down
73 changes: 52 additions & 21 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes

long newLogicalAddress, newPhysicalAddress;
bool copyToReadCache = !noReadCache && UseReadCache;

if (copyToReadCache)
{
BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
Expand All @@ -2005,7 +2006,32 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
readcache.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key, ref input, ref value,
ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, Constants.kInvalidAddress); // We do not expose readcache addresses
ref recordInfo, Constants.kInvalidAddress, out long lockContext); // We do not expose readcache addresses

var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
updatedEntry.ReadCache = copyToReadCache;

var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word,
entry.word);
if (foundEntry.word != entry.word)
{
if (!copyToReadCache) hlog.GetInfo(newPhysicalAddress).SetInvalid();
return OperationStatus.RETRY_NOW;
}
else
{
fasterSession.PostSingleWriter(ref key, ref input, ref value,
ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, Constants.kInvalidAddress, lockContext); // We do not expose readcache addresses
return OperationStatus.SUCCESS;
}
}
else
{
Expand All @@ -2019,29 +2045,34 @@ ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref
hlog.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key, ref input, ref value,
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, newLogicalAddress);
}

ref recordInfo, newLogicalAddress, out long lockContext);

var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
updatedEntry.ReadCache = copyToReadCache;
var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
updatedEntry.ReadCache = copyToReadCache;

var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word,
entry.word);
if (foundEntry.word != entry.word)
{
if (!copyToReadCache) hlog.GetInfo(newPhysicalAddress).SetInvalid();
return OperationStatus.RETRY_NOW;
var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word,
entry.word);
if (foundEntry.word != entry.word)
{
if (!copyToReadCache) hlog.GetInfo(newPhysicalAddress).SetInvalid();
return OperationStatus.RETRY_NOW;
}
else
{
fasterSession.PostSingleWriter(ref key, ref input, ref value,
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, newLogicalAddress, lockContext);
return OperationStatus.SUCCESS;
}
}
else
return OperationStatus.SUCCESS;

#endregion
}

Expand Down
26 changes: 0 additions & 26 deletions cs/src/core/Index/Interfaces/IFasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,6 @@ ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value, Input,
/// <returns>Whether the request succeeded</returns>
bool GrowIndex();

/// <summary>
/// Initiate full (index + log) checkpoint of FASTER
/// </summary>
/// <param name="token">Token describing checkpoint</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>Whether we successfully initiated the checkpoint (initiation may fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index). Use CompleteCheckpointAsync to await completion.</returns>
/// <remarks>Uses the checkpoint type specified in the <see cref="CheckpointSettings"/></remarks>
bool TakeFullCheckpoint(out Guid token, long targetVersion = -1);

/// <summary>
/// Initiate full (index + log) checkpoint of FASTER
/// </summary>
Expand Down Expand Up @@ -112,18 +98,6 @@ ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value, Input,
/// Await the task to complete checkpoint, if initiated successfully</returns>
public ValueTask<(bool success, Guid token)> TakeIndexCheckpointAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Initiate checkpoint of FASTER log only (not index)
/// </summary>
/// <param name="token">Token describing checkpoint</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to await completion.</returns>
bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1);

/// <summary>
/// Take asynchronous checkpoint of FASTER log only (not index)
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions cs/test/AsyncLargeObjectTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task LargeObjectTest([Values]CheckpointType checkpointType)

fht1 = new (128,
new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 21, MemorySizeBits = 26 },
new CheckpointSettings { CheckpointDir = test_path, CheckPointType = checkpointType },
new CheckpointSettings { CheckpointDir = test_path },
new SerializerSettings<MyKey, MyLargeValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyLargeValueSerializer() }
);

Expand All @@ -60,7 +60,7 @@ public async Task LargeObjectTest([Values]CheckpointType checkpointType)
}
}

fht1.TakeFullCheckpoint(out Guid token);
fht1.TakeFullCheckpoint(out Guid token, checkpointType);
await fht1.CompleteCheckpointAsync();

fht1.Dispose();
Expand All @@ -72,7 +72,7 @@ public async Task LargeObjectTest([Values]CheckpointType checkpointType)

fht2 = new(128,
new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, PageSizeBits = 21, MemorySizeBits = 26 },
new CheckpointSettings { CheckpointDir = test_path, CheckPointType = checkpointType },
new CheckpointSettings { CheckpointDir = test_path },
new SerializerSettings<MyKey, MyLargeValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyLargeValueSerializer() }
);

Expand Down
8 changes: 4 additions & 4 deletions cs/test/AsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public async Task AsyncRecoveryTest1(CheckpointType checkpointType)
fht1 = new FasterKV<AdId, NumClicks>
(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 10, MemorySizeBits = 13 },
checkpointSettings: new CheckpointSettings { CheckpointDir = testPath, CheckPointType = checkpointType }
checkpointSettings: new CheckpointSettings { CheckpointDir = testPath }
);

fht2 = new FasterKV<AdId, NumClicks>
(128,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, PageSizeBits = 10, MemorySizeBits = 13 },
checkpointSettings: new CheckpointSettings { CheckpointDir = testPath, CheckPointType = checkpointType }
checkpointSettings: new CheckpointSettings { CheckpointDir = testPath }
);

int numOps = 5000;
Expand Down Expand Up @@ -73,12 +73,12 @@ public async Task AsyncRecoveryTest1(CheckpointType checkpointType)
}

// does not require session
fht1.TakeFullCheckpoint(out _);
fht1.TakeFullCheckpoint(out _, checkpointType);
await fht1.CompleteCheckpointAsync();

s2.CompletePending(true,false);

fht1.TakeFullCheckpoint(out Guid token);
fht1.TakeFullCheckpoint(out Guid token, checkpointType);
await fht1.CompleteCheckpointAsync();

s2.Dispose();
Expand Down
1 change: 0 additions & 1 deletion cs/test/GenericDiskDeleteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public void Setup()
fht = new FasterKV<MyKey, MyValue>
(128,
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 14, PageSizeBits = 9 },
checkpointSettings: new CheckpointSettings { CheckPointType = CheckpointType.FoldOver },
serializerSettings: new SerializerSettings<MyKey, MyValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyValueSerializer() }
);
session = fht.For(new MyFunctionsDelete()).NewSession<MyFunctionsDelete>();
Expand Down
1 change: 0 additions & 1 deletion cs/test/GenericIterationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public void Setup()
fht = new FasterKV<MyKey, MyValue>
(128,
logSettings: new LogSettings { LogDevice = log, ObjectLogDevice = objlog, MutableFraction = 0.1, MemorySizeBits = 14, PageSizeBits = 9 },
checkpointSettings: new CheckpointSettings { CheckPointType = CheckpointType.FoldOver },
serializerSettings: new SerializerSettings<MyKey, MyValue> { keySerializer = () => new MyKeySerializer(), valueSerializer = () => new MyValueSerializer() }
);
session = fht.For(new MyFunctionsDelete()).NewSession<MyFunctionsDelete>();
Expand Down
Loading