Skip to content

Commit

Permalink
Fix complete-pending & segment sizes for object log (#228)
Browse files Browse the repository at this point in the history
* * Fix CompletePending to not hold epoch
* Fix object log initialization to set segment size to -1 so that device does not pre-allocate/truncate object log segments to a fixed size

* Reset address list when performing chunked writes of object log
  • Loading branch information
badrishc authored Jan 7, 2020
1 parent b8f3c3b commit 3988acd
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 8 deletions.
3 changes: 2 additions & 1 deletion cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
this.epoch = epoch;

settings.LogDevice.Initialize(1L << settings.SegmentSizeBits, epoch);
settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits, epoch);
settings.ObjectLogDevice?.Initialize(-1, epoch);

// Page size
LogPageSizeBits = settings.PageSizeBits;
Expand Down Expand Up @@ -1257,6 +1257,7 @@ private void AsyncReadPagesFromDevice<TContext>(
var asyncResult = new PageAsyncReadResult<TContext>()
{
page = readPage,
offset = devicePageOffset,
context = context,
handle = completed,
maxPtr = PageSize
Expand Down
7 changes: 6 additions & 1 deletion cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
{
if (objectLogDevice == null)
throw new FasterException("Objects in key/value, but object log not provided during creation of FASTER instance");
if (objectLogDevice.SegmentSize != -1)
throw new FasterException("Object log device should not have fixed segment size. Set preallocateFile to false when calling CreateLogDevice for object log");
}
}

Expand Down Expand Up @@ -405,6 +407,9 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
if (ValueHasObjects())
valueSerializer.BeginSerialize(ms);

// Reset address list for next chunk
addr = new List<long>();

objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
(int)(alignedDestinationAddress >> LogSegmentSizeBits),
Expand Down Expand Up @@ -567,7 +572,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num

// Request objects from objlog
result.objlogDevice.ReadAsync(
(int)(result.page >> (LogSegmentSizeBits - LogPageSizeBits)),
(int)((result.page - result.offset) >> (LogSegmentSizeBits - LogPageSizeBits)),
(ulong)startptr,
(IntPtr)objBuffer.aligned_pointer, (uint)alignedLength, AsyncReadPageWithObjectsCallback<TContext>, result);
}
Expand Down
4 changes: 4 additions & 0 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,17 @@ public bool CompletePending(bool spinWait = false, bool spinWaitForCommit = fals
if (spinWaitForCommit)
{
if (spinWait != true)
{
if (supportAsync) UnsafeSuspendThread();
throw new FasterException("Can spin-wait for checkpoint completion only if spinWait is true");
}
do
{
fht.InternalCompletePending(ctx, spinWait);
if (fht.InRestPhase())
{
fht.InternalCompletePending(ctx, spinWait);
if (supportAsync) UnsafeSuspendThread();
return true;
}
} while (spinWait);
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/IDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface IDevice
/// <summary>
/// Initialize device. This function is used to pass optional information that may only be known after
/// FASTER initialization (whose constructor takes in IDevice upfront). Implementation are free to ignore
/// information if it does not need the supplied information.
/// information if it does not need the supplied information. Segment size of -1 is used for object log.
///
/// This is a bit of a hack.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private SafeFileHandle CreateHandle(int segmentId)
throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error));
}

if (preallocateFile)
if (preallocateFile && segmentSize != -1)
SetFileSize(FileName, logHandle, segmentSize);

try
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/ManagedLocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private Stream CreateWriteHandle(int segmentId)
#endif

if (preallocateFile && segmentSize != -1)
SetFileSize(logWriteHandle, segmentSize);
SetFileSize(logWriteHandle, segmentSize);

return logWriteHandle;
}
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Device/StorageDeviceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public StorageDeviceBase(string filename, uint sectorSize, long capacity)
/// <param name="epoch"></param>
public virtual void Initialize(long segmentSize, LightEpoch epoch = null)
{
Debug.Assert(Capacity == -1 || Capacity % segmentSize == 0, "capacity must be a multiple of segment sizes");
if (segmentSize != -1)
Debug.Assert(Capacity == -1 || Capacity % segmentSize == 0, "capacity must be a multiple of segment sizes");
this.segmentSize = segmentSize;
this.epoch = epoch;
if (!Utility.IsPowerOfTwo(segmentSize))
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Recovery/Checkpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
_hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken);
_hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken);
_hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize());
_hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(hlog.GetSegmentSize());
_hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1);

long startPage = hlog.GetPage(_hybridLogCheckpoint.info.flushedLogicalAddress);
long endPage = hlog.GetPage(_hybridLogCheckpoint.info.finalLogicalAddress);
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private void RecoverHybridLogFromSnapshotFile(
var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid);
var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid);
recoveryDevice.Initialize(hlog.GetSegmentSize());
objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize());
objectLogRecoveryDevice.Initialize(-1);
var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress)
{
recoveryDevice = recoveryDevice,
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Utilities/PageAsyncResultTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace FASTER.core
public class PageAsyncReadResult<TContext> : IAsyncResult
{
internal long page;
internal long offset;
internal TContext context;
internal CountdownEvent handle;
internal SectorAlignedMemory freeBuffer1;
Expand Down

0 comments on commit 3988acd

Please sign in to comment.