Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Implements Client Retry policy #22394

Merged
merged 31 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9799481
implementation of client retry policy
simorenoh Feb 15, 2024
4a16dc1
ignore N-2 on ci
simorenoh Feb 15, 2024
fcee910
Update ci.yml
simorenoh Feb 15, 2024
8c515d4
changes to pass ci
simorenoh Feb 15, 2024
40c4ca2
Merge branch 'main' into cosmos_client_retry_policy
simorenoh Feb 15, 2024
fd5fe0d
Update go.mod
simorenoh Feb 15, 2024
b7d3930
Update go.sum
simorenoh Feb 15, 2024
309f1a8
make method private, add test
simorenoh Feb 15, 2024
ca73451
enableEndpointDiscovery->enableCrossRegionRetries, remove public area…
simorenoh Feb 21, 2024
2c23366
saved constants, moved logic around in policy for non-duplicity
simorenoh Feb 22, 2024
24ead83
added partial tests, missing 503s/ connectivity issues handling
simorenoh Feb 27, 2024
d62816c
finalizing behavior and tests
simorenoh Feb 29, 2024
b0613c0
revert pipeline useragent, return non-retryable errors to skip Core r…
simorenoh Mar 1, 2024
0e3f3ff
Merge branch 'main' into cosmos_client_retry_policy
simorenoh Mar 4, 2024
187fb8e
mark create/delete management plane operations as writes
simorenoh Mar 8, 2024
5fea609
force refresh ability added, delete/replace operations marked as write
simorenoh Mar 9, 2024
d7e41a9
remove print statements
simorenoh Mar 11, 2024
5c5ba4d
refactor
ealsur Mar 12, 2024
19d465d
missing comma
ealsur Mar 12, 2024
d3dedd8
detecting dns failures
ealsur Mar 12, 2024
704dcc6
missing update
ealsur Mar 12, 2024
0394d4b
deal with errors fetching initial account information
simorenoh Mar 12, 2024
8a042c6
linter
ealsur Mar 12, 2024
3de7be6
more linter
ealsur Mar 13, 2024
5f3fa59
Update cosmos_client_retry_policy_test.go
simorenoh Mar 13, 2024
b96c47a
add DNS test
simorenoh Mar 13, 2024
76c101a
fix error handling logic for dns
simorenoh Mar 13, 2024
adfe2b5
small fix to ensure no wrong index is called
simorenoh Mar 13, 2024
c4cc073
Merge branch 'main' into cosmos_client_retry_policy
simorenoh Mar 13, 2024
d1f2e16
fix new locking logic
simorenoh Mar 14, 2024
130e998
override header for response on write metadata operations
simorenoh Mar 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ stages:
parameters:
ServiceDirectory: 'data/azcosmos'
UsePipelineProxy: false
ExcludeGoNMinus2: true
- stage: Emulator
displayName: 'Cosmos Emulator'
variables:
Expand Down
18 changes: 11 additions & 7 deletions sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
)
Expand All @@ -42,10 +41,11 @@ func (c *Client) Endpoint() string {
// options - Optional Cosmos client options. Pass nil to accept default values.
func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*Client, error) {
preferredRegions := []string{}
enableCrossRegionRetries := true
if o != nil {
preferredRegions = o.PreferredRegions
}
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0)
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0, enableCrossRegionRetries)
if err != nil {
return nil, err
}
Expand All @@ -62,10 +62,11 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
return nil, err
}
preferredRegions := []string{}
enableCrossRegionRetries := true
if o != nil {
preferredRegions = o.PreferredRegions
}
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o), preferredRegions, 0)
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o), preferredRegions, 0, enableCrossRegionRetries)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,6 +125,7 @@ func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *
},
PerRetry: []policy.Policy{
authPolicy,
&clientRetryPolicy{gem: gem},
},
},
&options.ClientOptions)
Expand Down Expand Up @@ -195,8 +197,10 @@ func (c *Client) CreateDatabase(
}

operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: ""}
resourceType: resourceTypeDatabase,
resourceAddress: "",
isWriteOperation: true,
}

path, err := generatePathForNameBased(resourceTypeDatabase, "", true)
if err != nil {
Expand All @@ -220,7 +224,7 @@ func (c *Client) CreateDatabase(
// NewQueryDatabasesPager executes query for databases.
// query - The SQL query to execute.
// o - Options for the operation.
func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions) *runtime.Pager[QueryDatabasesResponse] {
func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions) *azruntime.Pager[QueryDatabasesResponse] {
queryOptions := &QueryDatabasesOptions{}
if o != nil {
originalOptions := *o
Expand All @@ -234,7 +238,7 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)

path, _ := generatePathForNameBased(resourceTypeDatabase, operationContext.resourceAddress, true)

return runtime.NewPager(runtime.PagingHandler[QueryDatabasesResponse]{
return azruntime.NewPager(azruntime.PagingHandler[QueryDatabasesResponse]{
More: func(page QueryDatabasesResponse) bool {
return page.ContinuationToken != ""
},
Expand Down
188 changes: 188 additions & 0 deletions sdk/data/azcosmos/cosmos_client_retry_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azcosmos

import (
"errors"
"fmt"
"net"
"net/http"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/errorinfo"
)

type clientRetryPolicy struct {
gem *globalEndpointManager
useWriteEndpoint bool
retryCount int
sessionRetryCount int
preferredLocationIndex int
}

const maxRetryCount = 120
const defaultBackoff = 1

func (p *clientRetryPolicy) Do(req *policy.Request) (*http.Response, error) {
p.resetPolicyCounters()
o := pipelineRequestOptions{}
if !req.OperationValue(&o) {
return nil, fmt.Errorf("failed to obtain request options, please check request being sent: %s", req.Body())
}
for {
resolvedEndpoint := p.gem.ResolveServiceEndpoint(p.retryCount, o.isWriteOperation, p.useWriteEndpoint)
req.Raw().Host = resolvedEndpoint.Host
req.Raw().URL.Host = resolvedEndpoint.Host
response, err := req.Next() // err can happen in weird scenarios (connectivity, etc)
if err != nil {
if p.isNetworkConnectionError(err) {
shouldRetry, err := p.attemptRetryOnNetworkError(req)
if err != nil {
return nil, err
}
if !shouldRetry {
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
}
err = req.RewindBody()
if err != nil {
return nil, err
}
p.retryCount += 1
continue
}

return nil, err
}
subStatus := response.Header.Get(cosmosHeaderSubstatus)
if p.shouldRetryStatus(response.StatusCode, subStatus) {
p.useWriteEndpoint = false
if response.StatusCode == http.StatusForbidden {
shouldRetry, err := p.attemptRetryOnEndpointFailure(req, o.isWriteOperation)
if err != nil {
return nil, err
}
if !shouldRetry {
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
}
} else if response.StatusCode == http.StatusNotFound {
if !p.attemptRetryOnSessionUnavailable(req, o.isWriteOperation) {
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
}
} else if response.StatusCode == http.StatusServiceUnavailable {
if !p.attemptRetryOnServiceUnavailable(req, o.isWriteOperation) {
return nil, errorinfo.NonRetriableError(azruntime.NewResponseErrorWithErrorCode(response, response.Status))
}
}
err = req.RewindBody()
if err != nil {
return response, err
}
p.retryCount += 1
continue
}

return response, err
}

}

func (p *clientRetryPolicy) shouldRetryStatus(status int, subStatus string) (shouldRetry bool) {
if (status == http.StatusForbidden && (subStatus == subStatusWriteForbidden || subStatus == subStatusDatabaseAccountNotFound)) ||
(status == http.StatusNotFound && subStatus == subStatusReadSessionNotAvailable) ||
(status == http.StatusServiceUnavailable) {
return true
}
return false
}

func (p *clientRetryPolicy) attemptRetryOnNetworkError(req *policy.Request) (bool, error) {
if (p.retryCount > maxRetryCount) || !p.gem.locationCache.enableCrossRegionRetries {
return false, nil
}

err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
if err != nil {
return false, err
}
err = p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
if err != nil {
return false, err
}
err = p.gem.Update(req.Raw().Context(), false)
if err != nil {
return false, err
}

time.Sleep(defaultBackoff * time.Second)
return true, nil
}

func (p *clientRetryPolicy) attemptRetryOnEndpointFailure(req *policy.Request, isWriteOperation bool) (bool, error) {
if (p.retryCount > maxRetryCount) || !p.gem.locationCache.enableCrossRegionRetries {
return false, nil
}
if isWriteOperation {
err := p.gem.MarkEndpointUnavailableForWrite(*req.Raw().URL)
if err != nil {
return false, err
}
} else {
err := p.gem.MarkEndpointUnavailableForRead(*req.Raw().URL)
if err != nil {
return false, err
}
}

err := p.gem.Update(req.Raw().Context(), isWriteOperation)
if err != nil {
return false, err
}

time.Sleep(defaultBackoff * time.Second)
return true, nil
}

func (p *clientRetryPolicy) attemptRetryOnSessionUnavailable(req *policy.Request, isWriteOperation bool) bool {
if p.gem.CanUseMultipleWriteLocations() {
endpoints := p.gem.locationCache.locationInfo.availReadLocations
if isWriteOperation {
endpoints = p.gem.locationCache.locationInfo.availWriteLocations
}
if p.sessionRetryCount >= len(endpoints) {
return false
}
} else {
if p.sessionRetryCount > 0 {
return false
}
p.useWriteEndpoint = true
}
p.sessionRetryCount += 1
return true
}

func (p *clientRetryPolicy) attemptRetryOnServiceUnavailable(req *policy.Request, isWriteOperation bool) bool {
if !p.gem.locationCache.enableCrossRegionRetries || p.preferredLocationIndex >= len(p.gem.preferredLocations) {
return false
}
if isWriteOperation && !p.gem.CanUseMultipleWriteLocations() {
return false
}
p.preferredLocationIndex += 1
return true
}

func (p *clientRetryPolicy) resetPolicyCounters() {
p.retryCount = 0
p.sessionRetryCount = 0
p.preferredLocationIndex = 0
}

// isNetworkConnectionError checks if the error is related to failure to connect / resolve DNS
func (p *clientRetryPolicy) isNetworkConnectionError(err error) bool {
var dnserror *net.DNSError
return errors.As(err, &dnserror)
}
Loading
Loading