Skip to content

GODRIVER-3444 Adjust getMore maxTimeMS Calculation for tailable awaitData Cursors #1925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e4aa2b3
DRIVERS-2868 Guard server round trips from timeout
prestonvasquez Jan 14, 2025
b2e5cbb
GODRIVER-3444 Implement validation
prestonvasquez Jan 29, 2025
7397921
Merge branch 'master' into GODRIVER-3444
prestonvasquez Jan 29, 2025
a0fa4b7
GODRIVER-3444 Add test when maxAwaitTimeMS is lt remaining timeout
prestonvasquez Jan 31, 2025
bdf5945
Merge branch 'GODRIVER-3444' of github.com:prestonvasquez/mongo-go-dr…
prestonvasquez Jan 31, 2025
a6fbd2c
GODRIVER-3444 Serverless does not apply to tailable awaitData cursors
prestonvasquez Feb 3, 2025
fdeddb1
GODRIVER-3444 Use prose tests
prestonvasquez Feb 26, 2025
6e37314
Merge branch 'master' into GODRIVER-3444
prestonvasquez Feb 26, 2025
2da2627
GODRIVER-3444 Remove arguments from tests
prestonvasquez Feb 27, 2025
5e7921b
GODRIVER-3444 Update unified spect tests
prestonvasquez Mar 7, 2025
a435efe
GODRIVER-3444 Update tests to reduce event race
prestonvasquez Mar 14, 2025
9d03793
GODRIVER-3444 Resolve merge conflicts
prestonvasquez Apr 2, 2025
8459cd9
GODRIVER-3444 Remove error parameter from CalculateMaxTimeMS
prestonvasquez Apr 2, 2025
e5d5bce
GODRIVER-3444 Run pre-commit
prestonvasquez Apr 2, 2025
0f2ec20
GODRIVER-3444 Check max int
prestonvasquez Apr 2, 2025
210e653
GORIVER-3444 Extend skip to include twd
prestonvasquez Apr 2, 2025
06692da
GORIVER-3444 Extend skip to include twd
prestonvasquez Apr 3, 2025
92b8b5b
GODRIVER-3444 Update error messages
prestonvasquez Apr 3, 2025
db060be
GODRIVER-3444 Resolve merge conflicts
prestonvasquez Apr 16, 2025
62f8bc4
GODRIVER-3444 Resovle merge conflicts
prestonvasquez Apr 16, 2025
34dff0f
Update internal/driverutil/operation.go
prestonvasquez Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/driverutil/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

package driverutil

import (
"context"
"math"
"time"
)

// Operation Names should be sourced from the command reference documentation:
// https://www.mongodb.com/docs/manual/reference/command/
const (
Expand All @@ -30,3 +36,34 @@ const (
UpdateOp = "update" // UpdateOp is the name for updating
BulkWriteOp = "bulkWrite" // BulkWriteOp is the name for client-level bulk write
)

// CalculateMaxTimeMS calculates the maxTimeMS value to send to the server
// based on the context deadline and the minimum round trip time. If the
// calculated maxTimeMS is likely to cause a socket timeout, then this function
// will return 0 and false.
func CalculateMaxTimeMS(ctx context.Context, rttMin time.Duration) (int64, bool) {
deadline, ok := ctx.Deadline()
if !ok {
return 0, true
}

remainingTimeout := time.Until(deadline)

// Always round up to the next millisecond value so we never truncate the calculated
// maxTimeMS value (e.g. 400 microseconds evaluates to 1ms, not 0ms).
maxTimeMS := int64((remainingTimeout - rttMin + time.Millisecond - 1) / time.Millisecond)
if maxTimeMS <= 0 {
return 0, false
}

// The server will return a "BadValue" error if maxTimeMS is greater
// than the maximum positive int32 value (about 24.9 days). If the
// user specified a timeout value greater than that, omit maxTimeMS
// and let the client-side timeout handle cancelling the op if the
// timeout is ever reached.
if maxTimeMS > math.MaxInt32 {
return 0, true
}

return maxTimeMS, true
}
70 changes: 70 additions & 0 deletions internal/integration/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/failpoint"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/require"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
Expand Down Expand Up @@ -303,6 +304,75 @@ func TestCursor(t *testing.T) {
batchSize = sizeVal.Int32()
assert.Equal(mt, int32(4), batchSize, "expected batchSize 4, got %v", batchSize)
})

tailableAwaitDataCursorOpts := mtest.NewOptions().MinServerVersion("4.4").
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.LoadBalanced, mtest.Single)

mt.RunOpts("tailable awaitData cursor", tailableAwaitDataCursorOpts, func(mt *mtest.T) {
mt.Run("apply remaining timeoutMS if less than maxAwaitTimeMS", func(mt *mtest.T) {
initCollection(mt, mt.Coll)
mt.ClearEvents()

// Create a find cursor
opts := options.Find().SetBatchSize(1).SetMaxAwaitTime(100 * time.Millisecond)

cursor, err := mt.Coll.Find(context.Background(), bson.D{}, opts)
require.NoError(mt, err)

_ = mt.GetStartedEvent() // Empty find from started list.

defer cursor.Close(context.Background())

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

// Iterate twice to force a getMore
cursor.Next(ctx)
cursor.Next(ctx)

cmd := mt.GetStartedEvent().Command

maxTimeMSRaw, err := cmd.LookupErr("maxTimeMS")
require.NoError(mt, err)

got, ok := maxTimeMSRaw.AsInt64OK()
require.True(mt, ok)

assert.LessOrEqual(mt, got, int64(50))
})

mt.RunOpts("apply maxAwaitTimeMS if less than remaining timeout", tailableAwaitDataCursorOpts, func(mt *mtest.T) {
initCollection(mt, mt.Coll)
mt.ClearEvents()

// Create a find cursor
opts := options.Find().SetBatchSize(1).SetMaxAwaitTime(50 * time.Millisecond)

cursor, err := mt.Coll.Find(context.Background(), bson.D{}, opts)
require.NoError(mt, err)

_ = mt.GetStartedEvent() // Empty find from started list.

defer cursor.Close(context.Background())

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// Iterate twice to force a getMore
cursor.Next(ctx)
cursor.Next(ctx)

cmd := mt.GetStartedEvent().Command

maxTimeMSRaw, err := cmd.LookupErr("maxTimeMS")
require.NoError(mt, err)

got, ok := maxTimeMSRaw.AsInt64OK()
require.True(mt, ok)

assert.LessOrEqual(mt, got, int64(50))
})
})
}

type tryNextCursor interface {
Expand Down
15 changes: 15 additions & 0 deletions internal/integration/unified/collection_operation_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
Expand Down Expand Up @@ -1485,6 +1486,20 @@ func createFindCursor(ctx context.Context, operation *operation) (*cursorResult,
opts.SetSkip(int64(val.Int32()))
case "sort":
opts.SetSort(val.Document())
case "timeoutMode":
return nil, newSkipTestError("timeoutMode is not supported")
case "cursorType":
switch strings.ToLower(val.StringValue()) {
case "tailable":
opts.SetCursorType(options.Tailable)
case "tailableawait":
opts.SetCursorType(options.TailableAwait)
case "nontailable":
opts.SetCursorType(options.NonTailable)
}
case "maxAwaitTimeMS":
maxAwaitTimeMS := time.Duration(val.Int32()) * time.Millisecond
opts.SetMaxAwaitTime(maxAwaitTimeMS)
default:
return nil, fmt.Errorf("unrecognized find option %q", key)
}
Expand Down
19 changes: 19 additions & 0 deletions internal/spectest/skip.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ var skipTests = map[string][]string{
"TestUnifiedSpec/client-side-operations-timeout/tests/retryability-timeoutMS.json/operation_is_retried_multiple_times_for_non-zero_timeoutMS_-_aggregate_on_collection",
"TestUnifiedSpec/client-side-operations-timeout/tests/retryability-timeoutMS.json/operation_is_retried_multiple_times_for_non-zero_timeoutMS_-_aggregate_on_database",
"TestUnifiedSpec/client-side-operations-timeout/tests/gridfs-find.json/timeoutMS_applied_to_find_command",
"TestUnifiedSpec/client-side-operations-timeout/tests/tailable-awaitData.json/timeoutMS_applied_to_find",
"TestUnifiedSpec/client-side-operations-timeout/tests/tailable-awaitData.json/timeoutMS_is_refreshed_for_getMore_if_maxAwaitTimeMS_is_not_set",
"TestUnifiedSpec/client-side-operations-timeout/tests/tailable-awaitData.json/timeoutMS_is_refreshed_for_getMore_if_maxAwaitTimeMS_is_set",
"TestUnifiedSpec/client-side-operations-timeout/tests/tailable-awaitData.json/timeoutMS_is_refreshed_for_getMore_-_failure",
},

// TODO(GODRIVER-3411): Tests require "getMore" with "maxTimeMS" settings. Not
Expand Down Expand Up @@ -819,6 +823,21 @@ var skipTests = map[string][]string{
"TestUnifiedSpec/transactions-convenient-api/tests/unified/transaction-options.json/withTransaction_explicit_transaction_options_override_client_options",
"TestUnifiedSpec/transactions-convenient-api/tests/unified/commit.json/withTransaction_commits_after_callback_returns",
},

// GODRIVER-3473: the implementation of DRIVERS-2868 makes it clear that the
// Go Driver does not correctly implement the following validation for
// tailable awaitData cursors:
//
// Drivers MUST error if this option is set, timeoutMS is set to a
// non-zero value, and maxAwaitTimeMS is greater than or equal to
// timeoutMS.
//
// Once GODRIVER-3473 is completed, we can continue running these tests.
"When constructing tailable awaitData cusors must validate, timeoutMS is set to a non-zero value, and maxAwaitTimeMS is greater than or equal to timeoutMS (GODRIVER-3473)": {
"TestUnifiedSpec/client-side-operations-timeout/tailable-awaitData.json/apply_remaining_timeoutMS_if_less_than_maxAwaitTimeMS",
"TestUnifiedSpec/client-side-operations-timeout/tailable-awaitData.json/error_if_maxAwaitTimeMS_is_equal_to_timeoutMS",
"TestUnifiedSpec/client-side-operations-timeout/tailable-awaitData.json/error_if_maxAwaitTimeMS_is_greater_than_timeoutMS",
},
}

// CheckSkip checks if the fully-qualified test name matches a list of skipped test names for a given reason.
Expand Down
Loading
Loading