diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 9d4803b0f2d..7c075864975 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -32,10 +32,11 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/cache" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/streamlog" @@ -114,12 +115,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) { } } -func TestGetMessageStreamPlan(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - for query, result := range schematest.Queries() { - db.AddQuery(query, result) - } +func addSchemaEngineQueries(db *fakesqldb.DB) { db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: [][]sqltypes.Value{ @@ -129,6 +125,22 @@ func TestGetMessageStreamPlan(t *testing.T) { mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }}) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) +} + +func TestGetMessageStreamPlan(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + for query, result := range schematest.Queries() { + db.AddQuery(query, result) + } + + addSchemaEngineQueries(db) + qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) qe.se.Open() qe.Open() diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index a6c5996230e..56e936b8f8b 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1175,6 +1175,11 @@ func initQueryExecutorTestDB(db *fakesqldb.DB) { mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }, }) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) } func getTestTableFields() []*querypb.Field { diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 631b8b8f82a..22a27b19e16 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -80,6 +80,7 @@ type Engine struct { tableFileSizeGauge *stats.GaugesWithSingleLabel tableAllocatedSizeGauge *stats.GaugesWithSingleLabel + innoDbReadRowsGauge *stats.Gauge } // NewEngine creates a new Engine. @@ -99,6 +100,7 @@ func NewEngine(env tabletenv.Env) *Engine { _ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval) se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table") se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table") + se.innoDbReadRowsGauge = env.Exporter().NewGauge("InnodbRowsRead", "number of rows read by mysql") env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema) env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) { @@ -290,9 +292,7 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { // reload reloads the schema. It can also be used to initialize it. func (se *Engine) reload(ctx context.Context) error { - //start := time.Now() defer func() { - //log.Infof("Time taken to load the schema: %v", time.Since(start)) se.env.LogError() }() @@ -316,6 +316,11 @@ func (se *Engine) reload(ctx context.Context) error { return err } + err = se.updateInnoDBRowsRead(ctx, conn) + if err != nil { + return err + } + rec := concurrency.AllErrorRecorder{} // curTables keeps track of tables in the new snapshot so we can detect what was dropped. curTables := map[string]bool{"dual": true} @@ -388,6 +393,25 @@ func (se *Engine) reload(ctx context.Context) error { return nil } +func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBConn) error { + readRowsData, err := conn.Exec(ctx, "show status like 'Innodb_rows_read'", 10, false) + if err != nil { + return err + } + + if len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2 { + value, err := evalengine.ToInt64(readRowsData.Rows[0][1]) + if err != nil { + return err + } + + se.innoDbReadRowsGauge.Set(value) + } else { + log.Warningf("got strange results from 'show status': %v", readRowsData.Rows) + } + return nil +} + func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, error) { // Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so. tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false) diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 89382a7d7f4..5a0d908436b 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -18,6 +18,7 @@ package schema import ( "expvar" + "fmt" "net/http" "net/http/httptest" "sort" @@ -80,6 +81,8 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325876", )) + firstReadRowsValue := 12 + AddFakeInnoDBReadRowsResult(db, firstReadRowsValue) se := newEngine(10, 10*time.Second, 10*time.Second, db) se.Open() defer se.Close() @@ -93,6 +96,7 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325877", )) + assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Get()) // Modify test_table_03 // Add test_table_04 // Drop msg @@ -144,6 +148,8 @@ func TestOpenAndReload(t *testing.T) { mysql.ShowPrimaryRow("seq", "id"), }, }) + secondReadRowsValue := 123 + AddFakeInnoDBReadRowsResult(db, secondReadRowsValue) firstTime := true notifier := func(full map[string]*Table, created, altered, dropped []string) { @@ -164,6 +170,8 @@ func TestOpenAndReload(t *testing.T) { err := se.Reload(context.Background()) require.NoError(t, err) + assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Get()) + want["test_table_03"] = &Table{ Name: sqlparser.NewTableIdent("test_table_03"), Fields: []*querypb.Field{{ @@ -338,6 +346,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) { {sqltypes.NewVarBinary("")}, }, }) + AddFakeInnoDBReadRowsResult(db, 0) se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "Row count exceeded" @@ -492,3 +501,11 @@ func initialSchema() map[string]*Table { }, } } + +func AddFakeInnoDBReadRowsResult(db *fakesqldb.DB, value int) *fakesqldb.ExpectedResult { + return db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + fmt.Sprintf("Innodb_rows_read|%d", value), + )) +} diff --git a/go/vt/vttablet/tabletserver/schema/main_test.go b/go/vt/vttablet/tabletserver/schema/main_test.go index 19d03dd809f..ada5c8085a1 100644 --- a/go/vt/vttablet/tabletserver/schema/main_test.go +++ b/go/vt/vttablet/tabletserver/schema/main_test.go @@ -36,6 +36,7 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) { )) db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{}) db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{}) + AddFakeInnoDBReadRowsResult(db, 1) se := newEngine(10, 10*time.Second, 10*time.Second, db) require.NoError(t, se.Open()) cancel := func() { diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index f870de335e0..99304c36335 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2465,6 +2465,11 @@ func setupFakeDB(t *testing.T) *fakesqldb.DB { mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), }, }) + db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Variable_name|Value", + "varchar|int64"), + "Innodb_rows_read|0", + )) return db }