Skip to content

Commit

Permalink
Performance: Refactors query prefetch mechanism (#4361)
Browse files Browse the repository at this point in the history
* sketch out improved ParallelPrefetcher; focus is on reducing allocations, but we also can't be substantially slower to start all tasks

* little more cleanup to further reduce allocations, and save a tiny amount of CPU

* start on testing

* some tweaks and testing for buffer management

* test exception handling; fix a bug in high concurrency case that would swallow exceptions

* test cancellation

* more testing, a little bit of cleanup

* test the case where the enumerator faults; fixes a couple leaks of Tasks and buffers that could occur in that some places

* tiny bit of cleanup

* cleanup and expand comments; code is tricky, it needs documentation

* address a whole bunch of style nits, just to keep compiler Message counts down

* address some feedback on comment clarity

* don't rely on finalizers for testing, it's too brittle; hold up was not allocating more in the non-test cases, but found a field to reuse; needs benchmarking

* complete ITrace proxy for testing

* style nits and a bit more commentary

* explicit test for concurrent access to the inner IEnumerator

* explicit test that IEnumerator is disposed

* explicitly implement all ITrace members

* address feedback: internal class members should be public or private

* address feedback: break test-only bits of ParallelPrefetch out into a partial

* address feedback: move const above type declarations

* address feedback: naming nits

* address feedback: use the existing NoOpTrace

* address feedback: remove pointless using

* update baseline trace text for QueryAsync test

---------

Co-authored-by: neildsh <35383880+neildsh@users.noreply.github.com>
  • Loading branch information
kevin-montrose and neildsh authored Apr 1, 2024
1 parent 15d83a7 commit e04ce51
Show file tree
Hide file tree
Showing 4 changed files with 2,081 additions and 66 deletions.
121 changes: 121 additions & 0 deletions Microsoft.Azure.Cosmos/src/Pagination/ParallelPrefetch.Testing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Pagination
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;

/// <summary>
/// Holds the "just for testing"-bits of <see cref="ParallelPrefetch"/>.
/// </summary>
internal static partial class ParallelPrefetch
{
/// <summary>
/// For testing purposes, provides ways to instrument <see cref="PrefetchInParallelAsync(IEnumerable{IPrefetcher}, int, ITrace, CancellationToken)"/>.
///
/// You shouldn't be using this outside of test projects.
/// </summary>
internal sealed class ParallelPrefetchTestConfig : ITrace
{
private ITrace innerTrace;

private int startedTasks;
private int awaitedTasks;

public ArrayPool<IPrefetcher> PrefetcherPool { get; private set; }
public ArrayPool<Task> TaskPool { get; private set; }
public ArrayPool<object> ObjectPool { get; private set; }

public int StartedTasks
=> this.startedTasks;

public int AwaitedTasks
=> this.awaitedTasks;

string ITrace.Name => this.innerTrace.Name;

Guid ITrace.Id => this.innerTrace.Id;

DateTime ITrace.StartTime => this.innerTrace.StartTime;

TimeSpan ITrace.Duration => this.innerTrace.Duration;

TraceLevel ITrace.Level => this.innerTrace.Level;

TraceComponent ITrace.Component => this.innerTrace.Component;

TraceSummary ITrace.Summary => this.innerTrace.Summary;

ITrace ITrace.Parent => this.innerTrace.Parent;

IReadOnlyList<ITrace> ITrace.Children => this.innerTrace.Children;

IReadOnlyDictionary<string, object> ITrace.Data => this.innerTrace.Data;

public ParallelPrefetchTestConfig(
ArrayPool<IPrefetcher> prefetcherPool,
ArrayPool<Task> taskPool,
ArrayPool<object> objectPool)
{
this.PrefetcherPool = prefetcherPool;
this.TaskPool = taskPool;
this.ObjectPool = objectPool;
}

public void SetInnerTrace(ITrace trace)
{
this.innerTrace = trace;
}

public void TaskStarted()
{
Interlocked.Increment(ref this.startedTasks);
}

public void TaskAwaited()
{
Interlocked.Increment(ref this.awaitedTasks);
}

ITrace ITrace.StartChild(string name)
{
return this.innerTrace.StartChild(name);
}

ITrace ITrace.StartChild(string name, TraceComponent component, TraceLevel level)
{
return this.innerTrace.StartChild(name, component, level);
}

void ITrace.AddDatum(string key, TraceDatum traceDatum)
{
this.innerTrace.AddDatum(key, traceDatum);
}

void ITrace.AddDatum(string key, object value)
{
this.innerTrace.AddDatum(key, value);
}

void ITrace.AddOrUpdateDatum(string key, object value)
{
this.innerTrace.AddOrUpdateDatum(key, value);
}

void ITrace.AddChild(ITrace trace)
{
this.innerTrace.AddChild(trace);
}

void IDisposable.Dispose()
{
this.innerTrace.Dispose();
}
}
}
}
Loading

0 comments on commit e04ce51

Please sign in to comment.