Skip to content

Commit

Permalink
various changes from indexing work (#562)
Browse files Browse the repository at this point in the history
* various changes from indexing work
- Add BeginAddress and EndAddress to scan iterators
- Allow null deltaFileDevice in HybridLogCheckpointInfo.Recover
- Rename FoldOverSnapshot to useFoldOverCheckpoint
- Make TakeHybridLogCheckpoint overload call through to overload to avoid duplication
- Add default values to new scanDelta and recoverTo arguments
- Add missing checkpointManager.OnRecovery call to IntenralRecoverAsync
- Fix some comments

* Fix shortened name in test to work on Linux
  • Loading branch information
TedHartMS authored Oct 3, 2021
1 parent 9fef961 commit 0a32f1b
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 75 deletions.
10 changes: 10 additions & 0 deletions cs/src/core/Allocator/IFasterScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,15 @@ public interface IFasterScanIterator<Key, Value> : IDisposable
/// Next address
/// </summary>
long NextAddress { get; }

/// <summary>
/// The starting address of the scan
/// </summary>
long BeginAddress { get; }

/// <summary>
/// The ending address of the scan
/// </summary>
long EndAddress { get; }
}
}
7 changes: 6 additions & 1 deletion cs/src/core/Allocator/MemoryPageScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace FASTER.core
class MemoryPageScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
{
readonly Record<Key, Value>[] page;
readonly int end;
readonly int start, end;
int offset;


Expand All @@ -25,13 +25,18 @@ public MemoryPageScanIterator(Record<Key, Value>[] page, int start, int end)
this.page = new Record<Key, Value>[page.Length];
Array.Copy(page, start, this.page, start, end - start);
offset = start - 1;
this.start = start;
this.end = end;
}

public long CurrentAddress => offset;

public long NextAddress => offset + 1;

public long BeginAddress => start;

public long EndAddress => end;

public void Dispose()
{
}
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Allocator/ScanIteratorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public abstract class ScanIteratorBase
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// The starting address of the scan
/// </summary>
public long BeginAddress => beginAddress;

/// <summary>
/// The ending address of the scan
/// </summary>
public long EndAddress => endAddress;

/// <summary>
/// Constructor
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/WorkQueueLIFO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void EnqueueAndTryWork(T work, bool asTask)

private void ProcessQueue()
{
// Process items in qork queue
// Process items in work queue
while (true)
{
while (_queue.TryPop(out var workItem))
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Common/CompletedOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;

namespace FASTER.core
{
Expand Down Expand Up @@ -94,7 +93,7 @@ public struct CompletedOutput<TKey, TValue, TInput, TOutput, TContext>
public ref TInput Input => ref inputContainer.Get();

/// <summary>
/// The output for this pending operation.
/// The output for this pending operation. It is the caller's responsibility to dispose this if necessary; <see cref="Dispose()"/> will not try to dispose this member.
/// </summary>
public TOutput Output;

Expand Down
43 changes: 22 additions & 21 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal enum OperationType
READ,
RMW,
UPSERT,
INSERT,
DELETE
}

Expand Down Expand Up @@ -266,7 +265,7 @@ public struct HybridLogRecoveryInfo
/// </summary>
public int nextVersion;
/// <summary>
/// Flushed logical address
/// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time.
/// </summary>
public long flushedLogicalAddress;
/// <summary>
Expand Down Expand Up @@ -588,37 +587,39 @@ public HybridLogCheckpointInfo Transfer()
}

public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits,
bool scanDelta, long recoverTo)
bool scanDelta = false, long recoverTo = -1)
{
deltaFileDevice = checkpointManager.GetDeltaLogDevice(token);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
if (deltaFileDevice is not null)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo);
}
else
{
info.Recover(token, checkpointManager, null);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo);
return;
}
}
info.Recover(token, checkpointManager, null);
}

public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits,
out byte[] commitCookie, bool scanDelta = false, long recoverTo = -1)
{
deltaFileDevice = checkpointManager.GetDeltaLogDevice(token);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
if (deltaFileDevice is not null)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo);
}
else
{
info.Recover(token, checkpointManager, out commitCookie);
deltaFileDevice.Initialize(-1);
if (deltaFileDevice.GetFileSize(0) > 0)
{
deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
deltaLog.InitializeForReads();
info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo);
return;
}
}
info.Recover(token, checkpointManager, out commitCookie);
}

public bool IsDefault()
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Common/HeapContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ namespace FASTER.core
public interface IHeapContainer<T> : IDisposable
{
/// <summary>
/// Get object
/// Get a reference to the contained object
/// </summary>
/// <returns></returns>
ref T Get();
}

Expand Down
25 changes: 9 additions & 16 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ public partial class FasterKV<Key, Value> : FasterBase,
{
internal readonly AllocatorBase<Key, Value> hlog;
private readonly AllocatorBase<Key, Value> readcache;
private readonly IFasterEqualityComparer<Key> comparer;

/// <summary>
/// Compares two keys
/// </summary>
protected readonly IFasterEqualityComparer<Key> comparer;

internal readonly bool UseReadCache;
private readonly CopyReadsToTail CopyReadsToTail;
private readonly bool FoldOverSnapshot;
private readonly bool UseFoldOverCheckpoint;
internal readonly int sectorSize;
private readonly bool WriteDefaultOnDelete;
internal bool RelaxedCPR;
Expand Down Expand Up @@ -145,7 +149,7 @@ public FasterKV(long size, LogSettings logSettings,
if (checkpointSettings.CheckpointManager == null)
disposeCheckpointManager = true;

FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
UseFoldOverCheckpoint = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
CopyReadsToTail = logSettings.CopyReadsToTail;

if (logSettings.ReadCacheSettings != null)
Expand Down Expand Up @@ -244,7 +248,7 @@ public FasterKV(long size, LogSettings logSettings,
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
/// </returns>
public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1)
=> TakeFullCheckpoint(out token, this.FoldOverSnapshot ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion);
=> TakeFullCheckpoint(out token, this.UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion);

/// <summary>
/// Initiate full checkpoint
Expand Down Expand Up @@ -353,17 +357,7 @@ public bool TakeIndexCheckpoint(out Guid token)
/// </param>
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.</returns>
public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1)
{
ISynchronizationTask backend;
if (FoldOverSnapshot)
backend = new FoldOverCheckpointTask();
else
backend = new SnapshotCheckpointTask();

var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}
=> TakeHybridLogCheckpoint(out token, UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, tryIncremental: false, targetVersion);

/// <summary>
/// Initiate log-only checkpoint
Expand Down Expand Up @@ -631,7 +625,6 @@ internal Status ContextUpsert<Input, Output, Context, FasterSession>(ref Key key
while (internalStatus == OperationStatus.RETRY_NOW);

Status status;

if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status)internalStatus;
Expand Down
14 changes: 8 additions & 6 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ internal OperationStatus InternalUpsert<Input, Output, Context, FasterSession>(
goto CreateNewRecord;
}

#region Entry latch operation
#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
Expand Down Expand Up @@ -717,8 +717,8 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone)
{
if (FoldOverSnapshot)
{
if (UseFoldOverCheckpoint)
{
Debug.Assert(recordInfo.Version == sessionCtx.version);
}
Expand Down Expand Up @@ -780,9 +780,9 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
}
}

#endregion
#endregion

#region Create new record
#region Create new record
CreateNewRecord:
if (latchDestination != LatchDestination.CreatePendingContext)
{
Expand Down Expand Up @@ -1402,7 +1402,9 @@ internal void InternalContinuePendingReadCopyToTail<Input, Output, Context, Fast
{
Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version);

// If NoKey, we do not have the key in the initial call and must use the key from the satisfied request.
ref Key key = ref pendingContext.NoKey ? ref hlog.GetContextRecordKey(ref request) : ref pendingContext.key.Get();

byte* physicalAddress = request.record.GetValidPointer();
long logicalAddress = pendingContext.entry.Address;
ref RecordInfo oldRecordInfo = ref hlog.GetInfoFromBytePointer(physicalAddress);
Expand Down Expand Up @@ -2007,7 +2009,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes

#endregion

#region Split Index
#region Split Index
private void SplitBuckets(long hash)
{
long masked_bucket_index = hash & state[1 - resizeInfo.version].size_mask;
Expand Down
4 changes: 4 additions & 0 deletions cs/src/core/Index/FASTER/FASTERIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public FasterKVIterator(FasterKV<Key, Value> fht, Functions functions, long unti

public long NextAddress => enumerationPhase == 0 ? iter1.NextAddress : iter2.NextAddress;

public long BeginAddress => enumerationPhase == 0 ? iter1.BeginAddress : iter2.BeginAddress;

public long EndAddress => enumerationPhase == 0 ? iter1.EndAddress : iter2.EndAddress;

public void Dispose()
{
iter1?.Dispose();
Expand Down
5 changes: 4 additions & 1 deletion cs/src/core/Index/Interfaces/IFasterSession.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace FASTER.core
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

namespace FASTER.core
{
/// <summary>
/// Provides thread management and callback to checkpoint completion (called state machine).
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Recovery/ICheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public interface ICheckpointManager : IDisposable
/// <param name="scanDelta"> whether or not to scan through the delta log to acquire latest entry</param>
/// <param name="recoverTo"> version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version.</param>
/// <returns>Metadata, or null if invalid</returns>
byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo);
byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta = false, long recoverTo = -1);

/// <summary>
/// Get list of index checkpoint tokens, in order of usage preference
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// Recover session information
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;

checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}
Expand Down Expand Up @@ -348,6 +349,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;

checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}

Expand Down
Loading

0 comments on commit 0a32f1b

Please sign in to comment.