diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index 71bf08f06c8..82c1e320c06 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -136,6 +136,7 @@ type ExpectedResult struct { type exprResult struct { expr *regexp.Regexp result *sqltypes.Result + err string } // ExpectedExecuteFetch defines for an expected query the to be faked output. @@ -391,6 +392,9 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R if ok { userCallback(query) } + if pat.err != "" { + return fmt.Errorf(pat.err) + } return callback(pat.result) } } @@ -504,7 +508,20 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu result := *expectedResult db.mu.Lock() defer db.mu.Unlock() - db.patternData = append(db.patternData, exprResult{expr, &result}) + db.patternData = append(db.patternData, exprResult{expr: expr, result: &result}) +} + +// RejectQueryPattern allows a query pattern to be rejected with an error +func (db *DB) RejectQueryPattern(queryPattern, error string) { + expr := regexp.MustCompile("(?is)^" + queryPattern + "$") + db.mu.Lock() + defer db.mu.Unlock() + db.patternData = append(db.patternData, exprResult{expr: expr, err: error}) +} + +// ClearQueryPattern removes all query patterns set up +func (db *DB) ClearQueryPattern() { + db.patternData = nil } // AddQueryPatternWithCallback is similar to AddQueryPattern: in addition it calls the provided callback function diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 86824be43ca..b178a5f0d5f 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -44,6 +44,10 @@ const ( mariaDBReplicationHackPrefix = "5.5.5-" // mariaDBVersionString is present in mariaDBVersionString = "MariaDB" + // mysql57VersionPrefix is the prefix for 5.7 mysql version, such as 5.7.31-log + mysql57VersionPrefix = "5.7." + // mysql80VersionPrefix is the prefix for 8.0 mysql version, such as 8.0.19 + mysql80VersionPrefix = "8.0." ) // flavor is the abstract interface for a flavor. @@ -111,6 +115,8 @@ type flavor interface { // timestamp cannot be set by regular clients. enableBinlogPlaybackCommand() string disableBinlogPlaybackCommand() string + + baseShowTablesWithSizes() string } // flavors maps flavor names to their implementation. @@ -131,23 +137,27 @@ var flavors = make(map[string]func() flavor) // as well (not matching what c.ServerVersion is, but matching after we remove // the prefix). func (c *Conn) fillFlavor(params *ConnParams) { - if flavorFunc := flavors[params.Flavor]; flavorFunc != nil { - c.flavor = flavorFunc() - return - } + flavorFunc := flavors[params.Flavor] - if strings.HasPrefix(c.ServerVersion, mariaDBReplicationHackPrefix) { + switch { + case flavorFunc != nil: + c.flavor = flavorFunc() + case strings.HasPrefix(c.ServerVersion, mariaDBReplicationHackPrefix): c.ServerVersion = c.ServerVersion[len(mariaDBReplicationHackPrefix):] - c.flavor = mariadbFlavor{} - return - } - - if strings.Contains(c.ServerVersion, mariaDBVersionString) { - c.flavor = mariadbFlavor{} - return + c.flavor = mariadbFlavor101{} + case strings.Contains(c.ServerVersion, mariaDBVersionString): + mariadbVersion, err := strconv.ParseFloat(c.ServerVersion[:4], 64) + if err != nil || mariadbVersion < 10.2 { + c.flavor = mariadbFlavor101{} + } + c.flavor = mariadbFlavor102{} + case strings.HasPrefix(c.ServerVersion, mysql57VersionPrefix): + c.flavor = mysqlFlavor57{} + case strings.HasPrefix(c.ServerVersion, mysql80VersionPrefix): + c.flavor = mysqlFlavor80{} + default: + c.flavor = mysqlFlavor56{} } - - c.flavor = mysqlFlavor{} } // @@ -159,8 +169,11 @@ func (c *Conn) fillFlavor(params *ConnParams) { // is identified as MariaDB. Most applications should not care, but // this is useful in tests. func (c *Conn) IsMariaDB() bool { - _, ok := c.flavor.(mariadbFlavor) - return ok + switch c.flavor.(type) { + case mariadbFlavor101, mariadbFlavor102: + return true + } + return false } // MasterPosition returns the current master replication position. @@ -390,3 +403,8 @@ func (c *Conn) EnableBinlogPlaybackCommand() string { func (c *Conn) DisableBinlogPlaybackCommand() string { return c.flavor.disableBinlogPlaybackCommand() } + +// BaseShowTables returns a query that shows tables and their sizes +func (c *Conn) BaseShowTables() string { + return c.flavor.baseShowTablesWithSizes() +} diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 134de8ba707..33e67b76624 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -271,3 +271,8 @@ func (*filePosFlavor) enableBinlogPlaybackCommand() string { func (*filePosFlavor) disableBinlogPlaybackCommand() string { return "" } + +// baseShowTablesWithSizes is part of the Flavor interface. +func (*filePosFlavor) baseShowTablesWithSizes() string { + return TablesWithSize56 +} diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 6d7db404442..422344d4f5a 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -30,6 +30,15 @@ import ( // mariadbFlavor implements the Flavor interface for MariaDB. type mariadbFlavor struct{} +type mariadbFlavor101 struct { + mariadbFlavor +} +type mariadbFlavor102 struct { + mariadbFlavor +} + +var _ flavor = (*mariadbFlavor101)(nil) +var _ flavor = (*mariadbFlavor102)(nil) // masterGTIDSet is part of the Flavor interface. func (mariadbFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { diff --git a/go/mysql/flavor_mariadb_binlog_playback.go b/go/mysql/flavor_mariadb_binlog_playback.go index c30e39d2787..e862e744d04 100644 --- a/go/mysql/flavor_mariadb_binlog_playback.go +++ b/go/mysql/flavor_mariadb_binlog_playback.go @@ -29,3 +29,13 @@ func (mariadbFlavor) enableBinlogPlaybackCommand() string { func (mariadbFlavor) disableBinlogPlaybackCommand() string { return "" } + +// baseShowTablesWithSizes is part of the Flavor interface. +func (mariadbFlavor101) baseShowTablesWithSizes() string { + return TablesWithSize56 +} + +// baseShowTablesWithSizes is part of the Flavor interface. +func (mariadbFlavor102) baseShowTablesWithSizes() string { + return TablesWithSize57 +} diff --git a/go/mysql/flavor_mariadb_test.go b/go/mysql/flavor_mariadb_test.go index 120cde37583..82a5b1312b4 100644 --- a/go/mysql/flavor_mariadb_test.go +++ b/go/mysql/flavor_mariadb_test.go @@ -40,7 +40,7 @@ func TestMariadbSetMasterCommands(t *testing.T) { MASTER_CONNECT_RETRY = 1234, MASTER_USE_GTID = current_pos` - conn := &Conn{flavor: mariadbFlavor{}} + conn := &Conn{flavor: mariadbFlavor101{}} got := conn.SetMasterCommand(params, masterHost, masterPort, masterConnectRetry) if got != want { t.Errorf("mariadbFlavor.SetMasterCommands(%#v, %#v, %#v, %#v) = %#v, want %#v", params, masterHost, masterPort, masterConnectRetry, got, want) @@ -73,7 +73,7 @@ func TestMariadbSetMasterCommandsSSL(t *testing.T) { MASTER_SSL_KEY = 'ssl-key', MASTER_USE_GTID = current_pos` - conn := &Conn{flavor: mariadbFlavor{}} + conn := &Conn{flavor: mariadbFlavor101{}} got := conn.SetMasterCommand(params, masterHost, masterPort, masterConnectRetry) if got != want { t.Errorf("mariadbFlavor.SetMasterCommands(%#v, %#v, %#v, %#v) = %#v, want %#v", params, masterHost, masterPort, masterConnectRetry, got, want) diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 590cc3ed2d5..9d4dd5e08da 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -29,6 +29,19 @@ import ( // mysqlFlavor implements the Flavor interface for Mysql. type mysqlFlavor struct{} +type mysqlFlavor56 struct { + mysqlFlavor +} +type mysqlFlavor57 struct { + mysqlFlavor +} +type mysqlFlavor80 struct { + mysqlFlavor +} + +var _ flavor = (*mysqlFlavor56)(nil) +var _ flavor = (*mysqlFlavor57)(nil) +var _ flavor = (*mysqlFlavor80)(nil) // masterGTIDSet is part of the Flavor interface. func (mysqlFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { @@ -231,3 +244,40 @@ func (mysqlFlavor) enableBinlogPlaybackCommand() string { func (mysqlFlavor) disableBinlogPlaybackCommand() string { return "" } + +// TablesWithSize56 is a query to select table along with size for mysql 5.6 +const TablesWithSize56 = `SELECT table_name, table_type, unix_timestamp(create_time), table_comment, SUM( data_length + index_length), SUM( data_length + index_length) + FROM information_schema.tables WHERE table_schema = database() group by table_name` + +// TablesWithSize57 is a query to select table along with size for mysql 5.7. +// It's a little weird, because the JOIN predicate only works if the table and databases do not contain weird characters. +// As a fallback, we use the mysql 5.6 query, which is not always up to date, but works for all table/db names. +const TablesWithSize57 = `SELECT t.table_name, t.table_type, unix_timestamp(t.create_time), t.table_comment, i.file_size, i.allocated_size + FROM information_schema.tables t, information_schema.innodb_sys_tablespaces i + WHERE t.table_schema = database() and i.name = concat(t.table_schema,'/',t.table_name) +UNION ALL + SELECT table_name, table_type, unix_timestamp(create_time), table_comment, SUM( data_length + index_length), SUM( data_length + index_length) + FROM information_schema.tables t + WHERE table_schema = database() AND NOT EXISTS(SELECT * FROM information_schema.innodb_sys_tablespaces i WHERE i.name = concat(t.table_schema,'/',t.table_name)) + group by table_name, table_type, unix_timestamp(create_time), table_comment +` + +// TablesWithSize80 is a query to select table along with size for mysql 8.0 +const TablesWithSize80 = `SELECT t.table_name, t.table_type, unix_timestamp(t.create_time), t.table_comment, i.file_size, i.allocated_size + FROM information_schema.tables t, information_schema.innodb_tablespaces i + WHERE t.table_schema = database() and i.name = concat(t.table_schema,'/',t.table_name)` + +// baseShowTablesWithSizes is part of the Flavor interface. +func (mysqlFlavor56) baseShowTablesWithSizes() string { + return TablesWithSize56 +} + +// baseShowTablesWithSizes is part of the Flavor interface. +func (mysqlFlavor57) baseShowTablesWithSizes() string { + return TablesWithSize57 +} + +// baseShowTablesWithSizes is part of the Flavor interface. +func (mysqlFlavor80) baseShowTablesWithSizes() string { + return TablesWithSize80 +} diff --git a/go/mysql/flavor_mysql_test.go b/go/mysql/flavor_mysql_test.go index 398c3d4e147..8f72242a891 100644 --- a/go/mysql/flavor_mysql_test.go +++ b/go/mysql/flavor_mysql_test.go @@ -39,7 +39,7 @@ func TestMysql56SetMasterCommands(t *testing.T) { MASTER_CONNECT_RETRY = 1234, MASTER_AUTO_POSITION = 1` - conn := &Conn{flavor: mysqlFlavor{}} + conn := &Conn{flavor: mysqlFlavor57{}} got := conn.SetMasterCommand(params, masterHost, masterPort, masterConnectRetry) if got != want { t.Errorf("mysqlFlavor.SetMasterCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, masterHost, masterPort, masterConnectRetry, got, want) @@ -72,7 +72,7 @@ func TestMysql56SetMasterCommandsSSL(t *testing.T) { MASTER_SSL_KEY = 'ssl-key', MASTER_AUTO_POSITION = 1` - conn := &Conn{flavor: mysqlFlavor{}} + conn := &Conn{flavor: mysqlFlavor57{}} got := conn.SetMasterCommand(params, masterHost, masterPort, masterConnectRetry) if got != want { t.Errorf("mysqlFlavor.SetMasterCommands(%#v, %#v, %#v, %#v) = %#v, want %#v", params, masterHost, masterPort, masterConnectRetry, got, want) diff --git a/go/mysql/schema.go b/go/mysql/schema.go index 3174dad7274..0ea29da129a 100644 --- a/go/mysql/schema.go +++ b/go/mysql/schema.go @@ -30,9 +30,6 @@ import ( // data. const ( - // BaseShowTables is the base query used in further methods. - BaseShowTables = "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()" - // BaseShowPrimary is the base query for fetching primary key info. BaseShowPrimary = "SELECT table_name, column_name FROM information_schema.key_column_usage WHERE table_schema=database() AND constraint_name='PRIMARY' ORDER BY table_name, ordinal_position" ) @@ -40,48 +37,55 @@ const ( // BaseShowTablesFields contains the fields returned by a BaseShowTables or a BaseShowTablesForTable command. // They are validated by the // testBaseShowTables test. -var BaseShowTablesFields = []*querypb.Field{ - { - Name: "table_name", - Type: querypb.Type_VARCHAR, - Table: "tables", - OrgTable: "TABLES", - Database: "information_schema", - OrgName: "TABLE_NAME", - ColumnLength: 192, - Charset: CharacterSetUtf8, - Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), - }, - { - Name: "table_type", - Type: querypb.Type_VARCHAR, - Table: "tables", - OrgTable: "TABLES", - Database: "information_schema", - OrgName: "TABLE_TYPE", - ColumnLength: 192, - Charset: CharacterSetUtf8, - Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), - }, - { - Name: "unix_timestamp(create_time)", - Type: querypb.Type_INT64, - ColumnLength: 11, - Charset: CharacterSetBinary, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG), - }, - { - Name: "table_comment", - Type: querypb.Type_VARCHAR, - Table: "tables", - OrgTable: "TABLES", - Database: "information_schema", - OrgName: "TABLE_COMMENT", - ColumnLength: 6144, - Charset: CharacterSetUtf8, - Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), - }, -} +var BaseShowTablesFields = []*querypb.Field{{ + Name: "t.table_name", + Type: querypb.Type_VARCHAR, + Table: "tables", + OrgTable: "TABLES", + Database: "information_schema", + OrgName: "TABLE_NAME", + ColumnLength: 192, + Charset: CharacterSetUtf8, + Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), +}, { + Name: "t.table_type", + Type: querypb.Type_VARCHAR, + Table: "tables", + OrgTable: "TABLES", + Database: "information_schema", + OrgName: "TABLE_TYPE", + ColumnLength: 192, + Charset: CharacterSetUtf8, + Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), +}, { + Name: "unix_timestamp(t.create_time)", + Type: querypb.Type_INT64, + ColumnLength: 11, + Charset: CharacterSetBinary, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG), +}, { + Name: "t.table_comment", + Type: querypb.Type_VARCHAR, + Table: "tables", + OrgTable: "TABLES", + Database: "information_schema", + OrgName: "TABLE_COMMENT", + ColumnLength: 6144, + Charset: CharacterSetUtf8, + Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), +}, { + Name: "i.file_size", + Type: querypb.Type_INT64, + ColumnLength: 11, + Charset: CharacterSetBinary, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG), +}, { + Name: "i.allocated_size", + Type: querypb.Type_INT64, + ColumnLength: 11, + Charset: CharacterSetBinary, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG), +}} // BaseShowTablesRow returns the fields from a BaseShowTables or // BaseShowTablesForTable command. @@ -95,6 +99,8 @@ func BaseShowTablesRow(tableName string, isView bool, comment string) []sqltypes sqltypes.MakeTrusted(sqltypes.VarChar, []byte(tableType)), sqltypes.MakeTrusted(sqltypes.Int64, []byte("1427325875")), // unix_timestamp(create_time) sqltypes.MakeTrusted(sqltypes.VarChar, []byte(comment)), + sqltypes.MakeTrusted(sqltypes.Int64, []byte("100")), // file_size + sqltypes.MakeTrusted(sqltypes.Int64, []byte("150")), // allocated_size } } diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 34773864c79..c7f5f3d47fc 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -382,7 +382,7 @@ func initTabletEnvironment(ddls []sqlparser.DDLStatement, opts *Options) error { } showTableRows = append(showTableRows, mysql.BaseShowTablesRow(table, false, options)) } - schemaQueries[mysql.BaseShowTables] = &sqltypes.Result{ + schemaQueries[mysql.TablesWithSize57] = &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: showTableRows, } diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index fefd3ca5c19..a41e7a5039a 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -374,6 +374,11 @@ func (dbc *DBConn) ID() int64 { return dbc.conn.ID() } +// BaseShowTables returns a query that shows tables and their sizes +func (dbc *DBConn) BaseShowTables() string { + return dbc.conn.BaseShowTables() +} + func (dbc *DBConn) reconnect(ctx context.Context) error { dbc.conn.Close() // Reuse MySQLTimings from dbc.conn. diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 029fc363e31..58d78ed30b3 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -35,10 +35,10 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/cache" - "vitess.io/vitess/go/streamlog" - + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/tableacl" "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" @@ -120,6 +120,15 @@ func TestGetMessageStreamPlan(t *testing.T) { for query, result := range schematest.Queries() { db.AddQuery(query, result) } + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ + Fields: mysql.BaseShowTablesFields, + Rows: [][]sqltypes.Value{ + mysql.BaseShowTablesRow("test_table_01", false, ""), + mysql.BaseShowTablesRow("test_table_02", false, ""), + mysql.BaseShowTablesRow("test_table_03", false, ""), + mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), + mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), + }}) qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) qe.se.Open() qe.Open() diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index b293533c6ea..a6c5996230e 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -604,7 +604,6 @@ func TestQueryExecutorPlanNextval(t *testing.T) { sqltypes.NewInt64(7), sqltypes.NewInt64(3), }}, - RowsAffected: 2, }) updateQuery = "update seq set next_id = 13 where id = 0" db.AddQuery(updateQuery, &sqltypes.Result{}) @@ -1158,14 +1157,24 @@ func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, tx func setUpQueryExecutorTest(t *testing.T) *fakesqldb.DB { db := fakesqldb.New(t) - initQueryExecutorTestDB(db, true) + initQueryExecutorTestDB(db) return db } -func initQueryExecutorTestDB(db *fakesqldb.DB, testTableHasMultipleUniqueKeys bool) { - for query, result := range getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys) { +const baseShowTablesPattern = `SELECT t\.table_name.*` + +func initQueryExecutorTestDB(db *fakesqldb.DB) { + for query, result := range getQueryExecutorSupportedQueries() { db.AddQuery(query, result) } + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ + Fields: mysql.BaseShowTablesFields, + Rows: [][]sqltypes.Value{ + mysql.BaseShowTablesRow("test_table", false, ""), + mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), + mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), + }, + }) } func getTestTableFields() []*querypb.Field { @@ -1176,7 +1185,7 @@ func getTestTableFields() []*querypb.Field { } } -func getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys bool) map[string]*sqltypes.Result { +func getQueryExecutorSupportedQueries() map[string]*sqltypes.Result { return map[string]*sqltypes.Result{ // queries for twopc fmt.Sprintf(sqlCreateSidecarDB, "_vt"): {}, @@ -1238,24 +1247,13 @@ func getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys bool) map[s Fields: []*querypb.Field{{ Type: sqltypes.Uint64, }}, - Rows: [][]sqltypes.Value{}, - RowsAffected: 0, + Rows: [][]sqltypes.Value{}, }, "(select 0 as x from dual where 1 != 1) union (select 1 as y from dual where 1 != 1) limit 10001": { Fields: []*querypb.Field{{ Type: sqltypes.Uint64, }}, - Rows: [][]sqltypes.Value{}, - RowsAffected: 0, - }, - mysql.BaseShowTables: { - Fields: mysql.BaseShowTablesFields, - Rows: [][]sqltypes.Value{ - mysql.BaseShowTablesRow("test_table", false, ""), - mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), - mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), - }, - RowsAffected: 3, + Rows: [][]sqltypes.Value{}, }, mysql.BaseShowPrimary: { Fields: mysql.ShowPrimaryFields, @@ -1264,7 +1262,6 @@ func getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys bool) map[s mysql.ShowPrimaryRow("seq", "id"), mysql.ShowPrimaryRow("msg", "id"), }, - RowsAffected: 3, }, "select * from test_table where 1 != 1": { Fields: []*querypb.Field{{ diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index f49229c9ca7..631b8b8f82a 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -76,6 +77,9 @@ type Engine struct { // dbCreationFailed is for preventing log spam. dbCreationFailed bool + + tableFileSizeGauge *stats.GaugesWithSingleLabel + tableAllocatedSizeGauge *stats.GaugesWithSingleLabel } // NewEngine creates a new Engine. @@ -93,6 +97,8 @@ func NewEngine(env tabletenv.Env) *Engine { reloadTime: reloadTime, } _ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval) + se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table") + se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table") env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema) env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) { @@ -305,7 +311,7 @@ func (se *Engine) reload(ctx context.Context) error { if se.SkipMetaCheck { return nil } - tableData, err := conn.Exec(ctx, mysql.BaseShowTables, maxTableCount, false) + tableData, err := conn.Exec(ctx, conn.BaseShowTables(), maxTableCount, false) if err != nil { return err } @@ -321,19 +327,32 @@ func (se *Engine) reload(ctx context.Context) error { tableName := row[0].ToString() curTables[tableName] = true createTime, _ := evalengine.ToInt64(row[2]) + fileSize, _ := evalengine.ToUint64(row[4]) + allocatedSize, _ := evalengine.ToUint64(row[5]) + + // publish the size metrics + se.tableFileSizeGauge.Set(tableName, int64(fileSize)) + se.tableAllocatedSizeGauge.Set(tableName, int64(allocatedSize)) + // TODO(sougou); find a better way detect changed tables. This method // seems unreliable. The endtoend test flags all tables as changed. - if _, ok := se.tables[tableName]; ok && createTime < se.lastChange { + tbl, isInTablesMap := se.tables[tableName] + if isInTablesMap && createTime < se.lastChange { + tbl.FileSize = fileSize + tbl.AllocatedSize = allocatedSize continue } + log.V(2).Infof("Reading schema for table: %s", tableName) - table, err := LoadTable(conn, tableName, row[1].ToString(), row[3].ToString()) + table, err := LoadTable(conn, tableName, row[3].ToString()) if err != nil { rec.RecordError(err) continue } + table.FileSize = fileSize + table.AllocatedSize = allocatedSize changedTables[tableName] = table - if _, ok := se.tables[tableName]; ok { + if isInTablesMap { altered = append(altered, tableName) } else { created = append(created, tableName) @@ -346,11 +365,10 @@ func (se *Engine) reload(ctx context.Context) error { // Compute and handle dropped tables. var dropped []string for tableName := range se.tables { - if curTables[tableName] { - continue + if !curTables[tableName] { + dropped = append(dropped, tableName) + delete(se.tables, tableName) } - dropped = append(dropped, tableName) - delete(se.tables, tableName) } // Populate PKColumns for changed tables. @@ -512,10 +530,10 @@ func (se *Engine) handleDebugSchema(response http.ResponseWriter, request *http. acl.SendError(response, err) return } - se.handleHTTPSchema(response, request) + se.handleHTTPSchema(response) } -func (se *Engine) handleHTTPSchema(response http.ResponseWriter, request *http.Request) { +func (se *Engine) handleHTTPSchema(response http.ResponseWriter) { // Ensure schema engine is Open. If vttablet came up in a non_serving role, // the schema engine may not have been initialized. err := se.Open() diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index ab5dc20cf03..89382a7d7f4 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -18,7 +18,6 @@ package schema import ( "expvar" - "fmt" "net/http" "net/http/httptest" "sort" @@ -26,6 +25,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/test/utils" + "context" "github.com/stretchr/testify/assert" @@ -42,12 +43,36 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" ) +const baseShowTablesPattern = `SELECT t\.table_name.*` + +var mustMatch = utils.MustMatchFn( + []interface{}{ // types with unexported fields + sqlparser.TableIdent{}, + }, + []string{".Mutex"}, // ignored fields +) + func TestOpenAndReload(t *testing.T) { db := fakesqldb.New(t) defer db.Close() for query, result := range schematest.Queries() { db.AddQuery(query, result) } + db.AddQueryPattern(baseShowTablesPattern, + &sqltypes.Result{ + Fields: mysql.BaseShowTablesFields, + RowsAffected: 0, + InsertID: 0, + Rows: [][]sqltypes.Value{ + mysql.BaseShowTablesRow("test_table_01", false, ""), + mysql.BaseShowTablesRow("test_table_02", false, ""), + mysql.BaseShowTablesRow("test_table_03", false, ""), + mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), + mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), + }, + SessionStateChanges: "", + StatusFlags: 0, + }) // pre-advance to above the default 1427325875. db.AddQuery("select unix_timestamp()", sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -55,12 +80,12 @@ func TestOpenAndReload(t *testing.T) { "int64"), "1427325876", )) - se := newEngine(10, 10*time.Second, 10*time.Second, true, db) + se := newEngine(10, 10*time.Second, 10*time.Second, db) se.Open() defer se.Close() want := initialSchema() - assert.Equal(t, want, se.GetSchema()) + mustMatch(t, want, se.GetSchema()) // Advance time some more. db.AddQuery("select unix_timestamp()", sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -71,17 +96,19 @@ func TestOpenAndReload(t *testing.T) { // Modify test_table_03 // Add test_table_04 // Drop msg - db.AddQuery(mysql.BaseShowTables, &sqltypes.Result{ + db.ClearQueryPattern() + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: [][]sqltypes.Value{ mysql.BaseShowTablesRow("test_table_01", false, ""), mysql.BaseShowTablesRow("test_table_02", false, ""), { - sqltypes.MakeTrusted(sqltypes.VarChar, []byte("test_table_03")), - sqltypes.MakeTrusted(sqltypes.VarChar, []byte("BASE TABLE")), - // Match the timestamp. - sqltypes.MakeTrusted(sqltypes.Int64, []byte("1427325877")), - sqltypes.MakeTrusted(sqltypes.VarChar, []byte("")), + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("test_table_03")), // table_name + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("BASE TABLE")), // table_type + sqltypes.MakeTrusted(sqltypes.Int64, []byte("1427325877")), // unix_timestamp(t.create_time) // Match the timestamp. + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("")), // table_comment + sqltypes.MakeTrusted(sqltypes.Int64, []byte("128")), // file_size + sqltypes.MakeTrusted(sqltypes.Int64, []byte("256")), // allocated_size }, // test_table_04 will in spite of older timestamp because it doesn't exist yet. mysql.BaseShowTablesRow("test_table_04", false, ""), @@ -149,7 +176,9 @@ func TestOpenAndReload(t *testing.T) { Name: "val", Type: sqltypes.Int32, }}, - PKColumns: []int{0, 1}, + PKColumns: []int{0, 1}, + FileSize: 128, + AllocatedSize: 256, } want["test_table_04"] = &Table{ Name: sqlparser.NewTableIdent("test_table_04"), @@ -157,7 +186,9 @@ func TestOpenAndReload(t *testing.T) { Name: "pk", Type: sqltypes.Int32, }}, - PKColumns: []int{0}, + PKColumns: []int{0}, + FileSize: 100, + AllocatedSize: 150, } delete(want, "msg") assert.Equal(t, want, se.GetSchema()) @@ -177,8 +208,8 @@ func TestOpenAndReload(t *testing.T) { require.NoError(t, err) assert.Equal(t, want, se.GetSchema()) - // delete table test_table_03 - db.AddQuery(mysql.BaseShowTables, &sqltypes.Result{ + db.ClearQueryPattern() + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: [][]sqltypes.Value{ mysql.BaseShowTablesRow("test_table_01", false, ""), @@ -220,7 +251,7 @@ func TestOpenFailedDueToMissMySQLTime(t *testing.T) { {sqltypes.NewVarBinary("1427325875")}, }, }) - se := newEngine(10, 1*time.Second, 1*time.Second, false, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "could not get MySQL time" if err == nil || !strings.Contains(err.Error(), want) { @@ -240,7 +271,7 @@ func TestOpenFailedDueToIncorrectMysqlRowNum(t *testing.T) { {sqltypes.NULL}, }, }) - se := newEngine(10, 1*time.Second, 1*time.Second, false, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "unexpected result for MySQL time" if err == nil || !strings.Contains(err.Error(), want) { @@ -260,7 +291,7 @@ func TestOpenFailedDueToInvalidTimeFormat(t *testing.T) { {sqltypes.NewVarBinary("invalid_time")}, }, }) - se := newEngine(10, 1*time.Second, 1*time.Second, false, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "could not parse time" if err == nil || !strings.Contains(err.Error(), want) { @@ -274,10 +305,11 @@ func TestOpenFailedDueToExecErr(t *testing.T) { for query, result := range schematest.Queries() { db.AddQuery(query, result) } - db.AddRejectedQuery(mysql.BaseShowTables, fmt.Errorf("injected error")) - se := newEngine(10, 1*time.Second, 1*time.Second, false, db) - err := se.Open() + want := "injected error" + db.RejectQueryPattern(baseShowTablesPattern, want) + se := newEngine(10, 1*time.Second, 1*time.Second, db) + err := se.Open() if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("se.Open: %v, want %s", err, want) } @@ -289,7 +321,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) { for query, result := range schematest.Queries() { db.AddQuery(query, result) } - db.AddQuery(mysql.BaseShowTables, &sqltypes.Result{ + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ Fields: mysql.BaseShowTablesFields, Rows: [][]sqltypes.Value{ mysql.BaseShowTablesRow("test_table", false, ""), @@ -306,7 +338,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) { {sqltypes.NewVarBinary("")}, }, }) - se := newEngine(10, 1*time.Second, 1*time.Second, false, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) err := se.Open() want := "Row count exceeded" if err == nil || !strings.Contains(err.Error(), want) { @@ -320,7 +352,7 @@ func TestExportVars(t *testing.T) { for query, result := range schematest.Queries() { db.AddQuery(query, result) } - se := newEngine(10, 1*time.Second, 1*time.Second, true, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) se.Open() defer se.Close() expvar.Do(func(kv expvar.KeyValue) { @@ -334,7 +366,7 @@ func TestStatsURL(t *testing.T) { for query, result := range schematest.Queries() { db.AddQuery(query, result) } - se := newEngine(10, 1*time.Second, 1*time.Second, true, db) + se := newEngine(10, 1*time.Second, 1*time.Second, db) se.Open() defer se.Close() @@ -343,7 +375,7 @@ func TestStatsURL(t *testing.T) { se.handleDebugSchema(response, request) } -func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, strict bool, db *fakesqldb.DB) *Engine { +func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, db *fakesqldb.DB) *Engine { config := tabletenv.NewDefaultConfig() config.QueryCacheSize = queryCacheSize config.SchemaReloadIntervalSeconds.Set(reloadTime) @@ -372,7 +404,9 @@ func initialSchema() map[string]*Table { Name: "pk", Type: sqltypes.Int32, }}, - PKColumns: []int{0}, + PKColumns: []int{0}, + FileSize: 0x64, + AllocatedSize: 0x96, }, "test_table_02": { Name: sqlparser.NewTableIdent("test_table_02"), @@ -380,7 +414,9 @@ func initialSchema() map[string]*Table { Name: "pk", Type: sqltypes.Int32, }}, - PKColumns: []int{0}, + PKColumns: []int{0}, + FileSize: 0x64, + AllocatedSize: 0x96, }, "test_table_03": { Name: sqlparser.NewTableIdent("test_table_03"), @@ -388,7 +424,9 @@ func initialSchema() map[string]*Table { Name: "pk", Type: sqltypes.Int32, }}, - PKColumns: []int{0}, + PKColumns: []int{0}, + FileSize: 0x64, + AllocatedSize: 0x96, }, "seq": { Name: sqlparser.NewTableIdent("seq"), @@ -406,8 +444,10 @@ func initialSchema() map[string]*Table { Name: "increment", Type: sqltypes.Int64, }}, - PKColumns: []int{0}, - SequenceInfo: &SequenceInfo{}, + PKColumns: []int{0}, + FileSize: 0x64, + AllocatedSize: 0x96, + SequenceInfo: &SequenceInfo{}, }, "msg": { Name: sqlparser.NewTableIdent("msg"), @@ -431,7 +471,9 @@ func initialSchema() map[string]*Table { Name: "message", Type: sqltypes.Int64, }}, - PKColumns: []int{0}, + PKColumns: []int{0}, + FileSize: 0x64, + AllocatedSize: 0x96, MessageInfo: &MessageInfo{ Fields: []*querypb.Field{{ Name: "id", diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index ce3b2f3f881..3d32dbd9075 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -28,7 +28,7 @@ import ( ) // LoadTable creates a Table from the schema info in the database. -func LoadTable(conn *connpool.DBConn, tableName string, tableType string, comment string) (*Table, error) { +func LoadTable(conn *connpool.DBConn, tableName string, comment string) (*Table, error) { ta := NewTable(tableName) sqlTableName := sqlparser.String(ta.Name) if err := fetchColumns(ta, conn, sqlTableName); err != nil { diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index 25c0924b693..8d7e784ff0b 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -173,7 +173,7 @@ func newTestLoadTable(tableType string, comment string, db *fakesqldb.DB) (*Tabl } defer conn.Recycle() - return LoadTable(conn, "test_table", tableType, comment) + return LoadTable(conn, "test_table", comment) } func getTestLoadTableQueries() map[string]*sqltypes.Result { diff --git a/go/vt/vttablet/tabletserver/schema/main_test.go b/go/vt/vttablet/tabletserver/schema/main_test.go index e260cbbc697..19d03dd809f 100644 --- a/go/vt/vttablet/tabletserver/schema/main_test.go +++ b/go/vt/vttablet/tabletserver/schema/main_test.go @@ -34,10 +34,9 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) { "int64"), "1427325876", )) - db.AddQuery(mysql.BaseShowTables, &sqltypes.Result{}) - + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{}) db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{}) - se := newEngine(10, 10*time.Second, 10*time.Second, true, db) + se := newEngine(10, 10*time.Second, 10*time.Second, db) require.NoError(t, se.Open()) cancel := func() { defer db.Close() diff --git a/go/vt/vttablet/tabletserver/schema/schema.go b/go/vt/vttablet/tabletserver/schema/schema.go index ebf622bddb6..69aae6387ac 100644 --- a/go/vt/vttablet/tabletserver/schema/schema.go +++ b/go/vt/vttablet/tabletserver/schema/schema.go @@ -52,6 +52,9 @@ type Table struct { // MessageInfo contains info for message tables. MessageInfo *MessageInfo + + FileSize uint64 + AllocatedSize uint64 } // SequenceInfo contains info specific to sequence tabels. diff --git a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go index 64395121714..236ed3d024f 100644 --- a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go +++ b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go @@ -61,16 +61,6 @@ func Queries() map[string]*sqltypes.Result { {sqltypes.NewVarBinary("0")}, }, }, - mysql.BaseShowTables: { - Fields: mysql.BaseShowTablesFields, - Rows: [][]sqltypes.Value{ - mysql.BaseShowTablesRow("test_table_01", false, ""), - mysql.BaseShowTablesRow("test_table_02", false, ""), - mysql.BaseShowTablesRow("test_table_03", false, ""), - mysql.BaseShowTablesRow("seq", false, "vitess_sequence"), - mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), - }, - }, mysql.BaseShowPrimary: { Fields: mysql.ShowPrimaryFields, Rows: [][]sqltypes.Value{ diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 96665297eec..f870de335e0 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2458,6 +2458,14 @@ func setupFakeDB(t *testing.T) *fakesqldb.DB { for query, result := range getSupportedQueries() { db.AddQuery(query, result) } + db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{ + Fields: mysql.BaseShowTablesFields, + Rows: [][]sqltypes.Value{ + mysql.BaseShowTablesRow("test_table", false, ""), + mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), + }, + }) + return db } @@ -2520,13 +2528,6 @@ func getSupportedQueries() map[string]*sqltypes.Result { {sqltypes.NewVarBinary("0")}, }, }, - mysql.BaseShowTables: { - Fields: mysql.BaseShowTablesFields, - Rows: [][]sqltypes.Value{ - mysql.BaseShowTablesRow("test_table", false, ""), - mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"), - }, - }, mysql.BaseShowPrimary: { Fields: mysql.ShowPrimaryFields, Rows: [][]sqltypes.Value{