[C#] Support FasterLog scan from different process #333

Merged: 5 commits, Sep 18, 2020

@badrishc badrishc commented Sep 16, 2020

  • Enables a different process to consume FasterLog (by iterating or with a tailing scan) in read-only mode.
  • With this, one process can enqueue and commit log entries while another process consumes them.
  • This works by letting the read-only iterating instance continuously "recover" from the commits written out by the primary FasterLog writer.
  • When used with AzureStorageDevice, the enqueuing/committing process and the consuming process may be on different machines, which approximately emulates Kafka/EventHubs behavior.

Sample usage: here

Inline sample:

static async Task SeparateConsumerAsync(CancellationToken cancellationToken)
{
    // `path` is defined elsewhere in the sample. The page and segment sizes
    // must match the settings used by the writer process.
    var device = Devices.CreateLogDevice(path + "mylog");
    var log = new FasterLog(new FasterLogSettings { LogDevice = device, ReadOnlyMode = true, PageSizeBits = 9, SegmentSizeBits = 9 });

    // Fire-and-forget: periodically pick up new commits written by the primary writer.
    var _ = RecoverAsync(log, cancellationToken);

    using var iter = log.Scan(log.BeginAddress, long.MaxValue);

    await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken))
    {
        Console.WriteLine($"Consuming {Encoding.UTF8.GetString(result)}");
        iter.CompleteUntil(nextAddress);
    }
}

static async Task RecoverAsync(FasterLog log, CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        // `restorePeriodMs` is defined elsewhere in the sample.
        await Task.Delay(TimeSpan.FromMilliseconds(restorePeriodMs), cancellationToken);

        Console.WriteLine("Restoring ...");

        // Re-read the latest commit so the iterator can advance past the old tail.
        log.RecoverReadOnly();
    }
}

Fix #332

@badrishc badrishc changed the title from "[C#] [WIP] Support scan from different process" to "[C#] Support scan from different process" Sep 16, 2020
@badrishc badrishc changed the title from "[C#] Support scan from different process" to "[C#] Support FasterLog scan from different process" Sep 16, 2020

AlgorithmsAreCool commented Sep 17, 2020

I think there might be an issue. I'm getting an Access Violation when running a separate reader in an aggressive loop.

Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
   at FASTER.core.FasterLog.GetLength(Byte*)
   at FASTER.core.FasterLogScanIterator.GetNextInternal(Int64 ByRef, Int32 ByRef, Int64 ByRef, Int64 ByRef)
   at FASTER.core.FasterLogScanIterator.GetNext(Byte[] ByRef, Int32 ByRef, Int64 ByRef, Int64 ByRef)
   at FASTER.core.FasterLogScanIterator+<GetAsyncEnumerable>d__19.MoveNext()
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]](System.__Canon ByRef)
   at FASTER.core.FasterLogScanIterator+<GetAsyncEnumerable>d__19.System.Collections.Generic.IAsyncEnumerator<(System.Byte[]entry,System.Int32entryLength,System.Int64currentAddress,System.Int64nextAddress)>.MoveNextAsync(System.Byte[]entry,System.Int32entryLength,System.Int64currentAddress,System.Int64nextAddress)>.MoveNextAsync()
   at FasterDiskTest.Program+<SeparateConsumerAsync>d__7.MoveNext()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext(System.Threading.Thread)
   at System.Threading.ThreadPoolGlobals+<>c.<.cctor>b__5_0(System.Object)
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].InvokeContinuation()
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SignalCompletion()
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SetResult(Boolean)
   at FASTER.core.FasterLogScanIterator+<GetAsyncEnumerable>d__19.MoveNext()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext(System.Threading.Thread)
   at System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean)
   at System.Threading.Tasks.Task.RunContinuations(System.Object)
   at System.Threading.Tasks.Task`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].TrySetResult(Boolean)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SetResult(Boolean)
   at System.Runtime.CompilerServices.AsyncValueTaskMethodBuilder`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SetResult(Boolean)
   at FASTER.core.FasterLogScanIterator+<SlowWaitAsync>d__22.MoveNext()
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].ExecutionContextCallback(System.Object)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread, System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext(System.Threading.Thread)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].ExecuteFromThreadPool(System.Threading.Thread)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()

@badrishc
Contributor Author

Okay, we'll take a look. Did you make any changes to the sample that might cause it?

@badrishc
Contributor Author

By the way, the sample uses very small page and segment sizes (PageSizeBits = 9 and SegmentSizeBits = 9) for simplicity. For performance, those need to be bumped up.
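
To make the size settings concrete, here is a small sketch of how a "bits" setting translates to bytes, assuming (as the `Bits` suffix suggests) that each value is the base-2 logarithm of the size. `LogSizing` and `SizeFromBits` are hypothetical helper names for illustration, not part of FASTER.

```csharp
using System;

// Hypothetical helper, not part of FASTER: converts a "bits" setting to a byte count.
static class LogSizing
{
    // Assumes the setting is log2 of the size in bytes, so 9 bits means 2^9 bytes.
    public static long SizeFromBits(int bits)
    {
        if (bits < 0 || bits > 62)
            throw new ArgumentOutOfRangeException(nameof(bits));
        return 1L << bits;
    }
}
```

Under that assumption, `LogSizing.SizeFromBits(9)` gives 512 bytes for the sample's pages and segments, while a value such as 22 would give 4 MiB.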

@AlgorithmsAreCool

I think I'm running over some limit: it fails in GetNextInternal when currentAddress == 0x0000000080000000 exactly. Going to bump some settings up.

@AlgorithmsAreCool

currentAddress has become > _headAddress in GetNextInternal.

                if (currentAddress < _headAddress)
                {
                    if (BufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress))
                        continue;
                    physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset);
                }
                else
                { // this branch here fires just before the crash. 👈👈
                    physicalAddress = allocator.GetPhysicalAddress(currentAddress);
                }

                // Get and check entry length
                entryLength = fasterLog.GetLength((byte*)physicalAddress); //<-- physicalAddress = 0 here 💣 👈👈

@badrishc
Contributor Author

Make sure the log settings for the writer and reader are identical with respect to page size and segment size.

@badrishc
Contributor Author

Argh. This line is wrong:

allocator.HeadAddress = int.MaxValue;

It should be:

allocator.HeadAddress = long.MaxValue;

That explains the issue.
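
A minimal, self-contained sketch of why the narrower sentinel breaks at exactly 0x80000000. It assumes the HeadAddress assignment acts as a "nothing is in memory" marker for the read-only instance, which is a reading of this thread rather than something stated in it. `HeadAddressSentinel` and `TreatedAsInMemory` are hypothetical names that only mirror the `currentAddress < _headAddress` check quoted earlier.

```csharp
using System;

// Hypothetical illustration, not FASTER code.
static class HeadAddressSentinel
{
    // Mirrors the else-branch of the quoted check: an address at or above
    // the head address is read directly from the in-memory allocator.
    public static bool TreatedAsInMemory(long currentAddress, long headAddress)
        => currentAddress >= headAddress;
}
```

With `int.MaxValue` (0x7FFFFFFF) as the sentinel, `TreatedAsInMemory(0x80000000L, int.MaxValue)` is true, so the first address past 2 GiB takes the in-memory branch even though the read-only instance has no such page, which matches the physicalAddress of 0 reported above. With `long.MaxValue` the same call is false, and the iterator keeps loading from the device.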

@AlgorithmsAreCool

Ah, cool. I patched that in my copy, and it now seems to work past 2 GB of writes.

@AlgorithmsAreCool

Perf looks decent too with default settings. Current Rate = 2,121,765.12 rec/s 565,212,495.70 B/s

@AlgorithmsAreCool

On an unrelated note: looking at GetNext(), can I assume that the physicalAddress returned by GetNextInternal(...) is only valid between the epoch.Resume() and epoch.Suspend() calls?

If the physicalAddress is valid beyond that, is there an opportunity for a zero-copy read?

@badrishc
Contributor Author

You are exactly right. It's a buffer address that is valid while you hold the epoch. That's the only reason we copy it out.

@badrishc
Contributor Author

Also, with only ONE thread doing the iteration, we do know that the physical address is stable until that thread's next GetNext call. But it's unclear how to safely expose this capability to users as a zero-copy read.
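
One possible shape for such an API, offered only as a sketch: hand the entry to a callback as a `ReadOnlySpan<byte>`, so the view cannot be stored on the heap and is not meant to outlive the call. Everything here (`EntryConsumer`, `ToyScopedIterator`, `TryConsumeNext`) is hypothetical and stands in for the real iterator; the toy reuses one fixed 64-byte frame to imitate a buffer that is overwritten on the next read.

```csharp
using System;

// Hypothetical delegate: receives a view over the iterator's internal buffer.
public delegate void EntryConsumer(ReadOnlySpan<byte> entry);

// Toy stand-in for a single-threaded iterator; this is not FasterLogScanIterator.
public sealed class ToyScopedIterator
{
    private readonly byte[][] entries;
    private readonly byte[] frame = new byte[64]; // reused buffer; entries must fit in 64 bytes
    private int next;

    public ToyScopedIterator(params byte[][] entries) => this.entries = entries;

    // Passes the next entry to the consumer as a span over the shared frame.
    // The span is valid only until the callback returns, because the next
    // call overwrites the frame.
    public bool TryConsumeNext(EntryConsumer consumer)
    {
        if (next >= entries.Length) return false;
        byte[] source = entries[next++];
        source.CopyTo(frame, 0); // simulates the entry already sitting in the buffer
        // A real implementation would hold the epoch for the duration of this call.
        consumer(new ReadOnlySpan<byte>(frame, 0, source.Length));
        return true;
    }
}
```

A caller would write something like `iter.TryConsumeNext(entry => Process(entry))`, where `Process` is whatever the consumer does with the bytes. The trade-off is that the callback has to finish its work, or copy what it needs, before returning.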

@badrishc badrishc merged commit ffa3caa into master Sep 18, 2020
@badrishc badrishc deleted the fasterlog-separate-scan branch September 18, 2020 23:47
Successfully merging this pull request may close these issues.

[C#] FasterLog - tailing iterator from different process
2 participants