Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] minor cleanup of fasterlog v2 #611

Merged
merged 3 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cs/samples/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static void ScanThread()
{
// For finite end address, check if iteration ended
// if (currentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
iter.WaitAsync().AsTask().GetAwaiter().GetResult();
}

// Memory pool variant:
Expand Down Expand Up @@ -197,7 +197,7 @@ static void ReportThread()
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
Stopwatch sw = new();
sw.Start();

while (true)
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/StoreLogCompaction/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static void Main()

Console.WriteLine("Writing keys from 0 to {0} to FASTER", max);

Stopwatch sw = new Stopwatch();
Stopwatch sw = new();
sw.Start();
for (int i = 0; i < max; i++)
{
Expand All @@ -65,7 +65,7 @@ static void Main()
Console.WriteLine("Log tail address: {0}", h.Log.TailAddress);

// Issue mix of deletes and upserts
Random r = new Random(3);
Random r = new(3);

while (true)
{
Expand Down
34 changes: 17 additions & 17 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,29 @@ namespace FASTER.core
/// <summary>
/// FASTER log
/// </summary>
public sealed partial class FasterLog
public sealed class FasterLog : IDisposable
{
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
private readonly ILogCommitManager logCommitManager;
private readonly bool disposeLogCommitManager;
private readonly GetMemory getMemory;
private readonly int headerSize;
private readonly LogChecksumType logChecksum;
private readonly WorkQueueLIFO<CommitInfo> commitQueue;
readonly BlittableAllocator<Empty, byte> allocator;
readonly LightEpoch epoch;
readonly ILogCommitManager logCommitManager;
readonly bool disposeLogCommitManager;
readonly GetMemory getMemory;
readonly int headerSize;
readonly LogChecksumType logChecksum;
readonly WorkQueueLIFO<CommitInfo> commitQueue;

internal readonly bool readOnlyMode;
internal readonly bool fastCommitMode;

private TaskCompletionSource<LinkedCommitInfo> commitTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private TaskCompletionSource<Empty> refreshUncommittedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<LinkedCommitInfo> commitTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<Empty> refreshUncommittedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

// Offsets for all currently unprocessed commit records
private readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
private readonly List<FasterLogRecoveryInfo> coveredCommits = new List<FasterLogRecoveryInfo>();
private long commitNum, commitCoveredAddress;
readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
readonly List<FasterLogRecoveryInfo> coveredCommits = new();
long commitNum, commitCoveredAddress;

private IFasterLogCommitPolicy commitPolicy;
readonly LogCommitPolicy commitPolicy;

/// <summary>
/// Beginning address of log
Expand All @@ -48,7 +48,7 @@ public sealed partial class FasterLog

// Here's a soft begin address that is observed by all access at the FasterLog level but not actually on the
// allocator. This is to make sure that any potential physical deletes only happen after commit.
private long beginAddress;
long beginAddress;

/// <summary>
/// Tail address of log
Expand Down Expand Up @@ -189,7 +189,7 @@ private FasterLog(FasterLogSettings logSettings, bool oldCommitManager)
fastCommitMode = logSettings.FastCommitMode;

ongoingCommitRequests = new Queue<(long, FasterLogRecoveryInfo)>();
commitPolicy = logSettings.CommitPolicy ?? new DefaultCommitPolicy();
commitPolicy = logSettings.LogCommitPolicy ?? LogCommitPolicy.Default();
badrishc marked this conversation as resolved.
Show resolved Hide resolved
commitPolicy.OnAttached(this);
}

Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FasterLog/FasterLogSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public class FasterLogSettings
public bool RemoveOutdatedCommitFiles = true;

/// <summary>
/// CommitPolicy that influences the behavior of Commit() calls, or null if default.
/// Log commit policy that influences the behavior of Commit() calls.
/// </summary>
public IFasterLogCommitPolicy CommitPolicy = null;
public LogCommitPolicy LogCommitPolicy = LogCommitPolicy.Default();

internal LogSettings GetLogSettings()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ namespace FASTER.core
/// FasterLogCommitPolicy defines the way FasterLog behaves on Commit(). In addition
/// to choosing from a set of pre-defined ones, users can implement their own for custom behavior
/// </summary>
public interface IFasterLogCommitPolicy
public abstract class LogCommitPolicy
{
/// <summary>
/// Invoked when strategy object is attached to a FasterLog instance.
/// </summary>
/// <param name="log"> the log this commit strategy is attached to </param>
public void OnAttached(FasterLog log);
public abstract void OnAttached(FasterLog log);

/// <summary>
/// Admission control to decide whether a call to Commit() should successfully start or not.
Expand All @@ -27,55 +27,85 @@ public interface IFasterLogCommitPolicy
/// <param name="currentTail"> if successful, this request will commit at least up to this tail</param>
/// <param name="metadataChanged"> whether commit metadata (e.g., iterators) has changed </param>
/// <returns></returns>
public bool AdmitCommit(long currentTail, bool metadataChanged);
public abstract bool AdmitCommit(long currentTail, bool metadataChanged);

/// <summary>
/// Invoked when a commit is successfully created
/// </summary>
/// <param name="info"> commit content </param>
public void OnCommitCreated(FasterLogRecoveryInfo info);
public abstract void OnCommitCreated(FasterLogRecoveryInfo info);

/// <summary>
/// Invoked after a commit is complete
/// </summary>
/// <param name="info"> commit content </param>
public void OnCommitFinished(FasterLogRecoveryInfo info);
public abstract void OnCommitFinished(FasterLogRecoveryInfo info);

/// <summary>
/// The default log commit policy ensures that each record is covered by at most one commit request (except when
/// the metadata has changed). Redundant commit calls are dropped and corresponding commit invocation will
/// return false.
/// </summary>
/// <returns> policy object </returns>
public static LogCommitPolicy Default() => new DefaultLogCommitPolicy();

/// <summary>
/// MaxParallel log commit policy allows k (non-strong) commit requests to be in progress at any giving time. The k commits are guaranteed
/// to be non-overlapping unless there are metadata changes. Additional commit requests will fail and
/// automatically retried.
/// </summary>
/// <param name="k"> maximum number of commits that can be outstanding at a time </param>
/// <returns> policy object </returns>
public static LogCommitPolicy MaxParallel(int k) => new MaxParallelLogCommitPolicy(k);


/// <summary>
/// RateLimit log commit policy will only issue a request if it covers at least m bytes or if there has not been a
/// commit request in n milliseconds. Additional commit requests will fail and automatically retried
/// </summary>
/// <param name="thresholdMilli">
/// minimum time, in milliseconds, to be allowed between two commits, unless thresholdRange bytes will be committed
/// </param>
/// <param name="thresholdBytes">
/// minimum range, in bytes, to be allowed between two commits, unless it has been thresholdMilli milliseconds
/// </param>
/// <returns> policy object </returns>
public static LogCommitPolicy RateLimit(long thresholdMilli, long thresholdBytes) => new RateLimitLogCommitPolicy(thresholdMilli, thresholdBytes);
}

internal class DefaultCommitPolicy : IFasterLogCommitPolicy
internal sealed class DefaultLogCommitPolicy : LogCommitPolicy
{

/// <inheritdoc/>
public void OnAttached(FasterLog log) {}
public override void OnAttached(FasterLog log) {}

/// <inheritdoc/>
public bool AdmitCommit(long currentTail, bool metadataChanged) => true;
public override bool AdmitCommit(long currentTail, bool metadataChanged) => true;

/// <inheritdoc/>
public void OnCommitCreated(FasterLogRecoveryInfo info) {}
public override void OnCommitCreated(FasterLogRecoveryInfo info) { }

/// <inheritdoc/>
public void OnCommitFinished(FasterLogRecoveryInfo info) {}
public override void OnCommitFinished(FasterLogRecoveryInfo info) { }
}

internal class MaxParallelCommitPolicy : IFasterLogCommitPolicy
internal sealed class MaxParallelLogCommitPolicy : LogCommitPolicy
{
private FasterLog log;
private int commitInProgress, maxCommitInProgress;
readonly int maxCommitInProgress;
FasterLog log;
int commitInProgress;
// If we filtered out some commit, make sure to remember to retry later
private bool shouldRetry;
bool shouldRetry;

internal MaxParallelCommitPolicy(int maxCommitInProgress)
internal MaxParallelLogCommitPolicy(int maxCommitInProgress)
{
this.maxCommitInProgress = maxCommitInProgress;
}

/// <inheritdoc/>
public void OnAttached(FasterLog log) => this.log = log;
public override void OnAttached(FasterLog log) => this.log = log;

/// <inheritdoc/>
public bool AdmitCommit(long currentTail, bool metadataChanged)
public override bool AdmitCommit(long currentTail, bool metadataChanged)
{
while (true)
{
Expand All @@ -91,12 +121,10 @@ public bool AdmitCommit(long currentTail, bool metadataChanged)
}

/// <inheritdoc/>
public void OnCommitCreated(FasterLogRecoveryInfo info)
{
}
public override void OnCommitCreated(FasterLogRecoveryInfo info) { }

/// <inheritdoc/>
public void OnCommitFinished(FasterLogRecoveryInfo info)
public override void OnCommitFinished(FasterLogRecoveryInfo info)
{
Interlocked.Decrement(ref commitInProgress);
if (shouldRetry)
Expand All @@ -107,14 +135,17 @@ public void OnCommitFinished(FasterLogRecoveryInfo info)
}
}

internal class RateLimitCommitPolicy : IFasterLogCommitPolicy
internal sealed class RateLimitLogCommitPolicy : LogCommitPolicy
{
private FasterLog log;
private Stopwatch stopwatch;
private long lastAdmittedMilli, lastAdmittedAddress, thresholdMilli, thresholdRange;
private int shouldRetry = 0;
readonly Stopwatch stopwatch;
readonly long thresholdMilli;
readonly long thresholdRange;
FasterLog log;
long lastAdmittedMilli;
long lastAdmittedAddress;
int shouldRetry = 0;

internal RateLimitCommitPolicy(long thresholdMilli, long thresholdRange)
internal RateLimitLogCommitPolicy(long thresholdMilli, long thresholdRange)
{
this.thresholdMilli = thresholdMilli;
this.thresholdRange = thresholdRange;
Expand All @@ -124,10 +155,10 @@ internal RateLimitCommitPolicy(long thresholdMilli, long thresholdRange)
}

/// <inheritdoc/>
public void OnAttached(FasterLog log) => this.log = log;
public override void OnAttached(FasterLog log) => this.log = log;

/// <inheritdoc/>
public bool AdmitCommit(long currentTail, bool metadataChanged)
public override bool AdmitCommit(long currentTail, bool metadataChanged)
{
var now = stopwatch.ElapsedMilliseconds;
while (true)
Expand All @@ -154,50 +185,11 @@ public bool AdmitCommit(long currentTail, bool metadataChanged)
return true;
}
}

/// <inheritdoc/>
public void OnCommitCreated(FasterLogRecoveryInfo info)
{
}

/// <inheritdoc/>
public void OnCommitFinished(FasterLogRecoveryInfo info)
{
}
}

public sealed partial class FasterLog : IDisposable
{
/// <summary>
/// The default commit strategy ensures that each record is covered by at most one commit request (except when
/// the metadata has changed). Redundant commit calls are dropped and corresponding commit invocation will
/// return false.
/// </summary>
/// <returns> policy object </returns>
public static IFasterLogCommitPolicy DefaultStrategy() => new DefaultCommitPolicy();
public override void OnCommitCreated(FasterLogRecoveryInfo info) { }

/// <summary>
/// Allows k (non-strong) commit requests to be in progress at any giving time. The k commits are guaranteed
/// to be non-overlapping unless there are metadata changes. Additional commit requests will fail and
/// automatically retried.
/// </summary>
/// <param name="k"> maximum number of commits that can be outstanding at a time </param>
/// <returns> policy object </returns>
public static IFasterLogCommitPolicy MaxParallelCommitStrategy(int k) => new MaxParallelCommitPolicy(k);


/// <summary>
/// RateLimitCommitStrategy will only issue a request if it covers at least m bytes or if there has not been a
/// commit request in n milliseconds. Additional commit requests will fail and automatically retried
/// </summary>
/// <param name="thresholdMilli">
/// minimum time, in milliseconds, to be allowed between two commits, unless thresholdRange bytes will be committed
/// </param>
/// <param name="thresholdBytes">
/// minimum range, in bytes, to be allowed between two commits, unless it has been thresholdMilli milliseconds
/// </param>
/// <returns> policy object </returns>
public static IFasterLogCommitPolicy RateLimitCommitStrategy(long thresholdMilli, long thresholdBytes) =>
new RateLimitCommitPolicy(thresholdMilli, thresholdBytes);
/// <inheritdoc/>
public override void OnCommitFinished(FasterLogRecoveryInfo info) { }
}
}
4 changes: 2 additions & 2 deletions cs/src/core/Utilities/BufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ public sealed class SectorAlignedBufferPool
/// Disable buffer pool.
/// This static option should be enabled on program entry, and not modified once FASTER is instantiated.
/// </summary>
public static bool Disabled = false;
public static bool Disabled;

/// <summary>
/// Unpin objects when they are returned to the pool, so that we do not hold pinned objects long term.
/// If set, we will unpin when objects are returned and re-pin when objects are returned from the pool.
/// This static option should be enabled on program entry, and not modified once FASTER is instantiated.
/// </summary>
public static bool UnpinOnReturn = false;
public static bool UnpinOnReturn;

private const int levels = 32;
private readonly int recordSize;
Expand Down
14 changes: 6 additions & 8 deletions cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void BasicHighLatencyDeviceTest()
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);

// Create devices \ log for test for in memory device
using LocalMemoryDevice device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20);
using FasterLog LocalMemorylog = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 80, MemorySizeBits = 20, GetMemory = null, SegmentSizeBits = 80, MutableFraction = 0.2, LogCommitManager = null });
using var device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20);
using var LocalMemorylog = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 80, MemorySizeBits = 20, GetMemory = null, SegmentSizeBits = 80, MutableFraction = 0.2, LogCommitManager = null });

int entryLength = 10;

Expand All @@ -86,13 +86,11 @@ public void BasicHighLatencyDeviceTest()

// Read the log just to verify was actually committed
int currentEntry = 0;
using (var iter = LocalMemorylog.Scan(0, 100_000_000))
using var iter = LocalMemorylog.Scan(0, 100_000_000);
while (iter.GetNext(out byte[] result, out _, out _))
{
while (iter.GetNext(out byte[] result, out _, out _))
{
Assert.IsTrue(result[currentEntry] == currentEntry, "Fail - Result[" + currentEntry.ToString() + "]: is not same as " + currentEntry.ToString());
currentEntry++;
}
Assert.IsTrue(result[currentEntry] == currentEntry, "Fail - Result[" + currentEntry.ToString() + "]: is not same as " + currentEntry.ToString());
currentEntry++;
}
}

Expand Down
Loading