Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos DB: Fix Hierarchical PK support #871

Merged
merged 12 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<!-- Extensions can have independent versions and only increment when released -->
<Version>3.0.0$(VersionSuffix)</Version>
<ExtensionsVersion>5.0.0$(VersionSuffix)</ExtensionsVersion> <!-- WebJobs.Extensions -->
<CosmosDBVersion>4.3.1$(VersionSuffix)</CosmosDBVersion>
<CosmosDBVersion>4.4.0$(VersionSuffix)</CosmosDBVersion>
<HttpVersion>3.2.0$(VersionSuffix)</HttpVersion>
<MobileAppsVersion>3.0.0$(VersionSuffix)</MobileAppsVersion>
<SendGridVersion>3.0.3$(VersionSuffix)</SendGridVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.33.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.1.0" />
Expand Down
135 changes: 76 additions & 59 deletions test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,66 +33,66 @@ public class CosmosDBEndToEndTests
[Fact]
public async Task CosmosDBEndToEnd()
{
_loggerProvider.ClearAllLogMessages();
using (var host = await StartHostAsync(typeof(EndToEndTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

// Call the outputs function directly, which will write out 3 documents
// using with the 'input' property set to the value we provide.
var input = Guid.NewGuid().ToString();
var parameter = new Dictionary<string, object>();
parameter["input"] = input;
try
{
// Call the outputs function directly, which will write out 3 documents
// using with the 'input' property set to the value we provide.
var input = Guid.NewGuid().ToString();
var parameter = new Dictionary<string, object>();
parameter["input"] = input;

await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Outputs), parameter);
await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Outputs), parameter);

// Also insert a new Document so we can query on it.
var response = await client.GetContainer(DatabaseName, CollectionName).UpsertItemAsync<Item>(new Item() { Id = Guid.NewGuid().ToString() });
// Also insert a new Document so we can query on it.
var response = await client.GetContainer(DatabaseName, CollectionName).UpsertItemAsync<Item>(new Item() { Id = Guid.NewGuid().ToString() });

// Now craft a queue message to send to the Inputs, which will pull these documents.
var queueInput = new QueueItem
{
DocumentId = response.Resource.Id,
Input = input
};
// Now craft a queue message to send to the Inputs, which will pull these documents.
var queueInput = new QueueItem
{
DocumentId = response.Resource.Id,
Input = input
};

parameter.Clear();
parameter["item"] = JsonConvert.SerializeObject(queueInput);
parameter.Clear();
parameter["item"] = JsonConvert.SerializeObject(queueInput);

await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Inputs), parameter);
await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Inputs), parameter);

await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with string called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with retry called!")) == 8
&& logMessages.Count(p => p.Exception != null && p.Exception.InnerException.Message.Contains("Test exception") && !p.Category.StartsWith("Host.Results")) > 0;
});

// Make sure the Options were logged. Just check a few values.
string optionsMessage = _loggerProvider.GetAllLogMessages()
.Single(m => m.Category == "Microsoft.Azure.WebJobs.Hosting.OptionsLoggingService" && m.FormattedMessage.StartsWith(nameof(CosmosDBOptions)))
.FormattedMessage;
JObject loggedOptions = JObject.Parse(optionsMessage.Substring(optionsMessage.IndexOf(Environment.NewLine)));
Assert.Null(loggedOptions["ConnectionMode"].Value<string>());

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
await TestHelpers.Await(() =>
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with string called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with retry called!")) == 8
&& logMessages.Count(p => p.Exception != null && p.Exception.InnerException.Message.Contains("Test exception") && !p.Category.StartsWith("Host.Results")) > 0;
});

// Make sure the Options were logged. Just check a few values.
string optionsMessage = _loggerProvider.GetAllLogMessages()
.Single(m => m.Category == "Microsoft.Azure.WebJobs.Hosting.OptionsLoggingService" && m.FormattedMessage.StartsWith(nameof(CosmosDBOptions)))
.FormattedMessage;
JObject loggedOptions = JObject.Parse(optionsMessage.Substring(optionsMessage.IndexOf(Environment.NewLine)));
Assert.Null(loggedOptions["ConnectionMode"].Value<string>());
}
finally
{
// Clean-up leases
await CleanUpLeaseContainer(client);

await host.StopAsync();
}
}
}

[Fact]
public async Task CosmosDBEndToEndCancellation()
{
_loggerProvider.ClearAllLogMessages();
using (var host = await StartHostAsync(typeof(EndToEndCancellationTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);
Expand All @@ -110,25 +110,23 @@ public async Task CosmosDBEndToEndCancellation()
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

await TestHelpers.Await(() =>
try
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) > 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger canceled!")) == 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) > 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger canceled!")) == 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});
}
finally
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
// Clean-up leases
await CleanUpLeaseContainer(client);

await host.StopAsync();
}
}
}
Expand All @@ -151,6 +149,25 @@ public static async Task<CosmosClient> InitializeDocumentClientAsync(IConfigurat
return client;
}

public static async Task CleanUpLeaseContainer(CosmosClient client)
{
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
ResponseMessage delete = await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
if (delete.StatusCode == System.Net.HttpStatusCode.NotFound)
{
// Support old non-partitioned lease container in CI
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), PartitionKey.None);
}
}
}
}

private async Task<IHost> StartHostAsync(Type testType)
{
ExplicitTypeLocator locator = new ExplicitTypeLocator(testType);
Expand Down