diff --git a/cs/src/core/Device/FixedPool.cs b/cs/src/core/Device/FixedPool.cs index e917c6eb2..a2b75f9c2 100644 --- a/cs/src/core/Device/FixedPool.cs +++ b/cs/src/core/Device/FixedPool.cs @@ -29,7 +29,7 @@ public FixedPool(int size, Func creator) for (int i = 0; i < size; i++) { if (disposed) - throw new FasterException("Disposed"); + throw new FasterException("Accessing a disposed handle in device"); var val = owners[i]; if (val == 0) diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 05feaab63..8d1fde608 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -37,6 +37,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase private readonly bool useIoCompletionPort; private readonly ConcurrentQueue results; private static uint sectorSize = 0; + private bool _disposed; /// /// Number of pending reads on device @@ -105,6 +106,8 @@ protected internal LocalStorageDevice(string filename, #endif ThrottleLimit = 120; this.useIoCompletionPort = useIoCompletionPort; + this._disposed = false; + if (useIoCompletionPort) { ThreadPool.GetMaxThreads(out int workerThreads, out _); @@ -329,6 +332,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs /// public override void Dispose() { + _disposed = true; foreach (var logHandle in logHandles.Values) logHandle.Dispose(); @@ -445,7 +449,15 @@ protected SafeFileHandle GetOrAddHandle(int _segmentId) { return h; } - return logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId)); + if (_disposed) return null; + var result = logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId)); + if (_disposed) + { + foreach (var logHandle in logHandles.Values) + logHandle.Dispose(); + return null; + } + return result; } private SafeFileHandle CreateHandle(int segmentId) diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 1ba809171..30b773d65 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -26,6 +26,8 @@ public sealed class ManagedLocalStorageDevice : StorageDeviceBase /// private int numPending = 0; + private bool _disposed; + /// /// /// @@ -44,6 +46,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, if (!Directory.Exists(path)) Directory.CreateDirectory(path); + this._disposed = false; this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; logHandles = new SafeConcurrentDictionary, FixedPool)>(); @@ -260,10 +263,11 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs } /// - /// + /// Close device /// public override void Dispose() { + _disposed = true; foreach (var entry in logHandles) { entry.Value.Item1.Dispose(); @@ -343,7 +347,24 @@ private Stream CreateWriteHandle(int segmentId) private (FixedPool, FixedPool) GetOrAddHandle(int _segmentId) { - return logHandles.GetOrAdd(_segmentId, e => AddHandle(e)); + if (logHandles.TryGetValue(_segmentId, out var h)) + { + return h; + } + var result = logHandles.GetOrAdd(_segmentId, e => AddHandle(e)); + + if (_disposed) + { + // If disposed, dispose the fixed pools and return the (disposed) result + foreach (var entry in logHandles) + { + entry.Value.Item1.Dispose(); + entry.Value.Item2.Dispose(); + if (deleteOnClose) + File.Delete(GetSegmentName(entry.Key)); + } + } + return result; } /// diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index 55acc1a01..53baf2b78 100644 --- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -24,6 +24,7 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa private SectorAlignedBufferPool bufferPool; private IDevice singleLogCommitDevice; + private bool _disposed; /// /// Next commit number @@ -48,6 +49,7 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec this.overwriteLogCommits = overwriteLogCommits; this.removeOutdated = removeOutdated; + this._disposed = false; deviceFactory.Initialize(checkpointNamingScheme.BaseName()); } @@ -76,6 +78,8 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet { var device = NextCommitDevice(); + if (device == null) return; + // Two phase to ensure we write metadata in single Write operation using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); @@ -93,6 +97,7 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet /// public void Dispose() { + _disposed = true; singleLogCommitDevice?.Dispose(); singleLogCommitDevice = null; } @@ -109,8 +114,16 @@ public byte[] GetCommitMetadata(long commitNum) IDevice device; if (overwriteLogCommits) { + if (_disposed) return null; if (singleLogCommitDevice == null) + { singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum)); + if (_disposed) + { + singleLogCommitDevice?.Dispose(); + singleLogCommitDevice = null; + } + } device = singleLogCommitDevice; } else @@ -118,6 +131,7 @@ public byte[] GetCommitMetadata(long commitNum) device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum)); this.commitNum = commitNum + 1; } + if (device == null) return null; ReadInto(device, 0, out byte[] writePad, sizeof(int)); @@ -139,8 +153,17 @@ private IDevice NextCommitDevice() { if (overwriteLogCommits) { + if (_disposed) return null; if (singleLogCommitDevice == null) + { singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum)); + if (_disposed) + { + singleLogCommitDevice?.Dispose(); + singleLogCommitDevice = null; + return null; + } + } return singleLogCommitDevice; } diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs index e83d10bc7..2a97a7515 100644 --- a/cs/test/FasterLogTests.cs +++ b/cs/test/FasterLogTests.cs @@ -13,6 +13,33 @@ namespace FASTER.test { + [TestFixture] + internal class FasterLogStandAloneTests + { + + [Test] + public void TestDisposeReleasesFileLocksWithInprogressCommit() + { + string commitPath = TestContext.CurrentContext.TestDirectory + "/" + TestContext.CurrentContext.Test.Name + "/"; + DirectoryInfo di = Directory.CreateDirectory(commitPath); + IDevice device = Devices.CreateLogDevice(commitPath + "testDisposeReleasesFileLocksWithInprogressCommit.log", preallocateFile: true, deleteOnClose: false); + FasterLog fasterLog = new FasterLog(new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.PerEntry }); + Assert.IsTrue(fasterLog.TryEnqueue(new byte[100], out long beginAddress)); + fasterLog.Commit(spinWait: false); + fasterLog.Dispose(); + device.Dispose(); + while (true) + { + try + { + di.Delete(recursive: true); + break; + } + catch { } + } + } + } + [TestFixture] internal class FasterLogTests {