diff --git a/mongo/change_stream.go b/mongo/change_stream.go index 23a3538232..df2bbb0362 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -162,6 +162,14 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in return cs, cs.Err() } +func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment { + return &changeStreamDeployment{ + topologyKind: cs.client.deployment.Kind(), + server: server, + conn: connection, + } +} + func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error { var server driver.Server var conn driver.Connection @@ -176,9 +184,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err defer conn.Close() - cs.aggregate.Deployment(driver.SingleConnectionDeployment{ - C: conn, - }) + cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) if resuming { cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version @@ -230,9 +236,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err break } - cs.aggregate.Deployment(driver.SingleConnectionDeployment{ - C: conn, - }) + cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) cs.err = cs.aggregate.Execute(ctx) } diff --git a/mongo/change_stream_deployment.go b/mongo/change_stream_deployment.go new file mode 100644 index 0000000000..d36c0d387f --- /dev/null +++ b/mongo/change_stream_deployment.go @@ -0,0 +1,49 @@ +// Copyright (C) MongoDB, Inc. 2017-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + + "go.mongodb.org/mongo-driver/x/mongo/driver" + "go.mongodb.org/mongo-driver/x/mongo/driver/description" +) + +type changeStreamDeployment struct { + topologyKind description.TopologyKind + server driver.Server + conn driver.Connection +} + +var _ driver.Deployment = (*changeStreamDeployment)(nil) +var _ driver.Server = (*changeStreamDeployment)(nil) +var _ driver.ErrorProcessor = (*changeStreamDeployment)(nil) + +func (c *changeStreamDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) { + return c, nil +} + +func (c *changeStreamDeployment) SupportsRetryWrites() bool { + return false +} + +func (c *changeStreamDeployment) Kind() description.TopologyKind { + return c.topologyKind +} + +func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection, error) { + return c.conn, nil +} + +func (c *changeStreamDeployment) ProcessError(err error) { + ep, ok := c.server.(driver.ErrorProcessor) + if !ok { + return + } + + ep.ProcessError(err) +} diff --git a/mongo/integration/change_stream_test.go b/mongo/integration/change_stream_test.go index 716128ca59..355586df26 100644 --- a/mongo/integration/change_stream_test.go +++ b/mongo/integration/change_stream_test.go @@ -12,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/event" "go.mongodb.org/mongo-driver/internal/testutil/assert" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/integration/mtest" @@ -515,6 +516,92 @@ func TestChangeStream_ReplicaSet(t *testing.T) { tryNextGetmoreError(mt, cs) }) }) + + customDeploymentClientOpts := options.Client(). + SetPoolMonitor(poolMonitor). + SetWriteConcern(mtest.MajorityWc). + SetReadConcern(mtest.MajorityRc). + SetRetryReads(false) + customDeploymentOpts := mtest.NewOptions(). + Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points. + MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points. + ClientOptions(customDeploymentClientOpts). + CreateClient(false) + mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) { + // Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather + // than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired + // by ChangeStream when executing an aggregate. + + mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) { + clearPoolChan() + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 1, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"aggregate"}, + CloseConnection: true, + }, + }) + + _, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{}) + assert.NotNil(mt, err, "expected Watch error, got nil") + assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not") + }) + mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) { + clearPoolChan() + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 1, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"getMore"}, + CloseConnection: true, + }, + }) + + cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{}) + assert.Nil(mt, err, "Watch error: %v", err) + defer closeStream(cs) + + _, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}}) + assert.Nil(mt, err, "InsertOne error: %v", err) + + assert.True(mt, cs.Next(mtest.Background), "expected Next to return true, got false (iteration error %v)", + cs.Err()) + assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not") + }) + retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor) + retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts) + mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) { + clearPoolChan() + + mt.SetFailPoint(mtest.FailPoint{ + ConfigureFailPoint: "failCommand", + Mode: mtest.FailPointMode{ + Times: 2, + }, + Data: mtest.FailPointData{ + FailCommands: []string{"aggregate"}, + CloseConnection: true, + }, + }) + + _, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{}) + assert.NotNil(mt, err, "expected Watch error, got nil") + + var numClearedEvents int + for len(poolChan) > 0 { + curr := <-poolChan + if curr.Type == event.PoolCleared { + numClearedEvents++ + } + } + assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents) + }) + }) } func closeStream(cs *mongo.ChangeStream) { diff --git a/x/mongo/driver/driver.go b/x/mongo/driver/driver.go index 41ae11dab9..c4f1224293 100644 --- a/x/mongo/driver/driver.go +++ b/x/mongo/driver/driver.go @@ -105,8 +105,10 @@ func (SingleServerDeployment) SupportsRetryWrites() bool { return false } // Kind implements the Deployment interface. It always returns description.Single. func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single } -// SingleConnectionDeployment is an implementation of Deployment that always returns the same -// Connection. +// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This +// implementation should only be used for connection handshakes and server heartbeats as it does not implement +// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection, +// which does not implement Compressor. type SingleConnectionDeployment struct{ C Connection } var _ Deployment = SingleConnectionDeployment{}