Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add vtgate flag that explicitly allows vstream copy #125

Merged
merged 6 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Usage of vtgate:
Select tcp, tcp4, or tcp6 to control the socket type. (default tcp)
--no_scatter
when set to true, the planner will fail instead of producing a plan that includes scatter queries
--no_vstream_copy
when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this
--normalize_queries
Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true)
--onclose_timeout duration
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false)

return nil
}
Expand Down
31 changes: 18 additions & 13 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Executor struct {

// allowScatter will fail planning if set to false and a plan contains any scatter queries
allowScatter bool
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool
}

var executorOnce sync.Once
Expand All @@ -127,20 +130,22 @@ func NewExecutor(
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
noVstreamCopy bool,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
allowVstreamCopy: !noVstreamCopy,
pv: pv,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
return err
}

vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell)
vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy)
vs := &vstream{
vgtid: vgtid,
tabletType: topodatapb.TabletType_PRIMARY,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
// create a new session each time so that ShardSessions don't get re-used across tests
Expand All @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down Expand Up @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) {
}

func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
}

func TestSelectScatter(t *testing.T) {
Expand Down Expand Up @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
count++
}

executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
before := runtime.NumGoroutine()

query := "select id, col from user order by id limit 2"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down
19 changes: 15 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

Expand All @@ -47,6 +48,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand Down Expand Up @@ -119,12 +123,13 @@ type journalEvent struct {
done chan struct{}
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
allowVstreamCopy: allowVstreamCopy,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -540,6 +545,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
log.Infof("Starting to vstream from %s", tablet.Alias.String())
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once

if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verifying whether this condition is sufficient for identifying a vcopy

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping)
return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported")
}

err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand Down
82 changes: 66 additions & 16 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestVStreamSkew(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}}
want := int64(0)
var sbc0, sbc1 *sandboxconn.SandboxConn
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestVStreamEvents(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})

vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -213,7 +213,7 @@ func TestVStreamChunks(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestVStreamManagerGetCells(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
ts, _ := st.GetTopoServer()

for _, tcase := range tcases {
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestVStreamMulti(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestVStreamRetry(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -561,7 +561,7 @@ func TestVStreamJournalOneToMany(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestVStreamJournalManyToOne(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -920,7 +920,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestResolveVStreamParams(t *testing.T) {
name := "TestVStream"
_ = createSandbox(name)
hc := discovery.NewFakeHealthCheck(nil)
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa", true)
testcases := []struct {
input *binlogdatapb.VGtid
output *binlogdatapb.VGtid
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
vgtid := &binlogdatapb.VGtid{
Expand Down Expand Up @@ -1195,10 +1195,60 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
func TestVstreamCopy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cell := "aa"
ks := "TestVStreamCopy"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_COMMIT},
}
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa"))
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
var count sync2.AtomicInt32
count.Set(0)
// empty gtid id means no start position = bootstrapping/vstream copy
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "",
}},
}

// allowVstreamCopy = false
vsm := newTestVStreamManager(hc, st, "aa", false)
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Error(t, err)
require.Equal(t, err.Error(), "vstream copy is not currently supported")

// allowVstreamCopy = true
vsm2 := newTestVStreamManager(hc, st, "aa", true)
err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Equal(t, err.Error(), "final error")
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
return newVStreamManager(srvResolver, serv, cell)
return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy)
}

func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var (
defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service")
noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries")
noVstreamCopy = flag.Bool("no_vstream_copy", false, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this")

// TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed

Expand Down Expand Up @@ -210,7 +211,7 @@ func Init(
sc := NewScatterConn("VttabletCall", tc, gw)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
resolver := NewResolver(srvResolver, serv, cell, sc)
vsm := newVStreamManager(srvResolver, serv, cell)
vsm := newVStreamManager(srvResolver, serv, cell, !*noVstreamCopy)

var si SchemaInfo // default nil
var st *vtschema.Tracker
Expand Down Expand Up @@ -238,6 +239,7 @@ func Init(
si,
*noScatter,
pv,
*noVstreamCopy,
)

// connect the schema tracker with the vschema manager
Expand Down
Loading
Loading