Skip to content

Commit

Permalink
GODRIVER-1615 Use custom deployment for change streams (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
Divjot Arora committed Jun 3, 2020
1 parent 66f247d commit 012750a
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 8 deletions.
16 changes: 10 additions & 6 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
return cs, cs.Err()
}

func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
return &changeStreamDeployment{
topologyKind: cs.client.deployment.Kind(),
server: server,
conn: connection,
}
}

func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
var server driver.Server
var conn driver.Connection
Expand All @@ -176,9 +184,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err

defer conn.Close()

cs.aggregate.Deployment(driver.SingleConnectionDeployment{
C: conn,
})
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))

if resuming {
cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
Expand Down Expand Up @@ -230,9 +236,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
break
}

cs.aggregate.Deployment(driver.SingleConnectionDeployment{
C: conn,
})
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
cs.err = cs.aggregate.Execute(ctx)
}

Expand Down
49 changes: 49 additions & 0 deletions mongo/change_stream_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongo

import (
"context"

"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

type changeStreamDeployment struct {
topologyKind description.TopologyKind
server driver.Server
conn driver.Connection
}

var _ driver.Deployment = (*changeStreamDeployment)(nil)
var _ driver.Server = (*changeStreamDeployment)(nil)
var _ driver.ErrorProcessor = (*changeStreamDeployment)(nil)

func (c *changeStreamDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
return c, nil
}

func (c *changeStreamDeployment) SupportsRetryWrites() bool {
return false
}

func (c *changeStreamDeployment) Kind() description.TopologyKind {
return c.topologyKind
}

func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection, error) {
return c.conn, nil
}

func (c *changeStreamDeployment) ProcessError(err error) {
ep, ok := c.server.(driver.ErrorProcessor)
if !ok {
return
}

ep.ProcessError(err)
}
87 changes: 87 additions & 0 deletions mongo/integration/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
Expand Down Expand Up @@ -515,6 +516,92 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
tryNextGetmoreError(mt, cs)
})
})

customDeploymentClientOpts := options.Client().
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc).
SetRetryReads(false)
customDeploymentOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
ClientOptions(customDeploymentClientOpts).
CreateClient(false)
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
// by ChangeStream when executing an aggregate.

mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
clearPoolChan()
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"aggregate"},
CloseConnection: true,
},
})

_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
clearPoolChan()
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"getMore"},
CloseConnection: true,
},
})

cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.Nil(mt, err, "Watch error: %v", err)
defer closeStream(cs)

_, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)

assert.True(mt, cs.Next(mtest.Background), "expected Next to return true, got false (iteration error %v)",
cs.Err())
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
clearPoolChan()

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 2,
},
Data: mtest.FailPointData{
FailCommands: []string{"aggregate"},
CloseConnection: true,
},
})

_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")

var numClearedEvents int
for len(poolChan) > 0 {
curr := <-poolChan
if curr.Type == event.PoolCleared {
numClearedEvents++
}
}
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
})
})
}

func closeStream(cs *mongo.ChangeStream) {
Expand Down
6 changes: 4 additions & 2 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func (SingleServerDeployment) SupportsRetryWrites() bool { return false }
// Kind implements the Deployment interface. It always returns description.Single.
func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single }

// SingleConnectionDeployment is an implementation of Deployment that always returns the same
// Connection.
// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This
// implementation should only be used for connection handshakes and server heartbeats as it does not implement
// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection,
// which does not implement Compressor.
type SingleConnectionDeployment struct{ C Connection }

var _ Deployment = SingleConnectionDeployment{}
Expand Down

0 comments on commit 012750a

Please sign in to comment.