Skip to content

Commit

Permalink
[C#] Support concurrent device dispose and async commit (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Mar 9, 2021
1 parent 1eca35c commit fe3935e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/Device/FixedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public FixedPool(int size, Func<T> creator)
for (int i = 0; i < size; i++)
{
if (disposed)
throw new FasterException("Disposed");
throw new FasterException("Accessing a disposed handle in device");

var val = owners[i];
if (val == 0)
Expand Down
14 changes: 13 additions & 1 deletion cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase
private readonly bool useIoCompletionPort;
private readonly ConcurrentQueue<SimpleAsyncResult> results;
private static uint sectorSize = 0;
private bool _disposed;

/// <summary>
/// Number of pending reads on device
Expand Down Expand Up @@ -105,6 +106,8 @@ protected internal LocalStorageDevice(string filename,
#endif
ThrottleLimit = 120;
this.useIoCompletionPort = useIoCompletionPort;
this._disposed = false;

if (useIoCompletionPort)
{
ThreadPool.GetMaxThreads(out int workerThreads, out _);
Expand Down Expand Up @@ -329,6 +332,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
/// </summary>
public override void Dispose()
{
_disposed = true;
foreach (var logHandle in logHandles.Values)
logHandle.Dispose();

Expand Down Expand Up @@ -445,7 +449,15 @@ protected SafeFileHandle GetOrAddHandle(int _segmentId)
{
return h;
}
return logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId));
if (_disposed) return null;
var result = logHandles.GetOrAdd(_segmentId, segmentId => CreateHandle(segmentId));
if (_disposed)
{
foreach (var logHandle in logHandles.Values)
logHandle.Dispose();
return null;
}
return result;
}

private SafeFileHandle CreateHandle(int segmentId)
Expand Down
25 changes: 23 additions & 2 deletions cs/src/core/Device/ManagedLocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public sealed class ManagedLocalStorageDevice : StorageDeviceBase
/// </summary>
private int numPending = 0;

private bool _disposed;

/// <summary>
///
/// </summary>
Expand All @@ -44,6 +46,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false,
if (!Directory.Exists(path))
Directory.CreateDirectory(path);

this._disposed = false;
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
logHandles = new SafeConcurrentDictionary<int, (FixedPool<Stream>, FixedPool<Stream>)>();
Expand Down Expand Up @@ -260,10 +263,11 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
}

/// <summary>
///
/// Close device
/// </summary>
public override void Dispose()
{
_disposed = true;
foreach (var entry in logHandles)
{
entry.Value.Item1.Dispose();
Expand Down Expand Up @@ -343,7 +347,24 @@ private Stream CreateWriteHandle(int segmentId)

private (FixedPool<Stream>, FixedPool<Stream>) GetOrAddHandle(int _segmentId)
{
return logHandles.GetOrAdd(_segmentId, e => AddHandle(e));
if (logHandles.TryGetValue(_segmentId, out var h))
{
return h;
}
var result = logHandles.GetOrAdd(_segmentId, e => AddHandle(e));

if (_disposed)
{
// If disposed, dispose the fixed pools and return the (disposed) result
foreach (var entry in logHandles)
{
entry.Value.Item1.Dispose();
entry.Value.Item2.Dispose();
if (deleteOnClose)
File.Delete(GetSegmentName(entry.Key));
}
}
return result;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
private SectorAlignedBufferPool bufferPool;

private IDevice singleLogCommitDevice;
private bool _disposed;

/// <summary>
/// Next commit number
Expand All @@ -48,6 +49,7 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec

this.overwriteLogCommits = overwriteLogCommits;
this.removeOutdated = removeOutdated;
this._disposed = false;

deviceFactory.Initialize(checkpointNamingScheme.BaseName());
}
Expand Down Expand Up @@ -76,6 +78,8 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet
{
var device = NextCommitDevice();

if (device == null) return;

// Two phase to ensure we write metadata in single Write operation
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
Expand All @@ -93,6 +97,7 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet
/// <inheritdoc />
public void Dispose()
{
_disposed = true;
singleLogCommitDevice?.Dispose();
singleLogCommitDevice = null;
}
Expand All @@ -109,15 +114,24 @@ public byte[] GetCommitMetadata(long commitNum)
IDevice device;
if (overwriteLogCommits)
{
if (_disposed) return null;
if (singleLogCommitDevice == null)
{
singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
if (_disposed)
{
singleLogCommitDevice?.Dispose();
singleLogCommitDevice = null;
}
}
device = singleLogCommitDevice;
}
else
{
device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
this.commitNum = commitNum + 1;
}
if (device == null) return null;


ReadInto(device, 0, out byte[] writePad, sizeof(int));
Expand All @@ -139,8 +153,17 @@ private IDevice NextCommitDevice()
{
if (overwriteLogCommits)
{
if (_disposed) return null;
if (singleLogCommitDevice == null)
{
singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
if (_disposed)
{
singleLogCommitDevice?.Dispose();
singleLogCommitDevice = null;
return null;
}
}
return singleLogCommitDevice;
}

Expand Down
27 changes: 27 additions & 0 deletions cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,33 @@

namespace FASTER.test
{
[TestFixture]
internal class FasterLogStandAloneTests
{

[Test]
public void TestDisposeReleasesFileLocksWithInprogressCommit()
{
string commitPath = TestContext.CurrentContext.TestDirectory + "/" + TestContext.CurrentContext.Test.Name + "/";
DirectoryInfo di = Directory.CreateDirectory(commitPath);
IDevice device = Devices.CreateLogDevice(commitPath + "testDisposeReleasesFileLocksWithInprogressCommit.log", preallocateFile: true, deleteOnClose: false);
FasterLog fasterLog = new FasterLog(new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.PerEntry });
Assert.IsTrue(fasterLog.TryEnqueue(new byte[100], out long beginAddress));
fasterLog.Commit(spinWait: false);
fasterLog.Dispose();
device.Dispose();
while (true)
{
try
{
di.Delete(recursive: true);
break;
}
catch { }
}
}
}

[TestFixture]
internal class FasterLogTests
{
Expand Down

0 comments on commit fe3935e

Please sign in to comment.