From 0ae0568dcda949780562488181faeedff1ee7699 Mon Sep 17 00:00:00 2001
From: Masashi Ito <59102452+mito-csod@users.noreply.github.com>
Date: Fri, 20 Aug 2021 16:54:48 -0700
Subject: [PATCH] MemoryPool-based override for FasterLog.ReadAsync() (#549)
---
cs/src/core/Index/FasterLog/FasterLog.cs | 59 ++++++++++++++++++++++++
cs/test/LogReadAsyncTests.cs | 11 +++++
2 files changed, 70 insertions(+)
diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs
index 78dbb2a6c..c4b559542 100644
--- a/cs/src/core/Index/FasterLog/FasterLog.cs
+++ b/cs/src/core/Index/FasterLog/FasterLog.cs
@@ -4,6 +4,7 @@
#pragma warning disable 0162
using System;
+using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
@@ -843,6 +844,37 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, string nam
return GetRecordAndFree(ctx.record);
}
+ ///
+ /// Random read record from log as IMemoryOwner<byte>, at given address
+ ///
+ /// Logical address to read from
+ /// MemoryPool to rent the destination buffer from
+ /// Estimated length of entry, if known
+ /// Cancellation token
+ ///
+ public async ValueTask<(IMemoryOwner, int)> ReadAsync(long address, MemoryPool memoryPool, int estimatedLength = 0, CancellationToken token = default)
+ {
+ token.ThrowIfCancellationRequested();
+ epoch.Resume();
+ if (address >= CommittedUntilAddress || address < BeginAddress)
+ {
+ epoch.Suspend();
+ return default;
+ }
+ var ctx = new SimpleReadContext
+ {
+ logicalAddress = address,
+ completedRead = new SemaphoreSlim(0)
+ };
+ unsafe
+ {
+ allocator.AsyncReadRecordToMemory(address, headerSize + estimatedLength, AsyncGetFromDiskCallback, ref ctx);
+ }
+ epoch.Suspend();
+ await ctx.completedRead.WaitAsync(token).ConfigureAwait(false);
+ return GetRecordAsMemoryOwnerAndFree(ctx.record, memoryPool);
+ }
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int Align(int length)
{
@@ -1197,6 +1229,33 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje
return (result, length);
}
+ private (IMemoryOwner, int) GetRecordAsMemoryOwnerAndFree(SectorAlignedMemory record, MemoryPool memoryPool)
+ {
+ if (record == null)
+ return (null, 0);
+
+ IMemoryOwner result;
+ int length;
+ unsafe
+ {
+ var ptr = record.GetValidPointer();
+ length = GetLength(ptr);
+ if (!VerifyChecksum(ptr, length))
+ {
+ throw new FasterException("Checksum failed for read");
+ }
+ result = memoryPool.Rent(length);
+
+ fixed (byte* bp = result.Memory.Span)
+ {
+ Buffer.MemoryCopy(ptr + headerSize, bp, length, length);
+ }
+ }
+
+ record.Return();
+ return (result, length);
+ }
+
private long CommitInternal(bool spinWait = false)
{
if (readOnlyMode)
diff --git a/cs/test/LogReadAsyncTests.cs b/cs/test/LogReadAsyncTests.cs
index 381bc2e15..0ce85cdbc 100644
--- a/cs/test/LogReadAsyncTests.cs
+++ b/cs/test/LogReadAsyncTests.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
+using System.Buffers;
using System.Threading;
using FASTER.core;
using NUnit.Framework;
@@ -121,6 +122,16 @@ record = log.ReadAsync(log.BeginAddress, 104, cts);
Assert.IsTrue(foundEntry == 1, $"Fail reading data - Found Normal Entry:{foundEntry} Expected Value: 1");
Assert.IsTrue(foundTotal == entryLength, $"Fail reading data - Found Total:{foundTotal} Expected Total: {entryLength}");
+ // Read one entry as IMemoryOwner and verify
+ var recordMemoryOwner = log.ReadAsync(log.BeginAddress, MemoryPool.Shared, 104, cts);
+ var foundFlaggedMem = recordMemoryOwner.Result.Item1.Memory.Span[0]; // 15
+ var foundEntryMem = recordMemoryOwner.Result.Item1.Memory.Span[1]; // 1
+ var foundTotalMem = recordMemoryOwner.Result.Item2;
+
+ Assert.IsTrue(foundFlagged == foundFlaggedMem, $"MemoryPool-based ReadAsync result does not match that of the byte array one. value: {foundFlaggedMem} expected: {foundFlagged}");
+ Assert.IsTrue(foundEntry == foundEntryMem, $"MemoryPool-based ReadAsync result does not match that of the byte array one. value: {foundEntryMem} expected: {foundEntry}");
+ Assert.IsTrue(foundTotal == foundTotalMem, $"MemoryPool-based ReadAsync result does not match that of the byte array one. value: {foundTotalMem} expected: {foundTotal}");
+
break;
default:
Assert.Fail("Unknown case ParameterDefaultsIteratorType.DefaultParams:");