Skip to content

Commit

Permalink
Fix object serialization corner case when invalid record present in l…
Browse files Browse the repository at this point in the history
…og. (#697)
  • Loading branch information
badrishc authored Apr 19, 2022
1 parent 9994828 commit 691feba
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
}


long endPosition = 0;
for (int i=start/recordSize; i<end/recordSize; i++)
{
long endPosition = 0;
if (!src[i].info.Invalid)
{
if (KeyHasObjects())
Expand Down Expand Up @@ -449,27 +449,30 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
{
var memoryStreamActualLength = ms.Position;
var memoryStreamTotalLength = (int)endPosition;
endPosition = 0;

var _objBuffer = bufferPool.Get(memoryStreamTotalLength);

var _alignedLength = (memoryStreamTotalLength + (sectorSize - 1)) & ~(sectorSize - 1);

var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;

if (KeyHasObjects())
keySerializer.EndSerialize();
if (ValueHasObjects())
valueSerializer.EndSerialize();

ms.Close();

fixed (void* src_ = ms.GetBuffer())
Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamTotalLength, memoryStreamActualLength);
SectorAlignedMemory _objBuffer = null;
var _alignedLength = (memoryStreamTotalLength + (sectorSize - 1)) & ~(sectorSize - 1);
var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;

if (memoryStreamTotalLength > 0)
{
_objBuffer = bufferPool.Get(memoryStreamTotalLength);

fixed (void* src_ = ms.GetBuffer())
Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamTotalLength, memoryStreamActualLength);
}

foreach (var address in addr)
((AddressInfo*)address)->Address += _objAddr;


if (i < (end / recordSize) - 1)
{
ms = new MemoryStream();
Expand All @@ -483,6 +486,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres

asyncResult.done = new AutoResetEvent(false);

Debug.Assert(memoryStreamTotalLength > 0);

objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
(int)(alignedDestinationAddress >> LogSegmentSizeBits),
Expand All @@ -494,14 +499,17 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
}
else
{
// need to write both page and object cache
Interlocked.Increment(ref asyncResult.count);

asyncResult.freeBuffer2 = _objBuffer;
objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
(int)(alignedDestinationAddress >> LogSegmentSizeBits),
(ulong)_objAddr, (uint)_alignedLength, callback, asyncResult);
if (memoryStreamTotalLength > 0)
{
// need to write both page and object cache
Interlocked.Increment(ref asyncResult.count);

asyncResult.freeBuffer2 = _objBuffer;
objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
(int)(alignedDestinationAddress >> LogSegmentSizeBits),
(ulong)_objAddr, (uint)_alignedLength, callback, asyncResult);
}
}
}
}
Expand Down

0 comments on commit 691feba

Please sign in to comment.