Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Cosmos | Connection endpoint rediscovery (connection state listener) #14697

Closed
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
84a37ab
Port from v4
Nov 7, 2019
d218925
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Nov 8, 2019
11cbbf3
Merge branch 'master' of github.com:David-Noble-at-work/azure-sdk-for…
Nov 8, 2019
05c7e05
Corrected package misspelling in log4j.properties and removed System.…
Nov 9, 2019
8dfa3db
Merge branch 'master' of github.com:David-Noble-at-work/azure-sdk-for…
Nov 9, 2019
e6e71f5
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Nov 9, 2019
e082d81
Merge branch 'master' of github.com:David-Noble-at-work/azure-sdk-for…
Nov 17, 2019
069c822
Merge branch 'master' of github.com:David-Noble-at-work/azure-sdk-for…
Nov 23, 2019
3ead1cc
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Dec 3, 2019
669ba8b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Dec 4, 2019
735f572
Responded to code review comments
Dec 4, 2019
6db330d
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jan 4, 2020
48855b5
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jan 4, 2020
2367f50
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jan 17, 2020
fb9cac0
Updated sdk/cosmos/README.md with info on using system properties to …
Jan 23, 2020
1dd708e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jan 23, 2020
203507a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Mar 16, 2020
368b10f
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Mar 16, 2020
a0b44a7
Merge branch 'master' of github.com:David-Noble-at-work/azure-sdk-for…
Apr 15, 2020
d810ba8
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
May 6, 2020
64a12af
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jun 4, 2020
29f2c73
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java
Jun 26, 2020
e4df526
Checkpoint for safe keeping
Jun 27, 2020
916f99e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
Jun 30, 2020
ae1b11b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
Jul 2, 2020
019e90b
Updated error handling in RntbdTransportClient.invokeStoreAsync, opti…
Jul 2, 2020
ba4d4ed
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Jul 7, 2020
2f0b886
Ported changes from David-Noble-at-work/issue/cosmos-4.X/#10401
Jul 7, 2020
ba13859
Checkpoint for safe keeping.
Jul 8, 2020
3293e11
Checkpoint for safe keeping.
Jul 9, 2020
a83cd62
Checkpoint for safe keeping
Jul 9, 2020
1902669
Checkpoint for safe keeping
Jul 10, 2020
f466158
Revisions for correctness
Jul 11, 2020
6b72281
Merge from origin master + refinements: e2e test passes
Jul 11, 2020
4a0360e
Refinements to implementation
Jul 12, 2020
ea4ca55
Rediscovery is now correctly triggered whenever a failure occurs. E2e…
Jul 13, 2020
a04d40d
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Jul 18, 2020
18ab502
Bug fix
Jul 19, 2020
d2d892c
Bug fix
Jul 21, 2020
201b643
Removed some dead code
Jul 22, 2020
fa9eaf6
Clarified the text of the message produced by RntbdServiceEndpoint.th…
Jul 24, 2020
7c830cf
Tweaked RntbdClientChannelHandler.init based on preliminary test resu…
Jul 25, 2020
d4c8561
Added new RntbdTransportClient.Options member and removed some dead m…
Jul 25, 2020
6c57447
RntbdClientChannelHandler does not install an IdleStateHandler when t…
Jul 31, 2020
b08361e
Tweaked request timeline which now reports a null event duration if t…
Aug 3, 2020
1ca1c3b
Benchmark tweak to ensure collections are recreated if they already e…
Aug 6, 2020
233f5b9
Tweaked benchmark and updated RntbdConnectionStateListener.updateAddr…
Aug 17, 2020
a631804
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Aug 17, 2020
2079b5b
Experimental: treat 410 with sub-status code zero as server-down due …
Aug 26, 2020
64d2111
Tweaked diagnostics
Aug 26, 2020
bd54f11
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Aug 26, 2020
94834b9
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Aug 27, 2020
130de85
Refinements
Aug 30, 2020
4ff0680
RntbdAddressCacheToken now computes partition key range identities fo…
Aug 31, 2020
45ffb90
Implemented UpdateStrategy for connection state listener: DEFERRED or…
Sep 1, 2020
c280476
Completed a number of TODOs for this PR
Sep 2, 2020
9f57a9c
Refinements and corrections based on review feedback
Sep 3, 2020
77f3e88
Refinements and corrections based on review feedback and test-33 results
Sep 4, 2020
cc2b936
Refinements and corrections based on review feedback
Sep 4, 2020
ff4d980
Formatted JavaDoc
Sep 4, 2020
ac593ec
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Sep 4, 2020
4502126
Formatted JavaDoc
Sep 4, 2020
90d98b9
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into fea…
Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ abstract class AsyncBenchmark<T> {
configuration = cfg;
logger = LoggerFactory.getLogger(this.getClass());

cosmosAsyncDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());

try {
cosmosAsyncDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());
cosmosAsyncDatabase.read().block();
} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
Expand All @@ -103,25 +104,25 @@ abstract class AsyncBenchmark<T> {
}
}

try {
cosmosAsyncContainer = cosmosAsyncDatabase.getContainer(this.configuration.getCollectionId());
cosmosAsyncContainer = cosmosAsyncDatabase.getContainer(this.configuration.getCollectionId());

try {
cosmosAsyncContainer.read().block();

} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
cosmosAsyncDatabase.createContainer(
this.configuration.getCollectionId(),
Configuration.DEFAULT_PARTITION_KEY_PATH,
ThroughputProperties.createManualThroughput(this.configuration.getThroughput())
).block();

cosmosAsyncContainer = cosmosAsyncDatabase.getContainer(this.configuration.getCollectionId());
logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
collectionCreated = true;
} else {

if (e.getStatusCode() != HttpConstants.StatusCodes.NOTFOUND) {
throw e;
}

cosmosAsyncDatabase.createContainer(
this.configuration.getCollectionId(),
Configuration.DEFAULT_PARTITION_KEY_PATH,
ThroughputProperties.createManualThroughput(this.configuration.getThroughput())
).block();

cosmosAsyncContainer = cosmosAsyncDatabase.getContainer(this.configuration.getCollectionId());
logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
collectionCreated = true;
}

partitionKey = cosmosAsyncContainer.read().block().getProperties().getPartitionKeyDefinition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ private void scheduleScaleUp(int delayStartInSeconds, int newThroughput) {
},
() -> {
logger.info("Collection Scale up request sent to the service");

}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class DirectConnectionConfig {
private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30;

private Duration connectTimeout;
private boolean enableConnectionEndpointRediscovery;
private Duration idleConnectionTimeout;
private Duration idleEndpointTimeout;
private Duration requestTimeout;
Expand All @@ -33,6 +34,7 @@ public final class DirectConnectionConfig {
*/
public DirectConnectionConfig() {
this.connectTimeout = DEFAULT_CONNECT_TIMEOUT;
this.enableConnectionEndpointRediscovery = false;
this.idleConnectionTimeout = Duration.ZERO;
this.idleEndpointTimeout = DEFAULT_IDLE_ENDPOINT_TIMEOUT;
this.maxConnectionsPerEndpoint = DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT;
Expand Down Expand Up @@ -79,6 +81,28 @@ public DirectConnectionConfig setConnectTimeout(Duration connectTimeout) {
return this;
}

/**
* Gets a value that indicates whether Direct TCP connection endpoint rediscovery should is enabled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand on what it is from CX perspective.

QQ; Do we want this feature flag as prominent in ConnectionConfig?

Copy link
Author

@David-Noble-at-work David-Noble-at-work Sep 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good question whether we want this flag as prominent as it is. This PR now enables connection endpoint rediscovery by default. I would prefer not to advertise the feature until we've got more experience with it. Enabling the feature by default is counter to that. One might argue that enabling the feature by default with a highly visible option to turn the feature off is preferred because it's easier to do that in code than by using a less obvious mechanism (such as azure.cosmos.directTcp.defaultOptions).

@kushagraThapar, @moderakh what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, any feature may have bug on its first release. I prefer if we change the default to enabled later when we are more confident on the behaviour.

*
* @return {@code true} if Direct TCP connection endpoint rediscovery should is enabled; {@code false} otherwise.\
*/
public boolean getEnableConnectionEndpointRediscovery() {
David-Noble-at-work marked this conversation as resolved.
Show resolved Hide resolved
return this.enableConnectionEndpointRediscovery;
}

/**
* Sets a value that indicates whether Direct TCP connection endpoint rediscovery should is enabled.
*
* @param enableConnectionEndpointRediscovery {@code true} if connection endpoint rediscovery is enabled; {@code
* false} otherwise.
*
* @return the {@linkplain DirectConnectionConfig}.
*/
public DirectConnectionConfig setEnableConnectionEndpointRediscovery(boolean enableConnectionEndpointRediscovery) {
David-Noble-at-work marked this conversation as resolved.
Show resolved Hide resolved
this.enableConnectionEndpointRediscovery = enableConnectionEndpointRediscovery;
return this;
}

/**
* Gets the idle connection timeout for direct client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ public final class ConnectionPolicy {

// Direct connection config properties
private Duration connectTimeout;
private Duration idleEndpointTimeout;
private boolean enableTcpConnectionEndpointRediscovery;
private Duration idleTcpConnectionTimeout;
private Duration idleTcpEndpointTimeout;
private int maxConnectionsPerEndpoint;
private int maxRequestsPerConnection;
private Duration idleTcpConnectionTimeout;

/**
* Constructor.
*/
public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
this(ConnectionMode.GATEWAY);
this.enableTcpConnectionEndpointRediscovery = false;
David-Noble-at-work marked this conversation as resolved.
Show resolved Hide resolved
this.idleHttpConnectionTimeout = gatewayConnectionConfig.getIdleConnectionTimeout();
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.requestTimeout = BridgeInternal.getRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
Expand All @@ -60,8 +62,9 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) {
this(ConnectionMode.DIRECT);
this.connectTimeout = directConnectionConfig.getConnectTimeout();
this.enableTcpConnectionEndpointRediscovery = directConnectionConfig.getEnableConnectionEndpointRediscovery();
this.idleTcpConnectionTimeout = directConnectionConfig.getIdleConnectionTimeout();
this.idleEndpointTimeout = directConnectionConfig.getIdleEndpointTimeout();
this.idleTcpEndpointTimeout = directConnectionConfig.getIdleEndpointTimeout();
this.maxConnectionsPerEndpoint = directConnectionConfig.getMaxConnectionsPerEndpoint();
this.maxRequestsPerConnection = directConnectionConfig.getMaxRequestsPerConnection();
this.requestTimeout = BridgeInternal.getRequestTimeoutFromDirectConnectionConfig(directConnectionConfig);
Expand All @@ -87,6 +90,25 @@ public static ConnectionPolicy getDefaultPolicy() {
return ConnectionPolicy.defaultPolicy;
}

/**
* Gets a value that indicates whether Direct TCP connection endpoint rediscovery should is enabled.
*
* @return {@code true} if Direct TCP connection endpoint rediscovery should is enabled; {@code false} otherwise.
*/
public boolean getEnableTcpConnectionEndpointRediscovery() {
return this.enableTcpConnectionEndpointRediscovery;
}

/**
* Sets a value that indicates whether Direct TCP connection endpoint rediscovery should is enabled.
*
* @return the {@linkplain ConnectionPolicy}.
*/
public ConnectionPolicy getEnableTcpConnectionEndpointRediscovery(boolean enableTcpConnectionEndpointRediscovery) {
this.enableTcpConnectionEndpointRediscovery = enableTcpConnectionEndpointRediscovery;
return this;
}

/**
* Gets the request timeout (time to wait for response from network peer).
*
Expand Down Expand Up @@ -426,17 +448,17 @@ public ConnectionPolicy setConnectTimeout(Duration connectTimeout) {
* Gets the idle endpoint timeout
* @return the idle endpoint timeout
*/
public Duration getIdleEndpointTimeout() {
return idleEndpointTimeout;
public Duration getIdleTcpEndpointTimeout() {
return this.idleTcpEndpointTimeout;
}

/**
* Sets the idle endpoint timeout
* @param idleEndpointTimeout the idle endpoint timeout
* @param idleTcpEndpointTimeout the idle endpoint timeout
* @return the {@link ConnectionPolicy}
*/
public ConnectionPolicy setIdleEndpointTimeout(Duration idleEndpointTimeout) {
this.idleEndpointTimeout = idleEndpointTimeout;
public ConnectionPolicy setIdleTcpEndpointTimeout(Duration idleTcpEndpointTimeout) {
this.idleTcpEndpointTimeout = idleTcpEndpointTimeout;
return this;
}

Expand Down Expand Up @@ -493,7 +515,7 @@ public String toString() {
", inetSocketProxyAddress=" + (proxy != null ? proxy.getAddress() : null) +
", readRequestsFallbackEnabled=" + readRequestsFallbackEnabled +
", connectTimeout=" + connectTimeout +
", idleEndpointTimeout=" + idleEndpointTimeout +
", idleEndpointTimeout=" + idleTcpEndpointTimeout +
", maxConnectionsPerEndpoint=" + maxConnectionsPerEndpoint +
David-Noble-at-work marked this conversation as resolved.
Show resolved Hide resolved
", maxRequestsPerConnection=" + maxRequestsPerConnection +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,37 @@ public static class StatusCodes {
}

public static class SubStatusCodes {

// Unknown SubStatus Code
public static final int UNKNOWN = 0;

// 400: Bad Request substatus
// 400: Bad Request sub-status
public static final int PARTITION_KEY_MISMATCH = 1001;
public static final int CROSS_PARTITION_QUERY_NOT_SERVABLE = 1004;

// 410: StatusCodeType_Gone: substatus
// 410: StatusCodeType_Gone: sub-status
public static final int NAME_CACHE_IS_STALE = 1000;
public static final int PARTITION_KEY_RANGE_GONE = 1002;
public static final int COMPLETING_SPLIT = 1007;
public static final int COMPLETING_PARTITION_MIGRATION = 1008;

// 403: Forbidden substatus
// 403: Forbidden sub-status
public static final int FORBIDDEN_WRITEFORBIDDEN = 3;
public static final int DATABASE_ACCOUNT_NOTFOUND = 1008;

// 404: LSN in session token is higher
public static final int READ_SESSION_NOT_AVAILABLE = 1002;

// 410: Gone sub-status
// Sub-status code indicating that a GoneException was instantiated by the client; not as a result of a response
// message provide by a Cosmos instance.
public static final int CLIENT_GENERATED = 10_000;

// 410: Gone sub-status
// Sub-status code zero in a response from a service endpoint indicates that a replica is being discontinued or
// reconfigured. When endpoint rediscovery is enabled the RntbdTransportClient converts sub-status code zero to
// this sub-status code value.
public static final int DISCONTINUING_SERVICE = CLIENT_GENERATED + 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How doe these sub-status code impact GoneRetryPolicy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed by way of the logs on test33 and also by code inspection. This does not effect retry policy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when rntbd throws GoneException, Gone will reach GoneAndRetryWithRetryPolicy, and it will decide how it should get retried, based on status/code and substatus code.

Please check the behaviour of GoneAndRetryWithRetryPolicy if for the given statuscode/substatus code it does the desired behabour or not.

}

public static class HeaderValues {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,10 @@ public static final class Event {
@JsonIgnore
private final Duration duration;

@JsonSerialize(using = ToStringSerializer.class)
private final long durationInMicroSec;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we removing this? I found this very helpful.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reported, just not stored as a value. Are you saying that it is useful for debugging?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as durationInMicroSec is removed, does the PR changes the information available in rntbd request diagnostics?

if so could you provide a sample on new rntbd request timeline diagnostics format, info?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value will still be logged:
"serializationDiagnosticsContext":{"serializationDiagnosticsList":[{"serializationType":"ITEM_SERIALIZATION","startTimeUTC":"22 Sep 2020 17:40:59.239","endTimeUTC":"22 Sep 2020 17:40:59.239","durationInMicroSec":0},{"serializationType":"PARTITION_KEY_FETCH_SERIALIZATION","startTimeUTC":"22 Sep 2020 17:40:59.317","endTimeUTC":"22 Sep 2020 17:40:59.323","durationInMicroSec":5998}]},"gatewayStatistics":null,"systemInformation":{"usedMemory":"60586 KB","availableMemory":"8309590 KB","systemCpuLoad":"(2020-09-22T17:40:58.729927700Z 20.1%)"}}


@JsonProperty("eventName")
private final String name;

@JsonSerialize(using = ToStringSerializer.class)
@JsonProperty("startTimeUTC")
private final Instant startTime;

public Event(final String name, final Instant from, final Instant to) {
Expand All @@ -172,11 +169,12 @@ public Event(final String name, final Instant from, final Instant to) {
this.name = name;
this.startTime = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
if(this.duration != null) {
this.durationInMicroSec = duration.toNanos()/1000L;
if (from == null) {
this.duration = null;
} else if (to == null) {
this.duration = Duration.ZERO;
David-Noble-at-work marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.durationInMicroSec = 0;
this.duration = Duration.between(from, to);
}
}

Expand Down
Loading