Skip to content

Commit

Permalink
Merge branch 'master' into async-support
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Oct 31, 2019
2 parents fc47dbd + 69a8103 commit c4fba23
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 111 deletions.
85 changes: 52 additions & 33 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public unsafe abstract partial class AllocatorBase<Key, Value> : IDisposable
/// </summary>
private PageOffset TailPageOffset;

/// <summary>
/// Whether log is disposed
/// </summary>
private bool disposed = false;

/// <summary>
/// Number of pending reads
/// </summary>
Expand Down Expand Up @@ -586,6 +591,8 @@ public void Release()
/// </summary>
public virtual void Dispose()
{
disposed = true;

TailPageOffset.Page = 0;
TailPageOffset.Offset = 0;
SafeReadOnlyAddress = 0;
Expand Down Expand Up @@ -1499,41 +1506,42 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverl
/// <param name="overlap"></param>
private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap)
{
if (errorCode != 0)
try
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}
if (errorCode != 0)
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}

/*
if (DateTime.Now - last > TimeSpan.FromSeconds(7))
{
last = DateTime.Now;
errorCode = 1;
Console.WriteLine("Disk error");
}*/

// Set the page status to flushed
PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)Overlapped.Unpack(overlap).AsyncResult;

// Set the page status to flushed
PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)Overlapped.Unpack(overlap).AsyncResult;
if (Interlocked.Decrement(ref result.count) == 0)
{
if (errorCode != 0)
{
errorList.Add(result.fromAddress);
}
Utility.MonotonicUpdate(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress, out _);
ShiftFlushedUntilAddress();
result.Free();
}

if (Interlocked.Decrement(ref result.count) == 0)
{
if (errorCode != 0)
var _flush = FlushedUntilAddress;
if (GetOffsetInPage(_flush) > 0 && PendingFlush[GetPage(_flush) % BufferSize].RemoveAdjacent(_flush, out PageAsyncFlushResult<Empty> request))
{
errorList.Add(result.fromAddress);
WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
}
Utility.MonotonicUpdate(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress, out _);
ShiftFlushedUntilAddress();
result.Free();
}

var _flush = FlushedUntilAddress;
if (GetOffsetInPage(_flush) > 0 && PendingFlush[GetPage(_flush) % BufferSize].RemoveAdjacent(_flush, out PageAsyncFlushResult<Empty> request))
catch
{
WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
if (!disposed)
throw;
}
finally
{
Overlapped.Free(overlap);
}

Overlapped.Free(overlap);
}

/// <summary>
Expand All @@ -1544,18 +1552,29 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlap
/// <param name="overlap"></param>
private void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap)
{
if (errorCode != 0)
try
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}
if (errorCode != 0)
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}

PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)Overlapped.Unpack(overlap).AsyncResult;
PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)Overlapped.Unpack(overlap).AsyncResult;

if (Interlocked.Decrement(ref result.count) == 0)
if (Interlocked.Decrement(ref result.count) == 0)
{
result.Free();
}
}
catch
{
result.Free();
if (!disposed)
throw;
}
finally
{
Overlapped.Free(overlap);
}
Overlapped.Free(overlap);
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public override int GetRecordSize(ref Key key, ref Value value)
/// </summary>
public override void Dispose()
{
base.Dispose();

if (values != null)
{
for (int i = 0; i < values.Length; i++)
Expand All @@ -99,7 +101,6 @@ public override void Dispose()
handles = null;
pointers = null;
values = null;
base.Dispose();
}

public override AddressInfo* GetKeyAddressInfo(long physicalAddress)
Expand Down
74 changes: 46 additions & 28 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,38 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress,
IOCompletionCallback callback,
IAsyncResult asyncResult)
{
var logHandle = GetOrAddHandle(segmentId);

Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
ovNative->OffsetLow = unchecked((int)((ulong)sourceAddress & 0xFFFFFFFF));
ovNative->OffsetHigh = unchecked((int)(((ulong)sourceAddress >> 32) & 0xFFFFFFFF));

bool result = Native32.ReadFile(logHandle,
destinationAddress,
readLength,
out uint bytesRead,
ovNative);

if (!result)
try
{
int error = Marshal.GetLastWin32Error();
if (error != Native32.ERROR_IO_PENDING)
var logHandle = GetOrAddHandle(segmentId);

bool result = Native32.ReadFile(logHandle,
destinationAddress,
readLength,
out uint bytesRead,
ovNative);

if (!result)
{
Overlapped.Unpack(ovNative);
Overlapped.Free(ovNative);
throw new Exception("Error reading from log file: " + error);
int error = Marshal.GetLastWin32Error();
if (error != Native32.ERROR_IO_PENDING)
{
throw new IOException("Error reading from log file", error);
}
}
}
catch (IOException e)
{
callback((uint)(e.HResult & 0x0000FFFF), 0, ovNative);
}
catch
{
callback(uint.MaxValue, 0, ovNative);
}
}

/// <summary>
Expand All @@ -136,29 +145,38 @@ public override unsafe void WriteAsync(IntPtr sourceAddress,
IOCompletionCallback callback,
IAsyncResult asyncResult)
{
var logHandle = GetOrAddHandle(segmentId);

Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
ovNative->OffsetLow = unchecked((int)(destinationAddress & 0xFFFFFFFF));
ovNative->OffsetHigh = unchecked((int)((destinationAddress >> 32) & 0xFFFFFFFF));

bool result = Native32.WriteFile(logHandle,
sourceAddress,
numBytesToWrite,
out uint bytesWritten,
ovNative);

if (!result)
try
{
int error = Marshal.GetLastWin32Error();
if (error != Native32.ERROR_IO_PENDING)
var logHandle = GetOrAddHandle(segmentId);

bool result = Native32.WriteFile(logHandle,
sourceAddress,
numBytesToWrite,
out uint bytesWritten,
ovNative);

if (!result)
{
Overlapped.Unpack(ovNative);
Overlapped.Free(ovNative);
throw new Exception("Error writing to log file: " + error);
int error = Marshal.GetLastWin32Error();
if (error != Native32.ERROR_IO_PENDING)
{
throw new IOException("Error writing to log file", error);
}
}
}
catch (IOException e)
{
callback((uint)(e.HResult & 0x0000FFFF), 0, ovNative);
}
catch
{
callback(uint.MaxValue, 0, ovNative);
}
}

/// <summary>
Expand Down
Loading

0 comments on commit c4fba23

Please sign in to comment.