Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] CTL: Adds query scenario #2701

Merged
merged 14 commits into from
Sep 14, 2021
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ private static ICTLScenario CreateScenario(WorkloadType workloadType)
WorkloadType.ReadWriteQuery => new ReadWriteQueryScenario(),
WorkloadType.ChangeFeedProcessor => new ChangeFeedProcessorScenario(),
WorkloadType.ChangeFeedPull => new ChangeFeedPullScenario(),
WorkloadType.Query => new QueryScenario(),
_ => throw new NotImplementedException($"No mapping for {workloadType}"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace CosmosCTL

internal class ChangeFeedProcessorScenario : ICTLScenario
{
private InitializationResult initializationResult;
private Utils.InitializationResult initializationResult;

public async Task InitializeAsync(
CTLConfig config,
CosmosClient cosmosClient,
ILogger logger)
{
this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient);
this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient);
if (this.initializationResult.CreatedDatabase)
{
logger.LogInformation("Created database for execution");
Expand All @@ -39,7 +39,8 @@ public async Task InitializeAsync(
if (config.PreCreatedDocuments > 0)
{
logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments);
await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
IReadOnlyDictionary<string, IReadOnlyList<Dictionary<string, string>>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count;
}
}

Expand All @@ -61,10 +62,17 @@ public async Task RunAsync(

try
{
object lockObject = new object();
long documentTotal = 0;
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(config.Database, config.Collection)
.GetChangeFeedProcessorBuilder<SimpleItem>("ctlProcessor",
(IReadOnlyCollection<SimpleItem> docs, CancellationToken token) =>
{
lock (lockObject)
{
documentTotal += docs.Count;
}

metrics.Measure.Counter.Increment(documentCounter, docs.Count);
return Task.CompletedTask;
})
Expand Down Expand Up @@ -112,6 +120,14 @@ public async Task RunAsync(

previousMin = sortedRange.Max;
}

if (config.PreCreatedDocuments > 0)
{
if (this.initializationResult.InsertedDocuments != documentTotal)
{
logger.LogError($"Expected to receive {this.initializationResult.InsertedDocuments} documents and got {documentTotal}");
}
}
}
catch (Exception ex)
{
Expand All @@ -132,48 +148,6 @@ public async Task RunAsync(
}
}

private static async Task<InitializationResult> CreateDatabaseAndContainerAsync(
CTLConfig config,
CosmosClient cosmosClient)
{
InitializationResult result = new InitializationResult()
{
CreatedDatabase = false,
CreatedContainer = false
};

Database database;
try
{
database = await cosmosClient.GetDatabase(config.Database).ReadAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput);
result.CreatedDatabase = true;
database = databaseResponse.Database;
}

Container container;
try
{
container = await database.GetContainer(config.Collection).ReadContainerAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
result.CreatedContainer = true;
}

return result;
}

private struct InitializationResult
{
public bool CreatedDatabase;
public bool CreatedContainer;
}

private class SimpleItem
{
[JsonProperty("id")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace CosmosCTL
using App.Metrics;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using App.Metrics.Gauge;

internal class ChangeFeedPullScenario : ICTLScenario
{
private InitializationResult initializationResult;
private Utils.InitializationResult initializationResult;

public async Task InitializeAsync(
CTLConfig config,
CosmosClient cosmosClient,
ILogger logger)
{
this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient);
this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient);

if (this.initializationResult.CreatedDatabase)
{
Expand All @@ -37,7 +39,8 @@ public async Task InitializeAsync(
if (config.PreCreatedDocuments > 0)
{
logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments);
await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
IReadOnlyDictionary<string, IReadOnlyList<Dictionary<string, string>>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count;
}
}

Expand All @@ -51,13 +54,16 @@ public async Task RunAsync(
{
Stopwatch stopWatch = Stopwatch.StartNew();

GaugeOptions documentGauge= new GaugeOptions { Name = "#Documents received", Context = loggingContextIdentifier };
Container container = cosmosClient.GetContainer(config.Database, config.Collection);

while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan)
{
int documentTotal = 0;
long documentTotal = 0;
string continuation = null;
Container container = cosmosClient.GetContainer(config.Database, config.Collection);
FeedIterator<Dictionary<string, string>> changeFeedPull = container.GetChangeFeedIterator<Dictionary<string, string>>
(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental);
using FeedIterator<Dictionary<string, string>> changeFeedPull
= container.GetChangeFeedIterator<Dictionary<string, string>>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental);

try
{
while (changeFeedPull.HasMoreResults)
Expand All @@ -71,65 +77,28 @@ public async Task RunAsync(
}
}

if (config.PreCreatedDocuments == documentTotal)
{
logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {config.PreCreatedDocuments}");
}
else
metrics.Measure.Gauge.SetValue(documentGauge, documentTotal);

if (config.PreCreatedDocuments > 0)
{
logger.LogError($"The prepopulated documents and the new documents don't match. Preconfigured Docs = {config.PreCreatedDocuments}, New Documents = {documentTotal}");
logger.LogError(continuation);
if (this.initializationResult.InsertedDocuments == documentTotal)
{
logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {this.initializationResult.InsertedDocuments}");
}
else
{
logger.LogError($"The prepopulated documents and the change feed documents don't match. Preconfigured Docs = {this.initializationResult.InsertedDocuments}, Change feed Documents = {documentTotal}.{Environment.NewLine}{continuation}");
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failure while looping through new documents");
metrics.Measure.Gauge.SetValue(documentGauge, documentTotal);
logger.LogError(ex, "Failure while looping through change feed documents");
}
}

stopWatch.Stop();
}
private static async Task<InitializationResult> CreateDatabaseAndContainerAsync(
CTLConfig config,
CosmosClient cosmosClient)
{
InitializationResult result = new InitializationResult()
{
CreatedDatabase = false,
CreatedContainer = false
};

Database database;

try
{
database = await cosmosClient.GetDatabase(config.Database).ReadAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput);
result.CreatedDatabase = true;
database = databaseResponse.Database;
}

Container container;

try
{
container = await database.GetContainer(config.Collection).ReadContainerAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
result.CreatedContainer = true;
}

return result;
}
private struct InitializationResult
{
public bool CreatedDatabase;
public bool CreatedContainer;
}
}
}
Loading