Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing statics from codebase #117

Merged
merged 17 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public unsafe void Run()


idx_ = 0;
store.DumpDistribution();
Console.WriteLine(store.DumpDistribution());

Console.WriteLine("Executing experiment.");

Expand Down
2 changes: 0 additions & 2 deletions cs/playground/SumStore/RecoveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ class RecoveryTest
const long completePendingInterval = 1 << 12;
const int checkpointInterval = 10 * 1000;
readonly int threadCount;
readonly int numActiveThreads;
FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;

BlockingCollection<Input[]> inputArrays;

public RecoveryTest(int threadCount)
{
this.threadCount = threadCount;
numActiveThreads = 0;

// Create FASTER index
var log = Devices.CreateLogDevice("logs\\hlog");
Expand Down
51 changes: 42 additions & 9 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Epoch information
/// </summary>
protected LightEpoch epoch;
protected readonly LightEpoch epoch;
private readonly bool ownedEpoch;

/// <summary>
/// Comparer
Expand Down Expand Up @@ -201,13 +202,13 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <summary>
/// Number of pending reads
/// </summary>
private static int numPendingReads = 0;
private int numPendingReads = 0;
#endregion

/// <summary>
/// Read buffer pool
/// Buffer pool
/// </summary>
protected SectorAlignedBufferPool readBufferPool;
protected SectorAlignedBufferPool bufferPool;

/// <summary>
/// Read cache
Expand Down Expand Up @@ -415,8 +416,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <param name="settings"></param>
/// <param name="comparer"></param>
/// <param name="evictCallback"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
: this(settings, comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
: this(settings, comparer, epoch)
{
if (evictCallback != null)
{
Expand All @@ -430,9 +432,18 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
/// </summary>
/// <param name="settings"></param>
/// <param name="comparer"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer)
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
{
this.comparer = comparer;
if (epoch == null)
{
this.epoch = new LightEpoch();
ownedEpoch = true;
}
else
this.epoch = epoch;

settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);

Expand Down Expand Up @@ -481,7 +492,7 @@ protected void Initialize(long firstValidAddress)
Debug.Assert(firstValidAddress <= PageSize);
Debug.Assert(PageSize >= GetRecordSize(0));

readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
bufferPool = new SectorAlignedBufferPool(1, sectorSize);

long tailPage = firstValidAddress >> LogPageSizeBits;
int tailPageIndex = (int)(tailPage % BufferSize);
Expand All @@ -504,6 +515,24 @@ protected void Initialize(long firstValidAddress)
TailPageIndex = 0;
}

/// <summary>
/// Acquire thread
/// </summary>
public void Acquire()
{
if (ownedEpoch)
epoch.Acquire();
}

/// <summary>
/// Release thread
/// </summary>
public void Release()
{
if (ownedEpoch)
epoch.Release();
}

/// <summary>
/// Dispose allocator
/// </summary>
Expand All @@ -519,6 +548,10 @@ public virtual void Dispose()
SafeHeadAddress = 0;
HeadAddress = 0;
BeginAddress = 1;

if (ownedEpoch)
epoch.Dispose();
bufferPool.Free();
}

/// <summary>
Expand Down Expand Up @@ -1120,7 +1153,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand Down
6 changes: 2 additions & 4 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
: base(settings, comparer, evictCallback)
public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
pointers = new long[BufferSize];

epoch = LightEpoch.Instance;

ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
nativePointers = (long*)ptrHandle.AddrOfPinnedObject();
}
Expand Down
108 changes: 20 additions & 88 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Val
private const int ObjectBlockSize = 100 * (1 << 20);
// Tail offsets per segment, in object log
public readonly long[] segmentOffsets;
// Buffer pool for object log related work
SectorAlignedBufferPool ioBufferPool;
// Record sizes
private static readonly int recordSize = Utility.GetSize(default(Record<Key, Value>));
private readonly SerializerSettings<Key, Value> SerializerSettings;

public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
: base(settings, comparer, evictCallback)
public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
{
SerializerSettings = serializerSettings;

Expand All @@ -66,9 +64,6 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
if (objectLogDevice == null)
throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance");
}

epoch = LightEpoch.Instance;
ioBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
}

public override void Initialize()
Expand Down Expand Up @@ -295,7 +290,7 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
if (localSegmentOffsets == null) localSegmentOffsets = segmentOffsets;

var src = values[flushPage % BufferSize];
var buffer = ioBufferPool.Get((int)numBytesToWrite);
var buffer = bufferPool.Get((int)numBytesToWrite);

if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
{
Expand Down Expand Up @@ -373,11 +368,23 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres

if (ms.Position > ObjectBlockSize || i == (end / recordSize) - 1)
{
if (KeyHasObjects())
keySerializer.EndSerialize();
if (ValueHasObjects())
valueSerializer.EndSerialize();
var _s = ms.ToArray();
ms.Close();
ms = new MemoryStream();

var _objBuffer = ioBufferPool.Get(_s.Length);
if (i < (end / recordSize) - 1)
{
ms = new MemoryStream();
if (KeyHasObjects())
keySerializer.BeginSerialize(ms);
if (ValueHasObjects())
valueSerializer.BeginSerialize(ms);
}

var _objBuffer = bufferPool.Get(_s.Length);

asyncResult.done = new AutoResetEvent(false);

Expand Down Expand Up @@ -414,14 +421,6 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
}
}
}
if (KeyHasObjects())
{
keySerializer.EndSerialize();
}
if (ValueHasObjects())
{
valueSerializer.EndSerialize();
}

if (asyncResult.partial)
{
Expand Down Expand Up @@ -456,7 +455,7 @@ protected override void ReadAsync<TContext>(
ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
IOCompletionCallback callback, PageAsyncReadResult<TContext> asyncResult, IDevice device, IDevice objlogDevice)
{
asyncResult.freeBuffer1 = readBufferPool.Get((int)aligned_read_length);
asyncResult.freeBuffer1 = bufferPool.Get((int)aligned_read_length);
asyncResult.freeBuffer1.required_bytes = (int)aligned_read_length;

if (!(KeyHasObjects() || ValueHasObjects()))
Expand Down Expand Up @@ -581,7 +580,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
if (size > int.MaxValue)
throw new Exception("Unable to read object page, total size greater than 2GB: " + size);

var objBuffer = ioBufferPool.Get((int)size);
var objBuffer = bufferPool.Get((int)size);
result.freeBuffer2 = objBuffer;
var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);

Expand Down Expand Up @@ -609,7 +608,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));

var record = readBufferPool.Get((int)alignedReadLength);
var record = bufferPool.Get((int)alignedReadLength);
record.valid_offset = (int)(fileOffset - alignedFileOffset);
record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
record.required_bytes = numBytes;
Expand Down Expand Up @@ -758,73 +757,6 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
}
}

/// <summary>
/// Serialize part of page to stream
/// </summary>
/// <param name="ptr">From pointer</param>
/// <param name="untilptr">Until pointer</param>
/// <param name="stream">Stream</param>
/// <param name="objectBlockSize">Size of blocks to serialize in chunks of</param>
/// <param name="addr">List of addresses that need to be updated with offsets</param>
public void Serialize(ref long ptr, long untilptr, Stream stream, int objectBlockSize, out List<long> addr)
{
IObjectSerializer<Key> keySerializer = null;
IObjectSerializer<Value> valueSerializer = null;

if (KeyHasObjects())
{
keySerializer = SerializerSettings.keySerializer();
keySerializer.BeginSerialize(stream);
}
if (ValueHasObjects())
{
valueSerializer = SerializerSettings.valueSerializer();
valueSerializer.BeginSerialize(stream);
}

addr = new List<long>();
while (ptr < untilptr)
{
if (!GetInfo(ptr).Invalid)
{
long pos = stream.Position;

if (KeyHasObjects())
{
keySerializer.Serialize(ref GetKey(ptr));
var key_address = GetKeyAddressInfo(ptr);
key_address->Address = pos;
key_address->Size = (int)(stream.Position - pos);
addr.Add((long)key_address);
}

if (ValueHasObjects() && !GetInfo(ptr).Tombstone)
{
pos = stream.Position;
var value_address = GetValueAddressInfo(ptr);
valueSerializer.Serialize(ref GetValue(ptr));
value_address->Address = pos;
value_address->Size = (int)(stream.Position - pos);
addr.Add((long)value_address);
}

}
ptr += GetRecordSize(ptr);

if (stream.Position > objectBlockSize)
break;
}

if (KeyHasObjects())
{
keySerializer.EndSerialize();
}
if (ValueHasObjects())
{
valueSerializer.EndSerialize();
}
}

/// <summary>
/// Get location and range of object log addresses for specified log page
/// </summary>
Expand Down
Loading