Skip to content

Commit

Permalink
[C#] v2 pending flush list (#596)
Browse files Browse the repository at this point in the history
* Make PendingFlushList never error out in stressful situations. Current implementation uses lock for ~5-10% perf degradation over prior structure.

* nit
  • Loading branch information
badrishc authored Nov 16, 2021
1 parent c7e584c commit 84d2724
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 44 deletions.
20 changes: 5 additions & 15 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1752,22 +1752,12 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
}

// Enqueue work in shared queue
if (PendingFlush[index].Add(asyncResult))
{
// Perform work from shared queue if possible
if (PendingFlush[index].RemoveNextAdjacent(FlushedUntilAddress, out PageAsyncFlushResult<Empty> request))
{
WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
}
}
else
PendingFlush[index].Add(asyncResult);

// Perform work from shared queue if possible
if (PendingFlush[index].RemoveNextAdjacent(FlushedUntilAddress, out PageAsyncFlushResult<Empty> request))
{
// Because we are invoking the callback away from the usual codepath, need to externally
// ensure that flush address are updated in order
while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield();
// Could not add to pending flush list, treat as a failed write
// Use a special errorCode to convey that this is not from a syscall
AsyncFlushPageCallback(16000, 0, asyncResult);
WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request);
}
}
else
Expand Down
49 changes: 20 additions & 29 deletions cs/src/core/Allocator/PendingFlushList.cs
Original file line number Diff line number Diff line change
@@ -1,54 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Threading;
using System.Collections.Generic;

namespace FASTER.core
{
class PendingFlushList
sealed class PendingFlushList
{
const int maxSize = 8;
const int maxRetries = 10;
public PageAsyncFlushResult<Empty>[] list;
public readonly LinkedList<PageAsyncFlushResult<Empty>> list;

public PendingFlushList()
{
list = new PageAsyncFlushResult<Empty>[maxSize];
list = new();
}

public bool Add(PageAsyncFlushResult<Empty> t)
public void Add(PageAsyncFlushResult<Empty> t)
{
int retries = 0;
do
lock (list)
{
for (int i = 0; i < maxSize; i++)
{
if (list[i] == default)
{
if (Interlocked.CompareExchange(ref list[i], t, default) == default)
{
return true;
}
}
}
} while (retries++ < maxRetries);
return false;
list.AddFirst(t);
}
}

/// <summary>
/// Remove item from flush list with from-address equal to the specified address
/// </summary>
public bool RemoveNextAdjacent(long address, out PageAsyncFlushResult<Empty> request)
{
for (int i=0; i<maxSize; i++)
lock (list)
{
request = list[i];
if (request?.fromAddress == address)
for (var it = list.First; it != null;)
{
if (Interlocked.CompareExchange(ref list[i], null, request) == request)
request = it.Value;
if (request.fromAddress == address)
{
list.Remove(it);
return true;
}
it = it.Next;
}
}
request = null;
Expand All @@ -60,15 +49,17 @@ public bool RemoveNextAdjacent(long address, out PageAsyncFlushResult<Empty> req
/// </summary>
public bool RemovePreviousAdjacent(long address, out PageAsyncFlushResult<Empty> request)
{
for (int i = 0; i < maxSize; i++)
lock (list)
{
request = list[i];
if (request?.untilAddress == address)
for (var it = list.First; it != null;)
{
if (Interlocked.CompareExchange(ref list[i], null, request) == request)
request = it.Value;
if (request.untilAddress == address)
{
list.Remove(it);
return true;
}
it = it.Next;
}
}
request = null;
Expand Down

0 comments on commit 84d2724

Please sign in to comment.