diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 4e66e126e77..95e015c91dd 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -239,6 +239,8 @@ Usage of vtgate: Select tcp, tcp4, or tcp6 to control the socket type. (default tcp) --no_scatter when set to true, the planner will fail instead of producing a plan that includes scatter queries + --no_vstream_copy + when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this --normalize_queries Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true) --onclose_timeout duration diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 57cc04ad69d..c065c41e4eb 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) + vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false) return nil } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index a6631966f7b..c1ae7e209bb 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -107,6 +107,9 @@ type Executor struct { // allowScatter will fail planning if set to false and a plan contains any scatter queries allowScatter bool + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. + allowVstreamCopy bool } var executorOnce sync.Once @@ -127,20 +130,22 @@ func NewExecutor( schemaTracker SchemaInfo, noScatter bool, pv plancontext.PlannerVersion, + noVstreamCopy bool, ) *Executor { e := &Executor{ - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - plans: cache.NewDefaultCacheImpl(cacheCfg), - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, - allowScatter: !noScatter, - pv: pv, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + plans: cache.NewDefaultCacheImpl(cacheCfg), + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + allowScatter: !noScatter, + allowVstreamCopy: !noVstreamCopy, + pv: pv, } vschemaacl.Init() @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar return err } - vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell) + vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy) vs := &vstream{ vgtid: vgtid, tabletType: topodatapb.TabletType_PRIMARY, diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 04f64cab47c..00b3161cb5c 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn bad.VSchema = badVSchema getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} // create a new session each time so that ShardSessions don't get re-used across tests @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 3f9224fe003..535dc39f8a5 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) { } func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor { - return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) } func TestSelectScatter(t *testing.T) { @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { count++ } - executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index 8fea4ed985f..ee3038972c3 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) { for _, shard := range shards { _ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) } - executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) sql := "stream * from sharded_user_msgs" result, err := executorStreamMessages(executor, sql) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 8563ef0f4f1..8c6dd9f04f4 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" @@ -47,6 +48,9 @@ type vstreamManager struct { resolver *srvtopo.Resolver toposerv srvtopo.Server cell string + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. + allowVstreamCopy bool vstreamsCreated *stats.CountersWithMultiLabels vstreamsLag *stats.GaugesWithMultiLabels @@ -119,12 +123,13 @@ type journalEvent struct { done chan struct{} } -func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { +func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") return &vstreamManager{ - resolver: resolver, - toposerv: serv, - cell: cell, + resolver: resolver, + toposerv: serv, + cell: cell, + allowVstreamCopy: allowVstreamCopy, vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", @@ -540,6 +545,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha log.Infof("Starting to vstream from %s", tablet.Alias.String()) // Safe to access sgtid.Gtid here (because it can't change until streaming begins). var vstreamCreatedOnce sync.Once + + if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) { + // We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping) + return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported") + } + err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 241ed3280d4..56586cf6fef 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -90,7 +90,7 @@ func TestVStreamSkew(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} want := int64(0) var sbc0, sbc1 *sandboxconn.SandboxConn @@ -136,7 +136,7 @@ func TestVStreamEvents(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -213,7 +213,7 @@ func TestVStreamChunks(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -298,7 +298,7 @@ func TestVStreamManagerGetCells(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) ts, _ := st.GetTopoServer() for _, tcase := range tcases { @@ -353,7 +353,7 @@ func TestVStreamMulti(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -415,7 +415,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -470,7 +470,7 @@ func TestVStreamRetry(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ @@ -511,7 +511,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -561,7 +561,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -674,7 +674,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -791,7 +791,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -920,7 +920,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -1000,7 +1000,7 @@ func TestResolveVStreamParams(t *testing.T) { name := "TestVStream" _ = createSandbox(name) hc := discovery.NewFakeHealthCheck(nil) - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") + vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa", true) testcases := []struct { input *binlogdatapb.VGtid output *binlogdatapb.VGtid @@ -1146,7 +1146,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) vgtid := &binlogdatapb.VGtid{ @@ -1195,10 +1195,60 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } -func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { +func TestVstreamCopy(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cell := "aa" + ks := "TestVStreamCopy" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + commit := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_COMMIT}, + } + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) + var count sync2.AtomicInt32 + count.Set(0) + // empty gtid id means no start position = bootstrapping/vstream copy + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "", + }}, + } + + // allowVstreamCopy = false + vsm := newTestVStreamManager(hc, st, "aa", false) + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Error(t, err) + require.Equal(t, err.Error(), "vstream copy is not currently supported") + + // allowVstreamCopy = true + vsm2 := newTestVStreamManager(hc, st, "aa", true) + err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Equal(t, err.Error(), "final error") +} + +func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { gw := NewTabletGateway(context.Background(), hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) - return newVStreamManager(srvResolver, serv, cell) + return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy) } func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse { diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 084d3a149c2..23bdf94d90d 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -71,6 +71,7 @@ var ( defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service") noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries") + noVstreamCopy = flag.Bool("no_vstream_copy", false, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this") // TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed @@ -210,7 +211,7 @@ func Init( sc := NewScatterConn("VttabletCall", tc, gw) srvResolver := srvtopo.NewResolver(serv, gw, cell) resolver := NewResolver(srvResolver, serv, cell, sc) - vsm := newVStreamManager(srvResolver, serv, cell) + vsm := newVStreamManager(srvResolver, serv, cell, !*noVstreamCopy) var si SchemaInfo // default nil var st *vtschema.Tracker @@ -238,6 +239,7 @@ func Init( si, *noScatter, pv, + *noVstreamCopy, ) // connect the schema tracker with the vschema manager diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 46af517ec29..2590300f7c2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -124,8 +124,9 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se // buildTablePlan identifies the tables for the copy phase and creates the plans which consist of the lastPK seen // for a table and its Rule (for filtering purposes by the vstreamer engine) // it can be called -// the first time, with just the filter and an empty pos -// during a restart, with both the filter and list of TableLastPK from the vgtid +// +// the first time, with just the filter and an empty pos +// during a restart, with both the filter and list of TableLastPK from the vgtid func (uvs *uvstreamer) buildTablePlan() error { uvs.plans = make(map[string]*tablePlan) tableLastPKs := make(map[string]*binlogdatapb.TableLastPK)