From 0037ef5daf272f62598f47df3141b2fe3b699f2d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 11 Aug 2024 13:26:54 +0300 Subject: [PATCH 1/2] resolved conflict Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/vreplication/vplayer.go | 2 +- .../vreplication/vreplicator_test.go | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 8eee211ff9e..e9a7ecbbdc8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -114,7 +114,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map timeLastSaved: time.Now(), tablePlans: make(map[string]*TablePlan), phase: phase, - throttlerAppName: throttlerapp.VCopierName.ConcatenateString(vr.throttlerAppName()), + throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()), } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index 346e6b67eb3..b6ab3c56b4d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/mysqlctl" @@ -743,3 +744,59 @@ func waitForQueryResult(t *testing.T, dbc binlogplayer.DBClient, query, val stri } } } + +func TestThrottlerAppNames(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + id := int32(1) + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + defer stats.Stop() + dbClient := playerEngine.dbClientFactoryFiltered() + err := dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + dbName := dbClient.DBName() + // Ensure there's a dummy vreplication workflow record + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, options) values (%d, 'test_workflow', '', '', 99999, 99999, 0, 0, 'Running', '%s', '{}') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", + id, dbName, dbName), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + settings, _, err := vr.loadSettings(ctx, newVDBClient(dbClient, stats)) + require.NoError(t, err) + + throttlerAppName := vr.throttlerAppName() + assert.Contains(t, throttlerAppName, "test_workflow") + assert.Contains(t, throttlerAppName, "vreplication") + assert.NotContains(t, throttlerAppName, "vcopier") + assert.NotContains(t, throttlerAppName, "vplayer") + + vp := newVPlayer(vr, settings, nil, replication.Position{}, "") + assert.Contains(t, vp.throttlerAppName, "test_workflow") + assert.Contains(t, vp.throttlerAppName, "vreplication") + assert.Contains(t, vp.throttlerAppName, "vplayer") + assert.NotContains(t, vp.throttlerAppName, "vcopier") + + vc := newVCopier(vr) + assert.Contains(t, vc.throttlerAppName, "test_workflow") + assert.Contains(t, vc.throttlerAppName, "vreplication") + assert.Contains(t, vc.throttlerAppName, "vcopier") + assert.NotContains(t, vc.throttlerAppName, "vplayer") +} From c2b4c91e18fa29193040c646fb906d1d8bc66f96 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 11 Aug 2024 16:20:26 +0300 Subject: [PATCH 2/2] fix query, adapted for release-18.0 (no 'options' column) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index b6ab3c56b4d..7d08e3146a3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -771,7 +771,7 @@ func TestThrottlerAppNames(t *testing.T) { defer dbClient.Close() dbName := dbClient.DBName() // Ensure there's a dummy vreplication workflow record - _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, options) values (%d, 'test_workflow', '', '', 99999, 99999, 0, 0, 'Running', '%s', '{}') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) values (%d, 'test_workflow', '', '', 99999, 99999, 0, 0, 'Running', '%s') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", id, dbName, dbName), 1) require.NoError(t, err) defer func() {