diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index c5d567a63..beb109c0c 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -486,8 +486,8 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer else this.epoch = epoch; - settings.LogDevice.Initialize(1L << settings.SegmentSizeBits); - settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits); + settings.LogDevice.Initialize(1L << settings.SegmentSizeBits, epoch); + settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits, epoch); // Page size LogPageSizeBits = settings.PageSizeBits; @@ -870,7 +870,7 @@ public void ShiftReadOnlyToTail(out long tailAddress) tailAddress = GetTailAddress(); long localTailAddress = tailAddress; long currentReadOnlyOffset = ReadOnlyAddress; - if (MonotonicUpdate(ref ReadOnlyAddress, tailAddress, out long oldReadOnlyOffset)) + if (Utility.MonotonicUpdate(ref ReadOnlyAddress, tailAddress, out long oldReadOnlyOffset)) { epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress, false)); } @@ -882,7 +882,7 @@ public void ShiftReadOnlyToTail(out long tailAddress) /// public bool ShiftReadOnlyAddress(long newReadOnlyAddress) { - if (MonotonicUpdate(ref ReadOnlyAddress, newReadOnlyAddress, out long oldReadOnlyOffset)) + if (Utility.MonotonicUpdate(ref ReadOnlyAddress, newReadOnlyAddress, out long oldReadOnlyOffset)) { epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(newReadOnlyAddress, false)); return true; @@ -897,34 +897,33 @@ public bool ShiftReadOnlyAddress(long newReadOnlyAddress) public void ShiftBeginAddress(long newBeginAddress) { // First update the begin address - MonotonicUpdate(ref BeginAddress, newBeginAddress, out long oldBeginAddress); + Utility.MonotonicUpdate(ref BeginAddress, newBeginAddress, out long oldBeginAddress); // Then the head address - var h = MonotonicUpdate(ref HeadAddress, newBeginAddress, out long old); + var h = Utility.MonotonicUpdate(ref HeadAddress, newBeginAddress, out long old); // Finally the read-only address - var r = MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out old); + var r = Utility.MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out old); // Clean up until begin address epoch.BumpCurrentEpoch(() => { if (r) { - MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out long _old); - MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _old); + Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out long _old); + Utility.MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _old); } if (h) OnPagesClosed(newBeginAddress); - DeleteAddressRange(oldBeginAddress, newBeginAddress); + TruncateUntilAddress(newBeginAddress); }); } /// - /// Delete address range + /// Wraps TruncateUntilAddress when an allocator potentially has to interact with multiple devices /// - /// /// - protected virtual void DeleteAddressRange(long fromAddress, long toAddress) + protected virtual void TruncateUntilAddress(long toAddress) { - device.DeleteAddressRange(fromAddress, toAddress); + device.TruncateUntilAddress(toAddress); } /// @@ -935,7 +934,7 @@ protected virtual void DeleteAddressRange(long fromAddress, long toAddress) /// public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress, bool waitForPendingFlushComplete = false) { - if (MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress)) + if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out
long oldSafeReadOnlyAddress)) { Debug.WriteLine("SafeReadOnly shifted from {0:X} to {1:X}", oldSafeReadOnlyAddress, newSafeReadOnlyAddress); long startPage = oldSafeReadOnlyAddress >> LogPageSizeBits; @@ -964,7 +963,7 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress, bool waitForPendi /// public void OnPagesClosed(long newSafeHeadAddress) { - if (MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress)) + if (Utility.MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress)) { Debug.WriteLine("SafeHeadOffset shifted from {0:X} to {1:X}", oldSafeHeadAddress, newSafeHeadAddress); @@ -1020,7 +1019,7 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress) long currentReadOnlyAddress = ReadOnlyAddress; long pageAlignedTailAddress = currentTailAddress & ~PageSizeMask; long desiredReadOnlyAddress = (pageAlignedTailAddress - ReadOnlyLagAddress); - if (MonotonicUpdate(ref ReadOnlyAddress, desiredReadOnlyAddress, out long oldReadOnlyAddress)) + if (Utility.MonotonicUpdate(ref ReadOnlyAddress, desiredReadOnlyAddress, out long oldReadOnlyAddress)) { Debug.WriteLine("Allocate: Moving read-only offset from {0:X} to {1:X}", oldReadOnlyAddress, desiredReadOnlyAddress); epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(desiredReadOnlyAddress)); @@ -1050,7 +1049,7 @@ private void PageAlignedShiftHeadAddress(long currentTailAddress) if (ReadCache && (newHeadAddress > HeadAddress)) EvictCallback(HeadAddress, newHeadAddress); - if (MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) + if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) { Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress)); @@ -1075,7 +1074,7 @@ public long ShiftHeadAddress(long desiredHeadAddress) if (ReadCache && (newHeadAddress > HeadAddress)) EvictCallback(HeadAddress, newHeadAddress); - if (MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) + if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress)) { Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress)); @@ -1104,33 +1103,8 @@ protected void ShiftFlushedUntilAddress() if (update) { - MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress); - } - } - - - - /// - /// Used by several functions to update the variable to newValue. Ignores if newValue is smaller or - /// than the current value. 
- /// - /// - /// - /// - /// - private bool MonotonicUpdate(ref long variable, long newValue, out long oldValue) - { - oldValue = variable; - while (oldValue < newValue) - { - var foundValue = Interlocked.CompareExchange(ref variable, newValue, oldValue); - if (foundValue == oldValue) - { - return true; - } - oldValue = foundValue; + Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress); } - return false; } /// diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index ef1a9a773..e3339ff6b 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -153,11 +153,6 @@ protected override bool IsAllocated(int pageIndex) return values[pageIndex] != null; } - protected override void DeleteAddressRange(long fromAddress, long toAddress) - { - base.DeleteAddressRange(fromAddress, toAddress); - } - protected override void WriteAsync(long flushPage, IOCompletionCallback callback, PageAsyncFlushResult asyncResult) { WriteAsync((IntPtr)pointers[flushPage % BufferSize], diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index e5822429f..ccea80368 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -226,10 +226,10 @@ protected override bool IsAllocated(int pageIndex) return values[pageIndex] != null; } - protected override void DeleteAddressRange(long fromAddress, long toAddress) + protected override void TruncateUntilAddress(long toAddress) { - base.DeleteAddressRange(fromAddress, toAddress); - objectLogDevice.DeleteSegmentRange((int)(fromAddress >> LogSegmentSizeBits), (int)(toAddress >> LogSegmentSizeBits)); + base.TruncateUntilAddress(toAddress); + objectLogDevice.TruncateUntilAddress(toAddress); } protected override void WriteAsync(long flushPage, IOCompletionCallback callback, PageAsyncFlushResult asyncResult) diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs index 3abd21c70..2499b4e9b 100644 --- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs +++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs @@ -225,11 +225,6 @@ protected override bool IsAllocated(int pageIndex) return values[pageIndex] != null; } - protected override void DeleteAddressRange(long fromAddress, long toAddress) - { - base.DeleteAddressRange(fromAddress, toAddress); - } - protected override void WriteAsync(long flushPage, IOCompletionCallback callback, PageAsyncFlushResult asyncResult) { WriteAsync((IntPtr)pointers[flushPage % BufferSize], diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index 9428fd848..14a975445 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -13,14 +13,22 @@ namespace FASTER.core /// public static class Devices { + /// + /// This value is supplied for capacity when the device does not have a specified limit. 
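+ /// For example, devices created through Devices.CreateLogDevice default to this value, and StorageDeviceBase
+ /// skips automatic segment eviction for such devices.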
+ /// + public const long CAPACITY_UNSPECIFIED = -1; + private const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;"; + private const string TEST_CONTAINER = "test"; + /// /// Create a storage device for the log /// /// Path to file that will store the log (empty for null device) /// Whether we try to preallocate the file on creation /// Delete files on close + /// /// Device instance - public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false) + public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED) { if (string.IsNullOrWhiteSpace(logPath)) return new NullDevice(); @@ -30,12 +38,12 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = tru #if DOTNETCORE if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose); + logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity); } else #endif { - logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose); + logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity: capacity); } return logDevice; } diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs index 692724548..8833a71b1 100644 --- a/cs/src/core/Device/IDevice.cs +++ b/cs/src/core/Device/IDevice.cs @@ -22,14 +22,44 @@ public interface IDevice string FileName { get; } /// - /// Initialize device + /// Returns the maximum capacity of the storage device, in number of bytes. + /// If CAPACITY_UNSPECIFIED is returned, the storage device has no specified capacity limit. /// - /// - void Initialize(long segmentSize); + long Capacity { get; } + /// + /// A device breaks up each logical log into multiple self-contained segments that are of the same size. + /// Each segment is an atomic unit of data that cannot be partially present on a device (i.e. either the entire segment + /// is present or no data from the segment is present). Examples of this include files or named blobs. This + /// property returns the size of each segment. + /// + long SegmentSize { get; } - /* Segmented addressing API */ + /// + /// The index of the first segment present on this device + /// + int StartSegment { get; } + + /// + /// The index of the last segment present on this device + /// + int EndSegment { get; } + + /// + /// Initialize device. This function is used to pass optional information that may only be known after + /// FASTER initialization (whose constructor takes in IDevice upfront). Implementations are free to ignore + /// the supplied information if they do not need it. + /// + /// This is a bit of a hack.
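+ /// For example, a device that keeps in-memory per-segment state can use the supplied epoch to defer physical
+ /// segment removal until concurrent readers have moved on, as TruncateUntilSegmentAsync in StorageDeviceBase does below.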
+ /// + /// + /// + /// The instance of the epoch protection framework to use, if needed + /// + void Initialize(long segmentSize, LightEpoch epoch = null); + + /* Segmented addressing API */ /// /// Write /// /// /// /// /// /// void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult); - /// - /// Delete segment range - /// - /// - /// - void DeleteSegmentRange(int fromSegment, int toSegment); - /* Direct addressing API */ /// /// Write /// /// /// /// /// /// void ReadAsync(ulong alignedSourceAddress, IntPtr alignedDestinationAddress, uint aligned_read_length, IOCompletionCallback callback, IAsyncResult asyncResult); /// - /// Delete address range + /// Truncates the log until the given address. The truncated portion should no longer be accessed as the device is no longer responsible for + /// its maintenance, but physical deletion may not happen immediately. + /// + /// upper bound of truncated address + /// callback to invoke when truncation is complete + /// result to be passed to the callback + void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result); + + /// + /// Truncates the log until the given address. The truncated portion should no longer be accessed as the device is no longer responsible for + /// its maintenance, but physical deletion may not happen immediately. This version of the function can block. + /// + /// upper bound of truncated address + void TruncateUntilAddress(long toAddress); + + /// + /// Truncates the log until the given segment. Physical deletion of the given segments is guaranteed to have happened when the callback is invoked. + /// + /// the upper bound (exclusive) of segments to truncate + /// callback to invoke when truncation is complete + /// result to be passed to the callback + void TruncateUntilSegmentAsync(int toSegment, AsyncCallback callback, IAsyncResult result); + + /// + /// Truncates the log until the given segment. Physical deletion of the given segments is guaranteed to have happened when the function returns. + /// This version of the function can block. + /// + /// the upper bound (exclusive) of segments to truncate + void TruncateUntilSegment(int toSegment); + + /// + /// Removes a single segment from the device. This function should not normally be called. + /// Instead, use TruncateUntilSegmentAsync. + /// + /// index of the segment to remove + /// callback to invoke when removal is complete + /// result to be passed to the callback + void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result); + + /// + /// Removes a single segment from the device. This function should not normally be called.
+ /// Instead, use TruncateUntilSegment. /// - /// - /// - void DeleteAddressRange(long fromAddress, long toAddress); + /// index of the segment to remove + void RemoveSegment(int segment); /* Close */ diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 1680e4a22..9818a9679 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -28,14 +28,47 @@ public class LocalStorageDevice : StorageDeviceBase /// /// /// - public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true) - : base(filename, GetSectorSize(filename)) + /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + public LocalStorageDevice(string filename, + bool preallocateFile = false, + bool deleteOnClose = false, + bool disableFileBuffering = true, + long capacity = Devices.CAPACITY_UNSPECIFIED) + : base(filename, GetSectorSize(filename), capacity) + { Native32.EnableProcessPrivileges(); this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; this.disableFileBuffering = disableFileBuffering; logHandles = new SafeConcurrentDictionary<int, SafeFileHandle>(); + RecoverFiles(); + } + + private void RecoverFiles() + { + string[] comps = FileName.Split(Path.DirectorySeparatorChar); + string bareName = comps[comps.Length - 1]; + string directory = System.IO.Path.GetDirectoryName(FileName); + DirectoryInfo di = new DirectoryInfo(directory); + if (!di.Exists) return; + int prevSegmentId = -1; + foreach (FileInfo item in di.GetFiles(bareName + "*")) + { + // TODO(Tianyu): Depending on string parsing is bad. But what can one do when an entire cloud service API has no doc? + int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + if (segmentId != prevSegmentId + 1) + { + startSegment = segmentId; + + } + else + { + endSegment = segmentId; + } + prevSegmentId = segmentId; + } + // No need to populate map because logHandles use Open or create on files. } /// @@ -119,24 +152,35 @@ public override unsafe void WriteAsync(IntPtr sourceAddress, } } - /// - /// + /// /// - /// - /// - public override void DeleteSegmentRange(int fromSegment, int toSegment) + /// + public override void RemoveSegment(int segment) { - for (int i=fromSegment; i<toSegment; i++) - { - if (logHandles.TryRemove(i, out SafeFileHandle logHandle)) - { - logHandle.Dispose(); - Native32.DeleteFileW(GetSegmentName(i)); - } - } + if (logHandles.TryRemove(segment, out SafeFileHandle logHandle)) + { + logHandle.Dispose(); + Native32.DeleteFileW(GetSegmentName(segment)); + } } + + /// + /// + /// + /// + /// + public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result) + { + RemoveSegment(segment); + callback(result); + } + + // TODO(Tianyu): It may be somewhat inefficient to use the default async calls from the base class when the underlying + // method is inherently synchronous. But just for delete (which is called infrequently and off the critical path) such + // inefficiency is probably negligible.
+ /// /// /// diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index df1b81a6f..c17c6ed38 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -27,17 +27,46 @@ public class ManagedLocalStorageDevice : StorageDeviceBase /// /// /// - public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false) - : base(filename, GetSectorSize(filename)) + /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED) + : base(filename, GetSectorSize(filename), capacity) { pool = new SectorAlignedBufferPool(1, 1); this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; logHandles = new ConcurrentDictionary<int, Stream>(); + RecoverFiles(); } + private void RecoverFiles() + { + string[] comps = FileName.Split(Path.DirectorySeparatorChar); + string bareName = comps[comps.Length - 1]; + string directory = System.IO.Path.GetDirectoryName(FileName); + DirectoryInfo di = new DirectoryInfo(directory); + if (!di.Exists) return; + int prevSegmentId = -1; + foreach (FileInfo item in di.GetFiles(bareName + "*")) + { + // TODO(Tianyu): Depending on string parsing is bad. But what can one do when an entire cloud service API has no doc? + int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + if (segmentId != prevSegmentId + 1) + { + startSegment = segmentId; + + } + else + { + endSegment = segmentId; + } + prevSegmentId = segmentId; + } + // No need to populate map because logHandles use Open or create on files. + } + + + class ReadCallbackWrapper { readonly IOCompletionCallback callback; @@ -139,22 +168,30 @@ public override unsafe void WriteAsync(IntPtr sourceAddress, } /// - /// + /// /// - /// - /// - public override void DeleteSegmentRange(int fromSegment, int toSegment) + /// + public override void RemoveSegment(int segment) { - for (int i=fromSegment; i<toSegment; i++) - { - if (logHandles.TryRemove(i, out Stream logHandle)) - { - logHandle.Dispose(); - File.Delete(GetSegmentName(i)); - } - } + if (logHandles.TryRemove(segment, out Stream logHandle)) + { + logHandle.Dispose(); + File.Delete(GetSegmentName(segment)); + } } + + /// + /// + /// + /// + /// + public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result) + { + RemoveSegment(segment); + callback(result); + } + /// /// /// diff --git a/cs/src/core/Device/NullDevice.cs b/cs/src/core/Device/NullDevice.cs index 0b5b66a54..c8cca2cb4 100644 --- a/cs/src/core/Device/NullDevice.cs +++ b/cs/src/core/Device/NullDevice.cs @@ -14,7 +14,7 @@ public class NullDevice : StorageDeviceBase /// /// /// - public NullDevice() : base("null", 512) + public NullDevice() : base("null", 512, Devices.CAPACITY_UNSPECIFIED) { } @@ -62,16 +62,24 @@ public override unsafe void WriteAsync(IntPtr alignedSourceAddress, int segmentI } /// - /// + /// /// - /// - /// - public override void DeleteSegmentRange(int fromSegment, int toSegment) + /// + public override void RemoveSegment(int segment) { + // No-op } /// - /// + /// + /// + /// + /// + /// + public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result) => callback(result); + + /// + /// + /// public override void Close() { diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 7b9749858..f89b3a1f3 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -18,6 +18,7 @@ namespace FASTER.core /// public abstract class StorageDeviceBase : IDevice { + /// /// /// @@ -28,6 +29,25 @@ public abstract class
StorageDeviceBase : IDevice /// public string FileName { get; } + /// + /// + /// + public long Capacity { get; } + + /// + /// + /// + public int StartSegment { get { return startSegment; } } + + /// + /// + /// + public int EndSegment { get { return endSegment; } } + + /// + /// + /// + public long SegmentSize { get { return segmentSize; } } /// /// Segment size /// @@ -38,11 +58,24 @@ public abstract class StorageDeviceBase : IDevice private ulong segmentSizeMask; /// - /// + /// Instance of the epoch protection framework in the current system. + /// A device may have internal in-memory data structures that require epoch protection under concurrent access. /// - /// - /// - public StorageDeviceBase(string filename, uint sectorSize) + protected LightEpoch epoch; + + /// + /// start and end segment corresponding to StartSegment and EndSegment. Subclasses are + /// allowed to modify these as needed. + /// + protected int startSegment = 0, endSegment = -1; + + /// + /// Initializes a new StorageDeviceBase + /// + /// Name of the file to use + /// The smallest unit of write of the underlying storage device (e.g. 512 bytes for a disk) + /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + public StorageDeviceBase(string filename, uint sectorSize, long capacity) { FileName = filename; SectorSize = sectorSize; @@ -50,15 +83,22 @@ public StorageDeviceBase(string filename, uint sectorSize) segmentSize = -1; segmentSizeBits = 64; segmentSizeMask = ~0UL; + + Capacity = capacity; } /// /// Initialize device /// /// - public void Initialize(long segmentSize) + /// + public virtual void Initialize(long segmentSize, LightEpoch epoch = null) { + // TODO(Tianyu): Alternatively, we can adjust capacity based on the segment size: given a physical upper limit of capacity, + // we only make use of (Capacity / segmentSize * segmentSize) many bytes. + Debug.Assert(Capacity == -1 || Capacity % segmentSize == 0, "capacity must be a multiple of segment sizes"); this.segmentSize = segmentSize; + this.epoch = epoch; if (!Utility.IsPowerOfTwo(segmentSize)) { if (segmentSize != -1) @@ -83,10 +123,19 @@ public void Initialize(long segmentSize) /// public void WriteAsync(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult) { - var segment = segmentSizeBits < 64 ? alignedDestinationAddress >> segmentSizeBits : 0; + int segment = (int)(segmentSizeBits < 64 ? alignedDestinationAddress >> segmentSizeBits : 0); + + // If the device has bounded space, and we are writing a new segment, we need to check whether an existing segment needs to be evicted. + if (Capacity != Devices.CAPACITY_UNSPECIFIED && Utility.MonotonicUpdate(ref endSegment, segment, out int oldEnd)) + { + // Attempt to update the stored range so that there is enough space on the tier to accommodate the current log tail + int newStartSegment = endSegment - (int)(Capacity >> segmentSizeBits); + // Assuming that we still have enough physical capacity to write another segment, even if delete does not immediately free up space.
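+ // Worked example with hypothetical sizes: Capacity = 1 GB and 256 MB segments give Capacity >> segmentSizeBits = 4,
+ // so advancing endSegment to 8 yields newStartSegment = 8 - 4 = 4, truncating segments below index 4 (segments 4-7
+ // plus the in-flight segment 8 remain, per the slack noted above).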
+ TruncateUntilSegmentAsync(newStartSegment, r => { }, null); + } WriteAsync( alignedSourceAddress, - (int)segment, + segment, alignedDestinationAddress & segmentSizeMask, numBytesToWrite, callback, asyncResult); } @@ -111,15 +160,95 @@ public void ReadAsync(ulong alignedSourceAddress, IntPtr alignedDestinationAddre } /// - /// + /// + /// + /// + /// + /// + public abstract void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result); + + /// + /// + /// By default the implementation calls into RemoveSegmentAsync and blocks until it completes + /// + /// + public virtual void RemoveSegment(int segment) + { + ManualResetEventSlim completionEvent = new ManualResetEventSlim(false); + RemoveSegmentAsync(segment, r => completionEvent.Set(), null); + completionEvent.Wait(); + } + + /// + /// + /// + /// + /// + /// + public void TruncateUntilSegmentAsync(int toSegment, AsyncCallback callback, IAsyncResult result) + { + // Reset begin range to at least toSegment + if (!Utility.MonotonicUpdate(ref startSegment, toSegment, out int oldStart)) + { + // If no-op, invoke callback and return immediately + callback(result); + return; + } + CountdownEvent countdown = new CountdownEvent(toSegment - oldStart); + // This action needs to be epoch-protected because readers may be issuing reads to the deleted segment, unaware of the delete. + // Because of earlier compare-and-swap, the caller has exclusive access to the range [oldStartSegment, newStartSegment), and there will + // be no double deletes. + epoch.BumpCurrentEpoch(() => + { + for (int i = oldStart; i < toSegment; i++) + { + RemoveSegmentAsync(i, r => { + if (countdown.Signal()) + { + callback(r); + countdown.Dispose(); + } + }, result); + } + }); + } + + /// + /// + /// + /// + public void TruncateUntilSegment(int toSegment) + { + using (ManualResetEventSlim completionEvent = new ManualResetEventSlim(false)) + { + TruncateUntilSegmentAsync(toSegment, r => completionEvent.Set(), null); + completionEvent.Wait(); + } + } + + /// + /// + /// + /// + /// + /// + public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result) + { + // Truncate only up to segment boundary if address is not aligned + TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result); + } + + /// + /// + /// + /// + public virtual void TruncateUntilAddress(long toAddress) + { - var fromSegment = segmentSizeBits < 64 ? fromAddress >> segmentSizeBits : 0; - var toSegment = segmentSizeBits < 64 ?
toAddress >> segmentSizeBits : 0; - DeleteSegmentRange((int)fromSegment, (int)toSegment); + using (ManualResetEventSlim completionEvent = new ManualResetEventSlim(false)) + { + TruncateUntilAddressAsync(toAddress, r => completionEvent.Set(), null); + completionEvent.Wait(); + } } /// @@ -144,13 +273,6 @@ public void DeleteAddressRange(long fromAddress, long toAddress) /// public abstract void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult); - /// - /// - /// - /// - /// - public abstract void DeleteSegmentRange(int fromSegment, int toSegment); - /// /// /// diff --git a/cs/src/core/Device/TieredStorageDevice.cs b/cs/src/core/Device/TieredStorageDevice.cs new file mode 100644 index 000000000..1b6cbcf18 --- /dev/null +++ b/cs/src/core/Device/TieredStorageDevice.cs @@ -0,0 +1,184 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Diagnostics; +using System.Threading; +using System.ComponentModel; +using System.Collections.Concurrent; + +namespace FASTER.core +{ + /// + /// A TieredStorageDevice logically composes multiple IDevice instances into a single storage device. It is assumed + /// that some devices are used as caches while one is considered the commit point, i.e. when a write is completed + /// on that device, it is considered persistent. Reads are served from the closest device with available data. Writes are issued in parallel to + /// all devices. + /// + class TieredStorageDevice : StorageDeviceBase + { + private readonly IList<IDevice> devices; + private readonly int commitPoint; + + // TODO(Tianyu): Not reasoning about what the sector size of a tiered storage should be when different tiers can have different sector sizes. + /// + /// Constructs a new TieredStorageDevice composed of the given devices. + /// + /// + /// The index of an IDevice in devices. When a write has been completed on that device, + /// the write is considered persistent. It is guaranteed that the callback in WriteAsync + /// will not be called until the write is completed on the commit point device. + /// + /// + /// List of devices to be used. The list should be given in order of hot to cold. Read is served from the + /// device with smallest index in the list that has the requested data + /// + // TODO(Tianyu): Recovering from a tiered device is potentially difficult, because we also need to recover their respective ranges. + public TieredStorageDevice(int commitPoint, IList<IDevice> devices) : base(ComputeFileString(devices, commitPoint), 512, ComputeCapacity(devices)) + { + Debug.Assert(commitPoint >= 0 && commitPoint < devices.Count, "commit point is out of range"); + // TODO(Tianyu): Should assert that passed in devices are not yet initialized. This is more challenging for recovering. + this.devices = devices; + this.commitPoint = commitPoint; + } + + /// + /// Constructs a new TieredStorageDevice composed of the given devices. + /// + /// + /// The index of an IDevice in devices. When a write has been completed on that device, + /// the write is considered persistent. It is guaranteed that the callback in WriteAsync + /// will not be called until the write is completed on the commit point device and all previous tiers. + /// + /// + /// List of devices to be used. The list should be given in order of hot to cold.
Read is served from the + /// device with smallest index in the list that has the requested data + /// + public TieredStorageDevice(int commitPoint, params IDevice[] devices) : this(commitPoint, (IList<IDevice>)devices) + { + } + + + // TODO(Tianyu): Unclear whether this is the right design. Should we allow different tiers to have different segment sizes? + public override void Initialize(long segmentSize, LightEpoch epoch) + { + base.Initialize(segmentSize, epoch); + + foreach (IDevice device in devices) + { + device.Initialize(segmentSize, epoch); + } + } + + public override void Close() + { + foreach (IDevice device in devices) + { + device.Close(); + } + } + + public override void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult) + { + // This device is epoch-protected and cannot be stale while the operation is in flight + IDevice closestDevice = devices[FindClosestDeviceContaining(segmentId)]; + // We can directly forward the address, because assuming an inclusive policy, all devices agree on the same address space. The only difference is that some segments may not + // be present for certain devices. + closestDevice.ReadAsync(segmentId, sourceAddress, destinationAddress, readLength, callback, asyncResult); + } + + public override unsafe void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult) + { + + int startTier = FindClosestDeviceContaining(segmentId); + // TODO(Tianyu): Can you ever initiate a write that is after the commit point? Given FASTER's model of a read-only region, this will probably never happen. + Debug.Assert(startTier <= commitPoint, "Write should not elide the commit point"); + + var countdown = new CountdownEvent(commitPoint - startTier + 1); // number of devices whose completion we wait on + // Issue writes to all tiers in parallel + for (int i = startTier; i < devices.Count; i++) + { + if (i <= commitPoint) + { + + // All tiers before the commit point (inclusive) need to be persistent before the callback is invoked. + devices[i].WriteAsync(sourceAddress, segmentId, destinationAddress, numBytesToWrite, (e, n, o) => + { + // The last tier to finish invokes the callback + if (countdown.Signal()) + { + callback(e, n, o); + countdown.Dispose(); + } + + }, asyncResult); + } + else + { + // Otherwise, simply issue the write without caring about callbacks + devices[i].WriteAsync(sourceAddress, segmentId, destinationAddress, numBytesToWrite, (e, n, o) => { }, null); + } + } + } + + public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result) + { + int startTier = FindClosestDeviceContaining(segment); + var countdown = new CountdownEvent(devices.Count - startTier); + for(int i = startTier; i < devices.Count; i++) + { + devices[i].RemoveSegmentAsync(segment, r => + { + if (countdown.Signal()) + { + callback(r); + countdown.Dispose(); + } + }, result); + } + } + + private static long ComputeCapacity(IList<IDevice> devices) + { + long result = 0; + // The capacity of a tiered storage device is the largest capacity among its tiers, because under the inclusive policy every tier holds a subset of the coldest tier + foreach (IDevice device in devices) + { + // Unless the last tier device has unspecified storage capacity, in which case the tiered storage also has unspecified capacity + if (device.Capacity == Devices.CAPACITY_UNSPECIFIED) + { + // TODO(Tianyu): Is this assumption too strong?
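+ // For example (hypothetical configuration): a 1 GB local tier over an unbounded cloud tier passes this assert,
+ // while an unbounded local tier over a 1 GB cloud tier trips it; FindClosestDeviceContaining below relies on the
+ // coldest tier holding the full log.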
+ Debug.Assert(device == devices[devices.Count - 1], "Only the last tier storage of a tiered storage device can have unspecified capacity"); + return Devices.CAPACITY_UNSPECIFIED; + } + result = Math.Max(result, device.Capacity); + } + return result; + } + + // TODO(Tianyu): Is the notion of file name still relevant in a tiered storage device? + private static string ComputeFileString(IList<IDevice> devices, int commitPoint) + { + StringBuilder result = new StringBuilder(); + foreach (IDevice device in devices) + { + string formatString = "{0}, file name {1}, capacity {2} bytes;"; + string capacity = device.Capacity == Devices.CAPACITY_UNSPECIFIED ? "unspecified" : device.Capacity.ToString(); + result.AppendFormat(formatString, device.GetType().Name, device.FileName, capacity); + } + result.AppendFormat("commit point: {0} at tier {1}", devices[commitPoint].GetType().Name, commitPoint); + return result.ToString(); + } + + private int FindClosestDeviceContaining(int segment) + { + // Can use binary search, but 1) it might not be faster than linear on an array assumed to be small, and 2) the C# built-in does not guarantee first element is returned on duplicates. + // Therefore we are sticking to the simpler approach at first. + for (int i = 0; i < devices.Count; i++) + { + if (devices[i].StartSegment <= segment) return i; + } + // TODO(Tianyu): This exception should never be triggered if we enforce that the last tier has unbounded storage. + throw new ArgumentException("No such address exists"); + } + } +} diff --git a/cs/src/core/Utilities/Utility.cs b/cs/src/core/Utilities/Utility.cs index 57ab7168f..94360a7f2 100644 --- a/cs/src/core/Utilities/Utility.cs +++ b/cs/src/core/Utilities/Utility.cs @@ -223,5 +223,39 @@ internal static int Murmur3(int h) a ^= a >> 16; return (int)a; } + + /// + /// Updates the variable to newValue unless the current value is larger than the new value. + /// + /// The variable to possibly replace + /// The value that replaces the variable if successful + /// The original value in the variable + /// true if the update succeeded, i.e. oldValue was less than or equal to newValue + public static bool MonotonicUpdate(ref long variable, long newValue, out long oldValue) + { + do + { + oldValue = variable; + if (oldValue > newValue) return false; + } while (Interlocked.CompareExchange(ref variable, newValue, oldValue) != oldValue); + return true; + } + /// + /// Updates the variable to newValue only if the current value is strictly smaller than the new value. + /// + /// The variable to possibly replace + /// The value that replaces the variable if successful + /// The original value in the variable + /// true if the update succeeded, i.e. oldValue was strictly less than newValue + public static bool MonotonicUpdate(ref int variable, int newValue, out int oldValue) + { + do + { + oldValue = variable; + if (oldValue >= newValue) return false; + } while (Interlocked.CompareExchange(ref variable, newValue, oldValue) != oldValue); + return true; + } + } } diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs index cf3491e57..8c874fb62 100644 --- a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs +++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs @@ -39,8 +39,9 @@ public class AzureStorageDevice : StorageDeviceBase /// True if the program should delete all blobs created on call to Close. False otherwise.
/// The container is not deleted even if it was created in this constructor /// - public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false) - : base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE) + /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED) + : base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE, capacity) { CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString); CloudBlobClient client = storageAccount.CreateCloudBlobClient(); @@ -49,6 +50,34 @@ public AzureStorageDevice(string connectionString, string containerName, string blobs = new ConcurrentDictionary<int, BlobEntry>(); this.blobName = blobName; this.deleteOnClose = deleteOnClose; + RecoverBlobs(); + } + + private void RecoverBlobs() + { + int prevSegmentId = -1; + foreach (IListBlobItem item in container.ListBlobs(blobName)) + { + // TODO(Tianyu): Depending on string parsing is bad. But what can one do when an entire cloud service API has no doc? + string[] parts = item.Uri.Segments; + int segmentId = Int32.Parse(parts[parts.Length - 1].Replace(blobName, "")); + if (segmentId != prevSegmentId + 1) + { + startSegment = segmentId; + + } + else + { + endSegment = segmentId; + } + prevSegmentId = segmentId; + } + + for (int i = startSegment; i <= endSegment; i++) + { + bool ret = blobs.TryAdd(i, new BlobEntry(container.GetPageBlobReference(GetSegmentBlobName(i)))); + Debug.Assert(ret, "Recovery of blobs is single-threaded and should not yield any failure due to concurrency"); + } } /// @@ -70,16 +99,29 @@ public override void Close() } /// - /// Inherited + /// /// - public override void DeleteSegmentRange(int fromSegment, int toSegment) + /// + /// + /// + public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result) { - for (int i = fromSegment; i < toSegment; i++) + if (blobs.TryRemove(segment, out BlobEntry blob)) { - if (blobs.TryRemove(i, out BlobEntry blob)) + CloudPageBlob pageBlob = blob.GetPageBlob(); + pageBlob.BeginDelete(ar => { - blob.GetPageBlob().Delete(); - } + try + { + pageBlob.EndDelete(ar); + + } + catch (Exception) + { + // Can I do anything else other than printing out an error message? + } + callback(ar); + }, result); } + else + { + callback(result); + } } @@ -124,7 +166,7 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti BlobEntry entry = new BlobEntry(); if (blobs.TryAdd(segmentId, entry)) { - CloudPageBlob pageBlob = container.GetPageBlobReference(blobName + segmentId); + CloudPageBlob pageBlob = container.GetPageBlobReference(GetSegmentBlobName(segmentId)); // If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because // page blobs are not backed by real pages on creation, and the given size is only the physical limit of // how large it can grow to.
@@ -175,5 +217,10 @@ private static unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAdd callback(0, numBytesToWrite, ovNative); }, asyncResult); } + + private string GetSegmentBlobName(int segmentId) + { + return blobName + segmentId; + } } } diff --git a/cs/src/devices/AzureStorageDevice/BlobEntry.cs b/cs/src/devices/AzureStorageDevice/BlobEntry.cs index e70d20b6d..2ad46bcf7 100644 --- a/cs/src/devices/AzureStorageDevice/BlobEntry.cs +++ b/cs/src/devices/AzureStorageDevice/BlobEntry.cs @@ -21,15 +21,27 @@ class BlobEntry private ConcurrentQueue<Action<CloudPageBlob>> pendingWrites; private int waitingCount; + /// + /// Creates a new BlobEntry to hold the given pageBlob. The pageBlob must already be created. + /// + /// + public BlobEntry(CloudPageBlob pageBlob) + { + this.pageBlob = pageBlob; + if (pageBlob == null) + { + // Only need to allocate a queue when we potentially need to asynchronously create a blob + pendingWrites = new ConcurrentQueue<Action<CloudPageBlob>>(); + waitingCount = 0; + } + + } /// /// Creates a new BlobEntry, does not initialize a page blob. Use CreateAsync /// for actual creation. /// - public BlobEntry() + public BlobEntry() : this(null) { - pageBlob = null; - pendingWrites = new ConcurrentQueue<Action<CloudPageBlob>>(); - waitingCount = 0; } /// diff --git a/cs/test/BasicDiskFASTERTests.cs b/cs/test/BasicDiskFASTERTests.cs index 0d8ad497f..3b4019107 100644 --- a/cs/test/BasicDiskFASTERTests.cs +++ b/cs/test/BasicDiskFASTERTests.cs @@ -34,6 +34,25 @@ public void PageBlobWriteRead() if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests"))) TestDeviceWriteRead(new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false)); } + [Test] + public void TieredWriteRead() + { + IDevice tested; + IDevice localDevice = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true, capacity: 1 << 30); + if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests"))) + { + IDevice cloudDevice = new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false); + tested = new TieredStorageDevice(1, localDevice, cloudDevice); + } + else + { + // If Azure testing is not enabled, just use another disk + IDevice localDevice2 = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests2.log", deleteOnClose: true, capacity: 1 << 30); + tested = new TieredStorageDevice(1, localDevice, localDevice2); + + } + TestDeviceWriteRead(tested); + } void TestDeviceWriteRead(IDevice log) {