Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend LeaseCollection allowed partition key paths to "\partitionKey" to support gremlin accounts #158

Merged
merged 18 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ protected DocumentCollectionInfo MonitoredCollectionInfo

protected readonly bool IsPartitionedLeaseCollection;

protected readonly bool isPartitionedByLeasePk;

public IntegrationTest(
bool isPartitionedMonitoredCollection = true,
bool isPartitionedLeaseCollection = false)
bool isPartitionedLeaseCollection = false,
bool isPartitionedByLeasePk = false)
{
this.IsPartitionedMonitoredCollection = isPartitionedMonitoredCollection;
this.IsPartitionedLeaseCollection = isPartitionedLeaseCollection;
this.isPartitionedByLeasePk = isPartitionedByLeasePk;
}

public async Task InitializeAsync()
Expand All @@ -119,7 +123,7 @@ public async Task InitializeAsync()

if (this.IsPartitionedLeaseCollection)
{
leaseCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/id" } };
leaseCollection.PartitionKey = this.isPartitionedByLeasePk ? new PartitionKeyDefinition { Paths = { "/leasepk" } } : new PartitionKeyDefinition { Paths = { "/id" } };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please break too long line

}

using (var client = new DocumentClient(this.LeaseCollectionInfo.Uri, this.LeaseCollectionInfo.MasterKey, this.LeaseCollectionInfo.ConnectionPolicy))
Expand Down Expand Up @@ -160,7 +164,7 @@ private async Task CreateMonitoredCollectionAsync(string monitoredCollectionName

if (this.IsPartitionedMonitoredCollection)
{
monitoredCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/id" } };
monitoredCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/partitionId" } };
CPrashanth marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
//----------------------------------------------------------------

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.IntegrationTests
{
using Xunit;

/// <summary>
/// To truly test this class, run emulator with /EnableGremlinEndpoint
/// </summary>
[Trait("Category", "Integration")]
[Collection("Integration tests")]
public class LeasePkLeaseCollectionTests : StaticCollectionTests
{
public LeasePkLeaseCollectionTests() :
base(true,true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after comma

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public StaticCollectionTests(
{
}

public StaticCollectionTests(
bool isPartitionedLeaseCollection, bool isPartitionedByLeasePk) :
base(isPartitionedLeaseCollection: isPartitionedLeaseCollection, isPartitionedByLeasePk: isPartitionedByLeasePk)
{
}

[Fact]
public async Task CountDocumentsInCollection_NormalCase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,49 @@ public async Task BuildPassesPartitionKey_WhenLeaseCollectionIsPartitionedById()
Assert.Equal(typeof(LeaseLostException), exception.GetType());
}

[Fact]
public async Task BuildPassesPartitionKey_WhenLeaseCollectionIsPartitionedByLeasePk()
{
var leaseCollection = MockHelpers.CreateCollection(
"collectionId",
"collectionRid",
new PartitionKeyDefinition { Paths = { "/leasepk" } },
collectionLink);

var lease = new DocumentServiceLease()
{
LeasePartitionKey = "leasePk",
Id = "Id"
};

var leaseClient = this.CreateMockDocumentClient(collection);
Mock.Get(leaseClient)
.Setup(c => c.ReadDocumentCollectionAsync(
It.IsAny<Uri>(),
It.IsAny<RequestOptions>()))
.ReturnsAsync(new ResourceResponse<DocumentCollection>(leaseCollection));
Mock.Get(leaseClient)
.Setup(c => c.ReadDocumentAsync(
It.IsAny<Uri>(),
It.IsAny<RequestOptions>(),
It.IsAny<CancellationToken>()))
.Callback((Uri uri, RequestOptions options, CancellationToken token) =>
{
if (new PartitionKey("leasePk").Equals(options.PartitionKey))
throw DocumentExceptionHelpers.CreateNotFoundException(); // Success code path: cause lease lost.
throw new Exception("Failure");
});

this.builder
.WithFeedDocumentClient(this.CreateMockDocumentClient())
.WithLeaseDocumentClient(leaseClient)
.WithObserverFactory(Mock.Of<IChangeFeedObserverFactory>());
await this.builder.BuildAsync();

Exception exception = await Record.ExceptionAsync(() => this.builder.LeaseStoreManager.ReleaseAsync(lease));
Assert.Equal(typeof(LeaseLostException), exception.GetType());
}

[Fact]
public async Task BuildThrowsWhenBothPartitionProcessorFactoriesSpecified()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.LeaseManagemen
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Azure.Documents.ChangeFeedProcessor.LeaseManagement;
using Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement;
using Newtonsoft.Json;
using Xunit;

[Trait("Category", "Gated")]
Expand Down Expand Up @@ -55,6 +57,7 @@ public void ValidateProperties()
Assert.Equal(timestamp, lease.Timestamp);
Assert.Equal(value, lease.Properties[key]);
Assert.Equal(etag, lease.ConcurrencyToken);
Assert.Null(lease.LeasePartitionKey);
}

[Fact]
Expand All @@ -63,6 +66,7 @@ public void ValidateSerialization_AllFields()
DocumentServiceLease originalLease = new DocumentServiceLease
{
Id = "id",
LeasePartitionKey="leasePk",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not modify existing tests but add new tests for the case when lease collection is partitioned by partitionKey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ETag = "etag",
PartitionId = "0",
Owner = "owner",
Expand All @@ -80,6 +84,7 @@ public void ValidateSerialization_AllFields()
var lease = (DocumentServiceLease)formatter.Deserialize(stream2);

Assert.Equal(originalLease.Id, lease.Id);
Assert.Equal(originalLease.LeasePartitionKey, lease.LeasePartitionKey);
Assert.Equal(originalLease.ETag, lease.ETag);
Assert.Equal(originalLease.PartitionId, lease.PartitionId);
Assert.Equal(originalLease.Owner, lease.Owner);
Expand All @@ -102,12 +107,121 @@ public void ValidateSerialization_NullFields()
var lease = (DocumentServiceLease)formatter.Deserialize(stream2);

Assert.Null(lease.Id);
Assert.Null(lease.LeasePartitionKey);
Assert.Null(lease.ETag);
Assert.Null(lease.PartitionId);
Assert.Null(lease.Owner);
Assert.Null(lease.ContinuationToken);
Assert.Equal(new DocumentServiceLease().Timestamp, lease.Timestamp);
Assert.Empty(lease.Properties);
}


#region Compat_Tests
// this class doesnt contain LeaseId property

[Serializable]
class DocumentServiceLeaseV1
{
private static readonly DateTime UnixStartTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("_etag")]
public string ETag { get; set; }

[JsonProperty("PartitionId")]
public string PartitionId { get; set; }

[JsonProperty("Owner")]
public string Owner { get; set; }

/// <summary>
/// Gets or sets the current value for the offset in the stream.
/// </summary>
[JsonProperty("ContinuationToken")]
public string ContinuationToken { get; set; }

[JsonIgnore]
public DateTime Timestamp
{
get { return this.ExplicitTimestamp ?? UnixStartTime.AddSeconds(this.TS); }
set { this.ExplicitTimestamp = value; }
}

[JsonIgnore]
public string ConcurrencyToken => this.ETag;

[JsonProperty("properties")]
public Dictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();

[JsonIgnore]
public LeaseAcquireReason AcquireReason { get; set; }

[JsonProperty("timestamp")]
private DateTime? ExplicitTimestamp { get; set; }

[JsonProperty("_ts")]
private long TS { get; set; }

}

[Fact]
public void ValidateBackwardCompat_OldLeaseFormat()
{

DocumentServiceLeaseV1 originalLease = new DocumentServiceLeaseV1
{
Id = "id",
ETag = "etag",
PartitionId = "0",
Owner = "owner",
ContinuationToken = "continuation",
Timestamp = DateTime.Now - TimeSpan.FromSeconds(5),
Properties = new Dictionary<string, string> { { "key", "value" } }
};

var serializedV1Lease = JsonConvert.SerializeObject(originalLease);
var lease = JsonConvert.DeserializeObject<DocumentServiceLease>(serializedV1Lease);

Assert.Equal(originalLease.Id, lease.Id);
Assert.Equal(null, lease.LeasePartitionKey);
Assert.Equal(originalLease.ETag, lease.ETag);
Assert.Equal(originalLease.PartitionId, lease.PartitionId);
Assert.Equal(originalLease.Owner, lease.Owner);
Assert.Equal(originalLease.ContinuationToken, lease.ContinuationToken);
Assert.Equal(originalLease.Timestamp, lease.Timestamp);
Assert.Equal(originalLease.Properties["key"], lease.Properties["key"]);
}

[Fact]
public void ValidateForwardCompat_OldLeaseFormat()
{

DocumentServiceLease originalLease = new DocumentServiceLease
{
Id = "id",
ETag = "etag",
PartitionId = "0",
Owner = "owner",
ContinuationToken = "continuation",
Timestamp = DateTime.Now - TimeSpan.FromSeconds(5),
Properties = new Dictionary<string, string> { { "key", "value" } }
};

var serializedLease = JsonConvert.SerializeObject(originalLease);
var lease = JsonConvert.DeserializeObject<DocumentServiceLeaseV1>(serializedLease);

Assert.Equal(originalLease.Id, lease.Id);
Assert.Equal(originalLease.ETag, lease.ETag);
Assert.Equal(originalLease.PartitionId, lease.PartitionId);
Assert.Equal(originalLease.Owner, lease.Owner);
Assert.Equal(originalLease.ContinuationToken, lease.ContinuationToken);
Assert.Equal(originalLease.Timestamp, lease.Timestamp);
Assert.Equal(originalLease.Properties["key"], lease.Properties["key"]);
}
#endregion
CPrashanth marked this conversation as resolved.
Show resolved Hide resolved

}
}
30 changes: 20 additions & 10 deletions src/DocumentDB.ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public async Task<IChangeFeedProcessor> BuildAsync()

await this.InitializeCollectionPropertiesForBuildAsync().ConfigureAwait(false);

ILeaseStoreManager leaseStoreManager = await this.GetLeaseStoreManagerAsync(this.leaseCollectionLocation, true).ConfigureAwait(false);
ILeaseStoreManager leaseStoreManager = await this.GetLeaseStoreManagerAsync(this.leaseCollectionLocation).ConfigureAwait(false);
IPartitionManager partitionManager = this.BuildPartitionManager(leaseStoreManager);
return new ChangeFeedProcessor(partitionManager);
}
Expand All @@ -397,7 +397,7 @@ public async Task<IRemainingWorkEstimator> BuildEstimatorAsync()

await this.InitializeCollectionPropertiesForBuildAsync().ConfigureAwait(false);

var leaseStoreManager = await this.GetLeaseStoreManagerAsync(this.leaseCollectionLocation, true).ConfigureAwait(false);
var leaseStoreManager = await this.GetLeaseStoreManagerAsync(this.leaseCollectionLocation).ConfigureAwait(false);

IRemainingWorkEstimator remainingWorkEstimator = new RemainingWorkEstimator(
leaseStoreManager,
Expand Down Expand Up @@ -461,8 +461,7 @@ private IPartitionManager BuildPartitionManager(ILeaseStoreManager leaseStoreMan
}

private async Task<ILeaseStoreManager> GetLeaseStoreManagerAsync(
DocumentCollectionInfo collectionInfo,
bool isPartitionKeyByIdRequiredIfPartitioned)
DocumentCollectionInfo collectionInfo)
{
if (this.LeaseStoreManager == null)
{
Expand All @@ -473,15 +472,26 @@ private async Task<ILeaseStoreManager> GetLeaseStoreManagerAsync(
collection.PartitionKey != null &&
collection.PartitionKey.Paths != null &&
collection.PartitionKey.Paths.Count > 0;
if (isPartitioned && isPartitionKeyByIdRequiredIfPartitioned &&
(collection.PartitionKey.Paths.Count != 1 || collection.PartitionKey.Paths[0] != "/id"))

if (isPartitioned &&
(collection.PartitionKey.Paths.Count != 1 || !(collection.PartitionKey.Paths[0].Equals($"/{DocumentServiceLease.IdPropertyName}", StringComparison.OrdinalIgnoreCase) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please break too long lines. Normally 120 chars.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

collection.PartitionKey.Paths[0].Equals($"/{DocumentServiceLease.LeasePartitionKeyPropertyName}", StringComparison.OrdinalIgnoreCase))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't use OrdinalIgnoreCase. JSON is case-sensitive. Just compare using "!=" . If using .Equals also check that Paths[0] is not null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
throw new ArgumentException("The lease collection, if partitioned, must have partition key equal to id.");
throw new ArgumentException($"The lease collection, if partitioned, must have partition key equal to {DocumentServiceLease.IdPropertyName} or {DocumentServiceLease.LeasePartitionKeyPropertyName}.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more correct to say /id or /partitionKey

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

var requestOptionsFactory = isPartitioned ?
(IRequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory() :
(IRequestOptionsFactory)new SinglePartitionRequestOptionsFactory();
IRequestOptionsFactory requestOptionsFactory = null;
if (isPartitioned)
{
// we allow only id or leasePk partitioning so check only flag
requestOptionsFactory = collection.PartitionKey.Paths[0].Equals($"/{DocumentServiceLease.IdPropertyName}", StringComparison.OrdinalIgnoreCase) ?
(IRequestOptionsFactory)new PartitionedByIdCollectionRequestOptionsFactory() :
(IRequestOptionsFactory)new PartitionedByLeasePkCollectionRequestOptionsFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think cast to interface is needed. Is it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gives cs8370 error. conditional target type expression available in c# 9.0

}
else
{
requestOptionsFactory = (IRequestOptionsFactory)new SinglePartitionRequestOptionsFactory();
}

string leasePrefix = this.GetLeasePrefix();
var leaseStoreManagerBuilder = new DocumentServiceLeaseStoreManagerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.LeaseManagement
[Serializable]
internal class DocumentServiceLease : ILease, ILeaseAcquireReasonProvider
{
internal const string IdPropertyName = "id";
internal const string LeasePartitionKeyPropertyName = "leasepk";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's already a lease. Can we have this as "pk" or "partitionKey", I would prefer the latter as it more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


private static readonly DateTime UnixStartTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

public DocumentServiceLease()
Expand All @@ -32,9 +35,17 @@ public DocumentServiceLease(DocumentServiceLease other)
this.Properties = other.Properties;
}

[JsonProperty("id")]
[JsonProperty(IdPropertyName)]
public string Id { get; set; }

/// <summary>
/// Gets or sets property to be used as partition key path for lease collections.
/// This is clone of existing Id property to maintain backward compat.
/// This property name is compatible to both GremlinAccounts and SqlAccounts
/// </summary>
[JsonProperty(LeasePartitionKeyPropertyName)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure that when it's null, we don't serialize to JSON. I think that would be NullValueHandling property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

public string LeasePartitionKey { get; set; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name it as just PartitionKey. This class is already a lease.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


[JsonProperty("_etag")]
public string ETag { get; set; }

Expand Down
Loading