diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index d5c277f7e..1ad5027ba 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -490,7 +490,7 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer this.epoch = epoch; settings.LogDevice.Initialize(1L << settings.SegmentSizeBits, epoch); - settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits, epoch); + settings.ObjectLogDevice?.Initialize(-1, epoch); // Page size LogPageSizeBits = settings.PageSizeBits; @@ -1257,6 +1257,7 @@ private void AsyncReadPagesFromDevice( var asyncResult = new PageAsyncReadResult() { page = readPage, + offset = devicePageOffset, context = context, handle = completed, maxPtr = PageSize diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index ed9f72f86..97b53e27c 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -65,6 +65,8 @@ public GenericAllocator(LogSettings settings, SerializerSettings ser { if (objectLogDevice == null) throw new FasterException("Objects in key/value, but object log not provided during creation of FASTER instance"); + if (objectLogDevice.SegmentSize != -1) + throw new FasterException("Object log device should not have fixed segment size. Set preallocateFile to false when calling CreateLogDevice for object log"); } } @@ -405,6 +407,9 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres if (ValueHasObjects()) valueSerializer.BeginSerialize(ms); + // Reset address list for next chunk + addr = new List(); + objlogDevice.WriteAsync( (IntPtr)_objBuffer.aligned_pointer, (int)(alignedDestinationAddress >> LogSegmentSizeBits), @@ -567,7 +572,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num // Request objects from objlog result.objlogDevice.ReadAsync( - (int)(result.page >> (LogSegmentSizeBits - LogPageSizeBits)), + (int)((result.page - result.offset) >> (LogSegmentSizeBits - LogPageSizeBits)), (ulong)startptr, (IntPtr)objBuffer.aligned_pointer, (uint)alignedLength, AsyncReadPageWithObjectsCallback, result); } diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 50cc427cc..3dc64b4b9 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -263,13 +263,17 @@ public bool CompletePending(bool spinWait = false, bool spinWaitForCommit = fals if (spinWaitForCommit) { if (spinWait != true) + { + if (supportAsync) UnsafeSuspendThread(); throw new FasterException("Can spin-wait for checkpoint completion only if spinWait is true"); + } do { fht.InternalCompletePending(ctx, spinWait); if (fht.InRestPhase()) { fht.InternalCompletePending(ctx, spinWait); + if (supportAsync) UnsafeSuspendThread(); return true; } } while (spinWait); diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs index 8833a71b1..8508f3a7c 100644 --- a/cs/src/core/Device/IDevice.cs +++ b/cs/src/core/Device/IDevice.cs @@ -48,7 +48,7 @@ public interface IDevice /// /// Initialize device. This function is used to pass optional information that may only be known after /// FASTER initialization (whose constructor takes in IDevice upfront). Implementation are free to ignore - /// information if it does not need the supplied information. + /// information if it does not need the supplied information. Segment size of -1 is used for object log. /// /// This is a bit of a hack. /// diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 45182db98..93fdc286f 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -285,7 +285,7 @@ private SafeFileHandle CreateHandle(int segmentId) throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error)); } - if (preallocateFile) + if (preallocateFile && segmentSize != -1) SetFileSize(FileName, logHandle, segmentSize); try diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 82c6a850a..6f324f416 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -321,7 +321,7 @@ private Stream CreateWriteHandle(int segmentId) #endif if (preallocateFile && segmentSize != -1) - SetFileSize(logWriteHandle, segmentSize); + SetFileSize(logWriteHandle, segmentSize); return logWriteHandle; } diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 102ec49d3..7ec367f65 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -94,7 +94,8 @@ public StorageDeviceBase(string filename, uint sectorSize, long capacity) /// public virtual void Initialize(long segmentSize, LightEpoch epoch = null) { - Debug.Assert(Capacity == -1 || Capacity % segmentSize == 0, "capacity must be a multiple of segment sizes"); + if (segmentSize != -1) + Debug.Assert(Capacity == -1 || Capacity % segmentSize == 0, "capacity must be a multiple of segment sizes"); this.segmentSize = segmentSize; this.epoch = epoch; if (!Utility.IsPowerOfTwo(segmentSize)) diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 70024d3ae..43d3c0630 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -262,7 +262,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta _hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken); _hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken); _hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize()); - _hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(hlog.GetSegmentSize()); + _hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1); long startPage = hlog.GetPage(_hybridLogCheckpoint.info.flushedLogicalAddress); long endPage = hlog.GetPage(_hybridLogCheckpoint.info.finalLogicalAddress); diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index e11de7374..8ce772292 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -222,7 +222,7 @@ private void RecoverHybridLogFromSnapshotFile( var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid); var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid); recoveryDevice.Initialize(hlog.GetSegmentSize()); - objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize()); + objectLogRecoveryDevice.Initialize(-1); var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress) { recoveryDevice = recoveryDevice, diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs b/cs/src/core/Utilities/PageAsyncResultTypes.cs index fbb01aca5..94d515a26 100644 --- a/cs/src/core/Utilities/PageAsyncResultTypes.cs +++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs @@ -15,6 +15,7 @@ namespace FASTER.core public class PageAsyncReadResult : IAsyncResult { internal long page; + internal long offset; internal TContext context; internal CountdownEvent handle; internal SectorAlignedMemory freeBuffer1;