Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing statics from codebase #117

Merged
merged 17 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public unsafe void Run()


idx_ = 0;
store.DumpDistribution();
Console.WriteLine(store.DumpDistribution());

Console.WriteLine("Executing experiment.");

Expand Down
2 changes: 0 additions & 2 deletions cs/playground/SumStore/RecoveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ class RecoveryTest
const long completePendingInterval = 1 << 12;
const int checkpointInterval = 10 * 1000;
readonly int threadCount;
readonly int numActiveThreads;
FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;

BlockingCollection<Input[]> inputArrays;

public RecoveryTest(int threadCount)
{
this.threadCount = threadCount;
numActiveThreads = 0;

// Create FASTER index
var log = Devices.CreateLogDevice("logs\\hlog");
Expand Down
81 changes: 63 additions & 18 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Epoch information
/// </summary>
protected LightEpoch epoch;
protected readonly LightEpoch epoch;
private readonly bool ownedEpoch;

/// <summary>
/// Comparer
Expand Down Expand Up @@ -201,13 +202,13 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Number of pending reads
/// </summary>
private static int numPendingReads = 0;
private int numPendingReads = 0;
#endregion

/// <summary>
/// Read buffer pool
/// Buffer pool
/// </summary>
protected SectorAlignedBufferPool readBufferPool;
protected SectorAlignedBufferPool bufferPool;

/// <summary>
/// Read cache
Expand Down Expand Up @@ -415,8 +416,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <param name="settings"></param>
/// <param name="comparer"></param>
/// <param name="evictCallback"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
: this(settings, comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
: this(settings, comparer, epoch)
{
if (evictCallback != null)
{
Expand All @@ -430,9 +432,18 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
/// </summary>
/// <param name="settings"></param>
/// <param name="comparer"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
{
this.comparer = comparer;
if (epoch == null)
{
this.epoch = new LightEpoch();
ownedEpoch = true;
}
else
this.epoch = epoch;

settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);

Expand Down Expand Up @@ -481,7 +492,7 @@ protected void Initialize(long firstValidAddress)
Debug.Assert(firstValidAddress <= PageSize);
Debug.Assert(PageSize >= GetRecordSize(0));

readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
bufferPool = new SectorAlignedBufferPool(1, sectorSize);

long tailPage = firstValidAddress >> LogPageSizeBits;
int tailPageIndex = (int)(tailPage % BufferSize);
Expand All @@ -504,6 +515,24 @@ protected void Initialize(long firstValidAddress)
TailPageIndex = 0;
}

/// <summary>
/// Acquire thread
/// </summary>
public void Acquire()
{
if (ownedEpoch)
epoch.Acquire();
}

/// <summary>
/// Release thread
/// </summary>
public void Release()
{
if (ownedEpoch)
epoch.Release();
}

/// <summary>
/// Dispose allocator
/// </summary>
Expand All @@ -519,6 +548,10 @@ public virtual void Dispose()
SafeHeadAddress = 0;
HeadAddress = 0;
BeginAddress = 1;

if (ownedEpoch)
epoch.Dispose();
bufferPool.Free();
}

/// <summary>
Expand Down Expand Up @@ -1120,7 +1153,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand All @@ -1141,6 +1174,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="devicePageOffset"></param>
Expand All @@ -1149,12 +1183,13 @@ public void RecoveryReset(long tailAddress, long headAddress)
public void AsyncReadPagesFromDevice<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
long devicePageOffset = 0,
IDevice logDevice = null, IDevice objectLogDevice = null)
{
AsyncReadPagesFromDevice(readPageStart, numPages, callback, context,
AsyncReadPagesFromDevice(readPageStart, numPages, untilAddress, callback, context,
out CountdownEvent completed, devicePageOffset, logDevice, objectLogDevice);
}

Expand All @@ -1164,6 +1199,7 @@ public void AsyncReadPagesFromDevice<TContext>(
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="completed"></param>
Expand All @@ -1173,6 +1209,7 @@ public void AsyncReadPagesFromDevice<TContext>(
private void AsyncReadPagesFromDevice<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
out CountdownEvent completed,
Expand Down Expand Up @@ -1205,15 +1242,24 @@ private void AsyncReadPagesFromDevice<TContext>(
page = readPage,
context = context,
handle = completed,
count = 1
maxPtr = PageSize
};

ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
uint readLength = (uint)AlignedPageSizeBytes;
long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));

if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
{
readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
asyncResult.maxPtr = readLength;
readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
}

if (device != null)
offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));

ReadAsync(offsetInFile, pageIndex, (uint)PageSize, callback, asyncResult, usedDevice, usedObjlogDevice);
ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice);
}
}

Expand Down Expand Up @@ -1361,14 +1407,13 @@ public void AsyncGetFromDisk(long fromLogical,
AsyncIOContext<Key, Value> context,
SectorAlignedMemory result = default(SectorAlignedMemory))
{
while (numPendingReads > 120)
if (epoch.IsProtected()) // Do not spin for unprotected IO threads
{
Thread.SpinWait(100);

// Do not protect if we are not already protected
// E.g., we are in an IO thread
if (epoch.IsProtected())
while (numPendingReads > 120)
{
Thread.Yield();
epoch.ProtectAndDrain();
}
}
Interlocked.Increment(ref numPendingReads);

Expand Down
20 changes: 14 additions & 6 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
: base(settings, comparer, evictCallback)
public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
pointers = new long[BufferSize];

epoch = LightEpoch.Instance;

ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
nativePointers = (long*)ptrHandle.AddrOfPinnedObject();
}
Expand Down Expand Up @@ -333,6 +331,7 @@ public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long end
/// <typeparam name="TContext"></typeparam>
/// <param name="readPageStart"></param>
/// <param name="numPages"></param>
/// <param name="untilAddress"></param>
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="frame"></param>
Expand All @@ -343,6 +342,7 @@ public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long end
internal void AsyncReadPagesFromDeviceToFrame<TContext>(
long readPageStart,
int numPages,
long untilAddress,
IOCompletionCallback callback,
TContext context,
BlittableFrame frame,
Expand Down Expand Up @@ -375,16 +375,24 @@ internal void AsyncReadPagesFromDeviceToFrame<TContext>(
page = readPage,
context = context,
handle = completed,
count = 1,
frame = frame
};

ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);

uint readLength = (uint)AlignedPageSizeBytes;
long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));

if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
{
readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
}

if (device != null)
offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));

usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], (uint)AlignedPageSizeBytes, callback, asyncResult);
usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, callback, asyncResult);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long be
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, AsyncReadPagesCallback, Empty.Default,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
}
Expand Down Expand Up @@ -160,7 +160,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
Expand All @@ -169,7 +169,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
Expand Down
Loading