From 9b74943ed0c9c15a6d4de47b7c02953d35c73a64 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 19 Feb 2021 13:16:04 +0100 Subject: [PATCH 1/4] add innodb_read_rows as vttablet metric Signed-off-by: Andres Taylor --- .../tabletserver/query_engine_test.go | 26 ++++++++++++----- .../tabletserver/query_executor_test.go | 5 ++++ go/vt/vttablet/tabletserver/schema/engine.go | 28 +++++++++++++++++-- .../tabletserver/schema/engine_test.go | 19 +++++++++++++ .../vttablet/tabletserver/schema/main_test.go | 1 + 5 files changed, 70 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 9d4803b0f2d..7c075864975 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -32,10 +32,11 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/cache" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/streamlog" @@ -114,12 +115,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) { } } -func TestGetMessageStreamPlan(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - for query, result := range schematest.Queries() { - db.AddQuery(query, result) - } +func addSchemaEngineQueries(db *fakesqldb.DB) { db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: [][]sqltypes.Value{ @@ -129,6 +125,22 @@ func TestGetMessageStreamPlan(t *testing.T) { mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }}) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) +} + +func TestGetMessageStreamPlan(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + for query, result := range schematest.Queries() { + db.AddQuery(query, result) + } + + addSchemaEngineQueries(db) + qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) qe.se.Open() qe.Open() diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index a6c5996230e..56e936b8f8b 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1175,6 +1175,11 @@ func initQueryExecutorTestDB(db *fakesqldb.DB) { mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }, }) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) } func getTestTableFields() []*querypb.Field { diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 631b8b8f82a..5704f7f7124 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -80,6 +80,7 @@ type Engine struct { tableFileSizeGauge *stats.GaugesWithSingleLabel tableAllocatedSizeGauge *stats.GaugesWithSingleLabel + innoDbReadRowsGauge *stats.GaugesWithSingleLabel } // NewEngine creates a new Engine. @@ -99,6 +100,7 @@ func NewEngine(env tabletenv.Env) *Engine { _ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval) se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table") se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table") + se.innoDbReadRowsGauge = env.Exporter().NewGaugesWithSingleLabel("InnodbRowsRead", "number of rows read by mysql", "Database") env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema) env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) { @@ -290,9 +292,7 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { // reload reloads the schema. It can also be used to initialize it. func (se *Engine) reload(ctx context.Context) error { - //start := time.Now() defer func() { - //log.Infof("Time taken to load the schema: %v", time.Since(start)) se.env.LogError() }() @@ -316,6 +316,11 @@ func (se *Engine) reload(ctx context.Context) error { return err } + err = se.updateInnoDBRowsRead(ctx, conn) + if err != nil { + return err + } + rec := concurrency.AllErrorRecorder{} // curTables keeps track of tables in the new snapshot so we can detect what was dropped. curTables := map[string]bool{"dual": true} @@ -388,6 +393,25 @@ func (se *Engine) reload(ctx context.Context) error { return nil } +func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBConn) error { + readRowsData, err := conn.Exec(ctx, "show status like 'Innodb_rows_read'", 10, false) + if err != nil { + return err + } + + if !(len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2) { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "got bad output from query, expected one row, two columns") + } + + value, err := evalengine.ToInt64(readRowsData.Rows[0][1]) + if err != nil { + return err + } + + se.innoDbReadRowsGauge.Set("read_rows", value) + return nil +} + func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, error) { // Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so. tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false) diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 89382a7d7f4..47521f03f96 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -18,6 +18,7 @@ package schema import ( "expvar" + "fmt" "net/http" "net/http/httptest" "sort" @@ -80,6 +81,8 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325876", )) + firstReadRowsValue := 12 + AddFakeInnoDBReadRowsResult(db, firstReadRowsValue) se := newEngine(10, 10*time.Second, 10*time.Second, db) se.Open() defer se.Close() @@ -93,6 +96,8 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325877", )) + assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows") + assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"]) // Modify test_table_03 // Add test_table_04 // Drop msg @@ -144,6 +149,8 @@ func TestOpenAndReload(t *testing.T) { mysql.ShowPrimaryRow("seq", "id"), }, }) + secondReadRowsValue := 123 + AddFakeInnoDBReadRowsResult(db, secondReadRowsValue) firstTime := true notifier := func(full map[string]*Table, created, altered, dropped []string) { @@ -164,6 +171,9 @@ func TestOpenAndReload(t *testing.T) { err := se.Reload(context.Background()) require.NoError(t, err) + assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows") + assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"]) + want["test_table_03"] = &Table{ Name: sqlparser.NewTableIdent("test_table_03"), Fields: []*querypb.Field{{ @@ -338,6 +348,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) { {sqltypes.NewVarBinary("")}, }, }) + AddFakeInnoDBReadRowsResult(db, 0) se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "Row count exceeded" @@ -492,3 +503,11 @@ func initialSchema() map[string]*Table { }, } } + +func AddFakeInnoDBReadRowsResult(db *fakesqldb.DB, value int) *fakesqldb.ExpectedResult { + return db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + fmt.Sprintf("Innodb_rows_read|%d", value), + )) +} diff --git a/go/vt/vttablet/tabletserver/schema/main_test.go b/go/vt/vttablet/tabletserver/schema/main_test.go index 19d03dd809f..ada5c8085a1 100644 --- a/go/vt/vttablet/tabletserver/schema/main_test.go +++ b/go/vt/vttablet/tabletserver/schema/main_test.go @@ -36,6 +36,7 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) { )) db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{}) db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{}) + AddFakeInnoDBReadRowsResult(db, 1) se := newEngine(10, 10*time.Second, 10*time.Second, db) require.NoError(t, se.Open()) cancel := func() { From 58749436b1ace8fadcc77799d76f8500b9dd6a40 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 19 Feb 2021 19:11:41 +0100 Subject: [PATCH 2/4] test fixes Signed-off-by: Andres Taylor --- go/vt/vttablet/tabletserver/tabletserver_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index f870de335e0..99304c36335 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2465,6 +2465,11 @@ func setupFakeDB(t *testing.T) *fakesqldb.DB { mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }, }) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) return db } From cbc6056debbd5836f8176d714ac9f174f2445fd3 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 19 Feb 2021 19:28:35 +0100 Subject: [PATCH 3/4] make tests more resilient Signed-off-by: Andres Taylor --- go/vt/vttablet/tabletserver/schema/engine.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 5704f7f7124..57dbfa326eb 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -399,16 +399,14 @@ func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBCon return err } - if !(len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2) { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "got bad output from query, expected one row, two columns") - } + if len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2 { + value, err := evalengine.ToInt64(readRowsData.Rows[0][1]) + if err != nil { + return err + } - value, err := evalengine.ToInt64(readRowsData.Rows[0][1]) - if err != nil { - return err + se.innoDbReadRowsGauge.Set("read_rows", value) } - - se.innoDbReadRowsGauge.Set("read_rows", value) return nil } From 891ec0a8cc8a48a3724c92adfef6bb46cab7da70 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 22 Feb 2021 06:28:34 +0100 Subject: [PATCH 4/4] use gauge instead of single label gauge Signed-off-by: Andres Taylor --- go/vt/vttablet/tabletserver/schema/engine.go | 8 +++++--- go/vt/vttablet/tabletserver/schema/engine_test.go | 6 ++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 57dbfa326eb..22a27b19e16 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -80,7 +80,7 @@ type Engine struct { tableFileSizeGauge *stats.GaugesWithSingleLabel tableAllocatedSizeGauge *stats.GaugesWithSingleLabel - innoDbReadRowsGauge *stats.GaugesWithSingleLabel + innoDbReadRowsGauge *stats.Gauge } // NewEngine creates a new Engine. @@ -100,7 +100,7 @@ func NewEngine(env tabletenv.Env) *Engine { _ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval) se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table") se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table") - se.innoDbReadRowsGauge = env.Exporter().NewGaugesWithSingleLabel("InnodbRowsRead", "number of rows read by mysql", "Database") + se.innoDbReadRowsGauge = env.Exporter().NewGauge("InnodbRowsRead", "number of rows read by mysql") env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema) env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) { @@ -405,7 +405,9 @@ func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBCon return err } - se.innoDbReadRowsGauge.Set("read_rows", value) + se.innoDbReadRowsGauge.Set(value) + } else { + log.Warningf("got strange results from 'show status': %v", readRowsData.Rows) } return nil } diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 47521f03f96..5a0d908436b 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -96,8 +96,7 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325877", )) - assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows") - assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"]) + assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Get()) // Modify test_table_03 // Add test_table_04 // Drop msg @@ -171,8 +170,7 @@ func TestOpenAndReload(t *testing.T) { err := se.Reload(context.Background()) require.NoError(t, err) - assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows") - assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"]) + assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Get()) want["test_table_03"] = &Table{ Name: sqlparser.NewTableIdent("test_table_03"),