Add support for Azure Storage v12 API #795

Merged: 31 commits, Feb 13, 2023

Commits
1ae884e  Upgrade to Azure Storage v12 - in progress. Based on Netherite code. (badrishc, Feb 3, 2023)
9251288  update (badrishc, Feb 3, 2023)
a58732d  more cleanup (badrishc, Feb 4, 2023)
fc6755f  getting there (badrishc, Feb 4, 2023)
96e39b3  Updates (badrishc, Feb 5, 2023)
b6cd76e  fixes (badrishc, Feb 5, 2023)
b3a8fd0  - Add RMW/Delete %s and logfile option to MemOnlyCache (TedHartMS, Feb 6, 2023)
059044d  update (badrishc, Feb 6, 2023)
1c5fcea  updates (badrishc, Feb 6, 2023)
09ef381  Cleanup and minor fixes (badrishc, Feb 7, 2023)
94c8b1e  update (badrishc, Feb 7, 2023)
b2ed43e  - Add --RunTime and -q(uiet) args to MemOnlyCache (TedHartMS, Feb 7, 2023)
10157a6  update testutils.deletedirectory (badrishc, Feb 7, 2023)
b7dfb99  WIP: add memory size args to MemOnlyCache (TedHartMS, Feb 8, 2023)
4424444  testing CI error (badrishc, Feb 8, 2023)
3185093  Merge remote-tracking branch 'origin/memonlycache-fix2' into badrishc… (badrishc, Feb 8, 2023)
f7b3e5b  test (badrishc, Feb 8, 2023)
d9b8bdf  add logging for ASD (badrishc, Feb 8, 2023)
eef29c0  no azure - no disposetests (badrishc, Feb 8, 2023)
c184a5e  incl azure (badrishc, Feb 8, 2023)
1c6b33c  re-enable delta log tests (badrishc, Feb 10, 2023)
2a42cbe  Cleanup (badrishc, Feb 10, 2023)
876ee6d  try re-enabling dispose tests (badrishc, Feb 10, 2023)
113e265  try a fix (badrishc, Feb 10, 2023)
bf33350  another fix (badrishc, Feb 10, 2023)
8db4cde  try (badrishc, Feb 10, 2023)
96466fa  code cleanup (badrishc, Feb 11, 2023)
cf8a5be  merge (badrishc, Feb 11, 2023)
fcb6317  Merge branch 'main' into badrishc/azure-storage-v12 (badrishc, Feb 13, 2023)
32d6901  remove poor tests (badrishc, Feb 13, 2023)
da1722d  update nuspec (badrishc, Feb 13, 2023)
351 changes: 278 additions & 73 deletions cs/samples/MemOnlyCache/Program.cs

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions cs/samples/MemOnlyCache/Types.cs
@@ -25,6 +25,26 @@ public CacheKey(long key, int extraSize = 0)
public bool Equals(ref CacheKey k1, ref CacheKey k2) => k1.key == k2.key;

public int GetSize => sizeof(long) + extra.Length + 48; // heap size incl. ~48 bytes ref/array overheads

public override string ToString() => $"key {key}, len {extra.Length}";
}

public class CacheKeySerializer : BinaryObjectSerializer<CacheKey>
{
public override void Deserialize(out CacheKey obj)
{
obj = new CacheKey();
obj.key = reader.ReadInt64();
int size = reader.ReadInt32();
obj.extra = reader.ReadBytes(size);
}

public override void Serialize(ref CacheKey obj)
{
writer.Write(obj.key);
writer.Write(obj.extra.Length);
writer.Write(obj.extra);
}
}

public sealed class CacheValue
@@ -37,7 +57,29 @@ public CacheValue(int size, byte firstByte)
value[0] = firstByte;
}

public CacheValue(byte[] serializedValue)
{
value = serializedValue;
}

public int GetSize => value.Length + 48; // heap size for byte array incl. ~48 bytes ref/array overheads

public override string ToString() => $"value[0] {value[0]}, len {value.Length}";
}

public class CacheValueSerializer : BinaryObjectSerializer<CacheValue>
{
public override void Deserialize(out CacheValue obj)
{
int size = reader.ReadInt32();
obj = new CacheValue(reader.ReadBytes(size));
}

public override void Serialize(ref CacheValue obj)
{
writer.Write(obj.value.Length);
writer.Write(obj.value);
}
}
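
These serializers are consumed through FASTER's SerializerSettings when constructing the store. A minimal sketch of the wiring (the devices and table size here are illustrative assumptions, not the sample's actual configuration):

// Sketch: registering the serializers above with a FasterKV store.
// logDevice / objectLogDevice are assumed to have been created, e.g. via Devices.CreateLogDevice.
var serializerSettings = new SerializerSettings<CacheKey, CacheValue>
{
    keySerializer = () => new CacheKeySerializer(),
    valueSerializer = () => new CacheValueSerializer()
};
using var store = new FasterKV<CacheKey, CacheValue>(
    1L << 20,   // hash table size in buckets (assumed value)
    new LogSettings { LogDevice = logDevice, ObjectLogDevice = objectLogDevice },
    serializerSettings: serializerSettings);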

/// <summary>
12 changes: 7 additions & 5 deletions cs/src/core/Allocator/AllocatorBase.cs
@@ -930,6 +930,7 @@ public int EmptyPageCount
emptyPageCount = value;
headOffsetLagSize -= emptyPageCount;

// Lag addresses are the number of pages "behind" TailPageOffset (the tail in the circular buffer).
ReadOnlyLagAddress = (long)(LogMutableFraction * headOffsetLagSize) << LogPageSizeBits;
HeadOffsetLagAddress = (long)headOffsetLagSize << LogPageSizeBits;
}
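
As a concrete reading of the lag computation, take illustrative (non-default) numbers: LogPageSizeBits = 25 (32 MB pages), headOffsetLagSize = 4, LogMutableFraction = 0.9. Then:

// Worked example with assumed values; the truncation toward zero is intentional.
ReadOnlyLagAddress   = (long)(0.9 * 4) << 25;  // (long)3.6 == 3 pages -> 96 MB behind the tail
HeadOffsetLagAddress = (long)4 << 25;          // 4 pages -> 128 MB behind the tail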
@@ -942,6 +943,7 @@ public int EmptyPageCount
if (!prot) epoch.Resume();
try
{
// These shifts adjust the read-only and head addresses by applying the lag addresses.
var _tailAddress = GetTailAddress();
PageAlignedShiftReadOnlyAddress(_tailAddress);
PageAlignedShiftHeadAddress(_tailAddress);
@@ -1329,10 +1331,6 @@ private void OnPagesClosedWorker()
long closeStartAddress = ClosedUntilAddress;
long closeEndAddress = OngoingCloseUntilAddress;

// If we are using a null storage device, we must also shift BeginAddress
if (IsNullDevice)
Utility.MonotonicUpdate(ref BeginAddress, closeEndAddress, out _);

if (ReadCache)
EvictCallback(closeStartAddress, closeEndAddress);

Expand All @@ -1346,6 +1344,10 @@ private void OnPagesClosedWorker()
if (OnEvictionObserver is not null)
MemoryPageScan(start, end, OnEvictionObserver);

// If we are using a null storage device, we must also shift BeginAddress
if (IsNullDevice)
Utility.MonotonicUpdate(ref BeginAddress, end, out _);

// If the end of the closing range is at the end of the page, free the page
if (end == closePageAddress + PageSize)
FreePage((int)(closePageAddress >> LogPageSizeBits));
@@ -1886,7 +1888,7 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, object context)
{
if (errorCode != 0)
{
logger?.LogError($"AsyncGetFromDiskCallback error: {errorCode}");
logger?.LogError("AsyncGetFromDiskCallback error: {errorCode}", errorCode);
}

var result = (AsyncGetFromDiskResult<AsyncIOContext<Key, Value>>)context;
3 changes: 3 additions & 0 deletions cs/src/core/Allocator/ScanIteratorBase.cs
@@ -243,5 +243,8 @@ public void Reset()
currentAddress = -1;
nextAddress = beginAddress;
}

/// <inheritdoc/>
public override string ToString() => $"BA {BeginAddress}, EA {EndAddress}, CA {CurrentAddress}, NA {NextAddress}";
}
}
25 changes: 18 additions & 7 deletions cs/src/core/FasterLog/FasterLog.cs
@@ -2045,8 +2045,12 @@ private void RestoreLatest(out Dictionary<string, long> iterators, out byte[] co
CommittedUntilAddress = long.MaxValue;
beginAddress = info.BeginAddress;
allocator.HeadAddress = long.MaxValue;
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
scanIterator.ScanForwardForCommit(ref info);
try
{
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
scanIterator.ScanForwardForCommit(ref info);
}
catch { }
}

// If until address is 0, that means info is still its default value and we haven't been able to recover
@@ -2130,9 +2134,13 @@ private void RestoreSpecificCommit(long requestedCommitNum, out Dictionary<strin
CommittedUntilAddress = long.MaxValue;
beginAddress = info.BeginAddress;
allocator.HeadAddress = long.MaxValue;
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
if (!scanIterator.ScanForwardForCommit(ref info, requestedCommitNum))
throw new FasterException("requested commit num is not available");
try
{
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
if (!scanIterator.ScanForwardForCommit(ref info, requestedCommitNum))
throw new FasterException("requested commit num is not available");
}
catch { }
}

// At this point, we should have found the exact commit num requested
@@ -2193,8 +2201,11 @@ private void RestoreSpecificCommit(long requestedCommitNum, out Dictionary<strin
CommittedUntilAddress = long.MaxValue;
beginAddress = info.BeginAddress;
allocator.HeadAddress = long.MaxValue;
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
scanIterator.ScanForwardForCommit(ref info);
try
{
using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
scanIterator.ScanForwardForCommit(ref info);
}
catch { }
}

// if until address is 0, that means info is still its default value and we haven't been able to recover
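
All three recovery paths above now treat the forward scan past the last known commit as best-effort: if the tail is torn or the device fails mid-scan, recovery proceeds with the last commit that parsed cleanly. A hypothetical consolidation of the repeated pattern (not part of the PR; FasterLogRecoveryInfo is the info type used above):

// Sketch only: best-effort forward scan, tolerating failures past the last known commit.
private void TryScanForwardForCommit(ref FasterLogRecoveryInfo info)
{
    try
    {
        using var scanIterator = Scan(info.UntilAddress, long.MaxValue, recover: false);
        scanIterator.ScanForwardForCommit(ref info);
    }
    catch { } // keep whatever 'info' already holds
}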
29 changes: 22 additions & 7 deletions cs/src/core/Index/Common/RecordInfo.cs
@@ -95,6 +95,14 @@ public static void WriteInfo(ref RecordInfo info, bool inNewVersion, bool tombst

private static bool IsIntermediateOrInvalidWord(long word) => (word & (kTentativeBitMask | kSealedBitMask | kValidBitMask)) != kValidBitMask;

private static bool IsInvalidOrSealedWord(long word) => (word & (kSealedBitMask | kValidBitMask)) != kValidBitMask;

public void CleanDiskImage()
{
// We ignore locks and temp bits for disk images
this.word &= ~(kExclusiveLockBitMask | kSharedLockMaskInWord | kTentativeBitMask | kSealedBitMask);
}

public bool TryLock(LockType lockType)
{
if (lockType == LockType.Shared)
@@ -269,14 +277,20 @@ public void TransferLocksFrom(ref RecordInfo source)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TransferReadLocksFromAndMarkSourceAtomic(ref RecordInfo source, bool allowXLock, bool seal, bool removeEphemeralLock)
public bool CopyReadLocksFromAndMarkSourceAtomic(ref RecordInfo source, bool allowXLock, bool seal, bool removeEphemeralLock)
{
// This is called when tranferring read locks from the read cache or Lock Table to a tentative log record.
// This is called when transferring read locks from the read cache or Lock Table to a tentative log record. This does not remove
// locks from the source record because if they exist it means other threads have the record locked and must be allowed to
// unlock it (and observe the 'false' return of that unlock due to the Seal/Invalid, and then go chase the record where it is now).
Debug.Assert(this.Tentative, "Must only transfer locks to a tentative recordInfo");
Debug.Assert(!this.IsLockedExclusive, "Must only transfer readlocks");
Debug.Assert((word & (kExclusiveLockBitMask | kSharedLockMaskInWord)) != kExclusiveLockBitMask, "Must only transfer readlocks");
for (; ; Thread.Yield())
{
long expected_word = source.word;

// If this is invalid or sealed, someone else won the race.
if (IsInvalidOrSealedWord(expected_word))
return false;
var new_word = expected_word;

// Fail if there is an established XLock. Having both X and S locks means the other thread is still in the read-lock draining portion
Expand All @@ -296,11 +310,12 @@ public bool TransferReadLocksFromAndMarkSourceAtomic(ref RecordInfo source, bool
if (removeEphemeralLock)
new_word -= kSharedLockIncrement;

// Update the source record; this ensures we atomically transfer the lock count while setting the mark bit.
// Update the source record; this ensures we atomically copy the lock count while setting the mark bit.
// If that succeeds, then we update our own word.
if (expected_word == Interlocked.CompareExchange(ref source.word, new_word, expected_word))
{
this.word = (new_word & ~kSealedBitMask) | kValidBitMask;
this.word &= ~(kExclusiveLockBitMask | kSharedLockMaskInWord);
this.word |= new_word & (kExclusiveLockBitMask | kSharedLockMaskInWord);
return true;
}
}
@@ -352,6 +367,8 @@ public bool Tentative
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ClearTentativeBitAtomic()
{
Debug.Assert(this.Tentative, "Should only ClearTentative a tentative record");

// Call this when locking or splicing may be done simultaneously
while (true)
{
@@ -421,8 +438,6 @@ public bool InNewVersion
}

public void SetDirtyAndModified() => word |= kDirtyBitMask | kModifiedBitMask;
public void SetModified() => word |= kModifiedBitMask;
public void ClearModified() => word &= (~kModifiedBitMask);
public void SetDirty() => word |= kDirtyBitMask;
public void SetTombstone() => word |= kTombstoneBitMask;
public void SetValid() => word |= kValidBitMask;
17 changes: 14 additions & 3 deletions cs/src/core/Index/FASTER/Implementation/BlockAllocate.cs
@@ -60,7 +60,14 @@ void SaveAllocationForRetry<Input, Output, Context>(ref PendingContext<Input, Ou
ref var recordInfo = ref hlog.GetInfo(physicalAddress);
recordInfo.SetInvalid(); // so log scan will skip it

*(int*)Unsafe.AsPointer(ref hlog.GetValue(physicalAddress)) = allocatedSize;
if (logicalAddress < hlog.HeadAddress || allocatedSize < sizeof(RecordInfo) + sizeof(int))
{
pendingContext.retryNewLogicalAddress = Constants.kInvalidAddress;
return;
}

// Use Key in case we have ushort or byte Key/Value; blittable types do not 8-byte align the record size, and it's easier than figuring out where Value starts.
*(int*)Unsafe.AsPointer(ref hlog.GetKey(physicalAddress)) = allocatedSize;
pendingContext.retryNewLogicalAddress = logicalAddress;
}

@@ -69,12 +76,16 @@ bool GetAllocationForRetry<Input, Output, Context>(ref PendingContext<Input, Out
{
// Use an earlier allocation from a failed operation, if possible.
newLogicalAddress = pendingContext.retryNewLogicalAddress;
newPhysicalAddress = 0;
pendingContext.retryNewLogicalAddress = 0;
if (newLogicalAddress < hlog.HeadAddress || newLogicalAddress <= minAddress)
{
newPhysicalAddress = 0;
return false;
}
newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
int recordSize = *(int*)Unsafe.AsPointer(ref hlog.GetValue(newPhysicalAddress));
int* len_ptr = (int*)Unsafe.AsPointer(ref hlog.GetKey(newPhysicalAddress));
int recordSize = *len_ptr;
*len_ptr = 0;
return recordSize >= minSize;
}
}
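
The stash-and-reuse scheme above can be seen in isolation: a failed insert parks its allocation by writing the allocated size where the record's key bytes would start, and a retry reclaims the space only if it is still in memory and large enough. A self-contained sketch with simplified stand-ins (not FASTER's actual types):

using System;

static class RetryAllocationSketch
{
    const int HeaderSize = 8;                      // stands in for sizeof(RecordInfo)
    static readonly byte[] log = new byte[1024];   // stands in for the in-memory log pages
    static long retryAddress;                      // stands in for pendingContext.retryNewLogicalAddress

    static void SaveAllocationForRetry(long address, int allocatedSize)
    {
        // Too small to hold the header plus a stashed length: don't save it.
        if (allocatedSize < HeaderSize + sizeof(int)) { retryAddress = 0; return; }
        BitConverter.TryWriteBytes(log.AsSpan((int)address + HeaderSize), allocatedSize);
        retryAddress = address;
    }

    static bool TryGetAllocationForRetry(int minSize, out long address)
    {
        address = retryAddress;
        retryAddress = 0;
        if (address == 0) return false;
        int recordSize = BitConverter.ToInt32(log, (int)address + HeaderSize);
        log.AsSpan((int)address + HeaderSize, sizeof(int)).Clear();  // consume the stash
        return recordSize >= minSize;
    }

    static void Main()
    {
        SaveAllocationForRetry(64, 128);
        Console.WriteLine(TryGetAllocationForRetry(100, out var addr));  // True (addr == 64)
    }
}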
29 changes: 11 additions & 18 deletions cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
@@ -38,11 +38,7 @@ internal OperationStatus InternalContinuePendingRead<Input, Output, Context, Fas
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
ref RecordInfo srcRecordInfo = ref hlog.GetInfoFromBytePointer(request.record.GetValidPointer());
// We ignore locks and temp bits for disk images
srcRecordInfo.ClearLocks();
srcRecordInfo.Tentative = false;
srcRecordInfo.Unseal();
// Debug.Assert(!srcRecordInfo.IsIntermediate, "Should always retrieve a non-Tentative, non-Sealed record from disk");
srcRecordInfo.CleanDiskImage();

if (request.logicalAddress >= hlog.BeginAddress)
{
@@ -123,7 +119,7 @@
finally
{
stackCtx.HandleNewRecordOnError(this);
EphemeralSUnlockAfterPendingIO(fasterSession, ctx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo);
EphemeralSUnlock(fasterSession, ctx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo);
}
} // end while (true)
}
@@ -175,12 +171,8 @@ internal OperationStatus InternalContinuePendingRMW<Input, Output, Context, Fast

byte* recordPointer = request.record.GetValidPointer();
ref var inputRIRef = ref hlog.GetInfoFromBytePointer(recordPointer);
// We ignore locks and temp bits for disk images
inputRIRef.ClearLocks();
inputRIRef.Tentative = false;
inputRIRef.Unseal();
RecordInfo inputRecordInfo = inputRIRef; // Not ref, as we don't want to write into request.record
// Debug.Assert(!inputRecordInfo.IsIntermediate, "Should always retrieve a non-Tentative, non-Sealed record from disk");
inputRIRef.CleanDiskImage();
RecordInfo inputRecordInfo = inputRIRef; // Not ref, as we don't want the operations below to write into request.record

OperationStackContext<Key, Value> stackCtx = new(comparer.GetHashCode64(ref key));
OperationStatus status;
@@ -337,7 +329,7 @@ internal OperationStatus InternalCopyToTailForCompaction<Input, Output, Context,
finally
{
stackCtx.HandleNewRecordOnError(this);
EphemeralSUnlockAfterPendingIO(fasterSession, currentCtx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo);
EphemeralSUnlock(fasterSession, currentCtx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo);
}
}
} while (HandleImmediateRetryStatus(status, currentCtx, currentCtx, fasterSession, ref pendingContext));
@@ -444,14 +436,15 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
long readcacheNewAddressBit = 0L;
if (copyToReadCache)
{
localLog = readcache;
readcacheNewAddressBit = Constants.kReadCacheBitMask;

// Spin to make sure newLogicalAddress is > hei.Address (the .PreviousAddress and CAS comparison value).
do
{
if (!BlockAllocateReadCache(allocatedSize, out newLogicalAddress, ref pendingContext, out _))
return OperationStatus.SUCCESS; // We don't slow down Reads to handle allocation failure in the read cache, but don't return StatusCode.CopiedRecordToReadCache
newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress);
localLog = readcache;
readcacheNewAddressBit = Constants.kReadCacheBitMask;

if (!VerifyInMemoryAddresses(ref stackCtx))
{
Expand All @@ -464,7 +457,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
} while (stackCtx.hei.IsReadCache && newLogicalAddress < stackCtx.hei.AbsoluteAddress);

newRecordInfo = ref WriteTentativeInfo(ref key, readcache, newPhysicalAddress, inNewVersion: false, tombstone: false, stackCtx.hei.Address);
stackCtx.newLogicalAddress = newLogicalAddress;
stackCtx.newLogicalAddress = newLogicalAddress | readcacheNewAddressBit;

upsertInfo.Address = Constants.kInvalidAddress; // We do not expose readcache addresses
advancedStatusCode |= StatusCode.CopiedRecordToReadCache;
@@ -478,7 +471,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
do
{
if (!BlockAllocate(allocatedSize, out newLogicalAddress, ref pendingContext, out OperationStatus status))
return status; // For CopyToTail, we do want to make sure the record is appended to the tail, so return the failing status.
return status; // For CopyToTail, we do want to make sure the record is appended to the tail, so return the failing status for retry
newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);

if (!VerifyInMemoryAddresses(ref stackCtx))
@@ -504,7 +497,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
if (!fasterSession.SingleWriter(ref key, ref input, ref value, ref localLog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
ref output, ref newRecordInfo, ref upsertInfo, reason))
{
// No SaveAlloc here, but TODO this record could be reused later.
// No SaveAlloc here, but TODO this record could be reused later if not a readcache record.
stackCtx.SetNewRecordInvalid(ref newRecordInfo);
return (upsertInfo.Action == UpsertAction.CancelOperation) ? OperationStatus.CANCELED : OperationStatus.SUCCESS;
}