diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 78e2d4ae7..25671baf7 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -382,16 +382,39 @@ public long Enqueue(ReadOnlySpan entry) /// Enqueue raw pre-formatted bytes with headers to the log (in memory). /// /// Raw bytes to be enqueued to log + /// Do not auto-commit /// First logical address of added entries - public long UnsafeEnqueueRaw(ReadOnlySpan entryBytes) + public long UnsafeEnqueueRaw(ReadOnlySpan entryBytes, bool noCommit = false) { long logicalAddress; - while (!UnsafeTryEnqueueRaw(entryBytes, out logicalAddress)) + while (!UnsafeTryEnqueueRaw(entryBytes, noCommit, out logicalAddress)) Thread.Yield(); return logicalAddress; } + /// + /// Commit metadata only (no records added to main log) + /// + /// + public void UnsafeCommitMetadataOnly(FasterLogRecoveryInfo info) + { + lock (ongoingCommitRequests) + { + ongoingCommitRequests.Enqueue((info.UntilAddress, info)); + } + try + { + epoch.Resume(); + if (!allocator.ShiftReadOnlyToTail(out _, out _)) + CommitMetadataOnly(ref info); + } + finally + { + epoch.Suspend(); + } + } + /// /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit /// @@ -560,9 +583,10 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) /// done. If it returns false, we need to retry. /// /// Entry bytes to be enqueued to log + /// Do not auto-commit /// Logical address of added entry /// Whether the append succeeded - public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, out long logicalAddress) + public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, bool noCommit, out long logicalAddress) { int length = entryBytes.Length; @@ -589,7 +613,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, out long l entryBytes.CopyTo(new Span((byte*)physicalAddress, length)); if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress(); epoch.Suspend(); - if (AutoCommit) Commit(); + if (AutoCommit && !noCommit) Commit(); return true; } @@ -1693,8 +1717,9 @@ public void TruncateUntilPageStart(long untilAddress) /// Whether to recover named iterator from latest commit (if exists). If false, iterator starts from beginAddress. /// Use single or double buffering /// Whether we scan uncommitted data + /// /// - public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering, bool scanUncommitted = false) + public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering, bool scanUncommitted = false, ILogger logger = null) { if (readOnlyMode) { @@ -1711,9 +1736,9 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, string nam FasterLogScanIterator iter; if (recover && name != null && RecoveredIterators != null && RecoveredIterators.ContainsKey(name)) - iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted); + iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted, logger: logger); else - iter = new FasterLogScanIterator(this, allocator, beginAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted); + iter = new FasterLogScanIterator(this, allocator, beginAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted, logger: logger); if (name != null) { diff --git a/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs b/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs index 6cb82ec85..7f363d2f4 100644 --- a/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs +++ b/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Text; namespace FASTER.core { @@ -104,7 +105,9 @@ public void Initialize(BinaryReader reader) Iterators = new Dictionary(); for (int i = 0; i < iteratorCount; i++) { - Iterators.Add(reader.ReadString(), reader.ReadInt64()); + int len = reader.ReadInt32(); + byte[] bytes = reader.ReadBytes(len); + Iterators.Add(Encoding.UTF8.GetString(bytes), reader.ReadInt64()); } } @@ -185,7 +188,9 @@ public readonly byte[] ToByteArray() { foreach (var kvp in Iterators) { - writer.Write(kvp.Key); + var bytes = Encoding.UTF8.GetBytes(kvp.Key); + writer.Write(bytes.Length); + writer.Write(bytes); writer.Write(kvp.Value); } } @@ -206,7 +211,7 @@ public int SerializedSize() if (Iterators != null) { foreach (var kvp in Iterators) - iteratorSize += kvp.Key.Length + sizeof(long); + iteratorSize += sizeof(int) + Encoding.UTF8.GetByteCount(kvp.Key) + sizeof(long); } return sizeof(int) + 4 * sizeof(long) + iteratorSize + sizeof(int) + (Cookie?.Length ?? 0); diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 3b6a42147..682c1858d 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -133,6 +133,25 @@ internal RecoveryOptions(long headAddress, long fuzzyRegionStartAddress, bool un } } + /// + /// Log File info + /// + public struct LogFileInfo + { + /// + /// Snapshot file end address (start address is always 0) + /// + public long snapshotFileEndAddress; + /// + /// Hybrid log file start address + /// + public long hybridLogFileStartAddress; + /// + /// Hybrid log file end address + /// + public long hybridLogFileEndAddress; + } + public partial class FasterKV : FasterBase, IFasterKV { /// @@ -168,12 +187,18 @@ public long GetLatestCheckpointVersion() /// /// /// - public long GetSnapshotFileSizes(Guid token) + public LogFileInfo GetLogFileSize(Guid token) { using var current = new HybridLogCheckpointInfo(); current.Recover(token, checkpointManager, hlog.LogPageSizeBits, out var _, false); - return current.info.finalLogicalAddress; + long snapshotDeviceOffset = hlog.GetPage(current.info.snapshotStartFlushedLogicalAddress) << hlog.LogPageSizeBits; + return new LogFileInfo + { + snapshotFileEndAddress = current.info.snapshotFinalLogicalAddress - snapshotDeviceOffset, + hybridLogFileStartAddress = hlog.GetPage(current.info.beginAddress) << hlog.LogPageSizeBits, + hybridLogFileEndAddress = current.info.flushedLogicalAddress + }; } /// @@ -185,7 +210,7 @@ public long GetIndexFileSize(Guid token) { IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo(); recoveredICInfo.Recover(token, checkpointManager); - return (long)recoveredICInfo.info.num_ht_bytes; + return (long)(recoveredICInfo.info.num_ht_bytes + recoveredICInfo.info.num_ofb_bytes); } private void GetClosestHybridLogCheckpointInfo( diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs index ec97181e0..13b99025f 100644 --- a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs +++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs @@ -437,13 +437,13 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr TimeStamp = DateTime.UtcNow }); - // It is up to the allocator to make sure no reads are issued to segments before they are written + // Lazily cache the blob entry for the segment being read if (!this.blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) { - var nonLoadedBlob = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId)); - var exception = new InvalidOperationException("Attempt to read a non-loaded segment"); - this.BlobManager?.HandleStorageError(nameof(ReadAsync), exception.Message, nonLoadedBlob.Default?.Name, exception, false, true); - throw exception; + var blobClients = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId)); + var entry = new BlobEntry(blobClients, blobClients.Default.GetProperties().Value.ETag, this); + this.blobs.TryAdd(segmentId, entry); + blobEntry = this.blobs[segmentId]; } this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)