Skip to content

Commit

Permalink
reduce select usage when query tables (pingcap#110)
Browse files Browse the repository at this point in the history
* reduce select usage when query tables

* fix empty_database bug
  • Loading branch information
lichunzhu authored Jun 30, 2020
1 parent 79ebf47 commit b18f65e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 26 deletions.
21 changes: 2 additions & 19 deletions v4/export/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,12 @@ func prepareDumpingDatabases(conf *Config, db *sql.DB) ([]string, error) {

func listAllTables(db *sql.DB, databaseNames []string) (DatabaseTables, error) {
log.Debug("list all the tables")
dbTables := DatabaseTables{}
for _, dbName := range databaseNames {
tables, err := ListAllTables(db, dbName)
if err != nil {
return nil, err
}
dbTables[dbName] = make([]*TableInfo, 0, len(tables))
dbTables = dbTables.AppendTables(dbName, tables...)
}
return dbTables, nil
return ListAllDatabasesTables(db, databaseNames, TableTypeBase)
}

func listAllViews(db *sql.DB, databaseNames []string) (DatabaseTables, error) {
log.Debug("list all the views")
dbTables := DatabaseTables{}
for _, dbName := range databaseNames {
views, err := ListAllViews(db, dbName)
if err != nil {
return nil, err
}
dbTables = dbTables.AppendViews(dbName, views...)
}
return dbTables, nil
return ListAllDatabasesTables(db, databaseNames, TableTypeView)
}

type databaseName = string
Expand Down
12 changes: 6 additions & 6 deletions v4/export/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ func (s *testPrepareSuite) TestListAllTables(c *C) {
AppendViews("db3", "t6", "t7", "t8")

var dbNames []databaseName
rows := sqlmock.NewRows([]string{"table_schema", "table_name"})
for dbName, tableInfos := range data {
dbNames = append(dbNames, dbName)

rows := sqlmock.NewRows([]string{"Table_name"})
for _, tbInfo := range tableInfos {
if tbInfo.Type == TableTypeView {
continue
}
rows.AddRow(tbInfo.Name)
rows.AddRow(dbName, tbInfo.Name)
}
query := "SELECT table_name FROM information_schema.tables WHERE table_schema = (.*) and table_type = (.*)"
mock.ExpectQuery(query).WillReturnRows(rows)
}
query := "SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = (.*)"
mock.ExpectQuery(query).WillReturnRows(rows)

tables, err := listAllTables(db, dbNames)
c.Assert(err, IsNil)
Expand All @@ -94,8 +94,8 @@ func (s *testPrepareSuite) TestListAllTables(c *C) {
data = NewDatabaseTables().
AppendTables("db", "t1").
AppendViews("db", "t2")
query := "SELECT table_name FROM information_schema.tables WHERE table_schema = (.*) and table_type = (.*)"
mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"Table_name"}).AddRow("t2"))
query = "SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = (.*)"
mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"table_schema", "table_name"}).AddRow("db", "t2"))
tables, err = listAllViews(db, []string{"db"})
c.Assert(err, IsNil)
c.Assert(len(tables), Equals, 1)
Expand Down
36 changes: 35 additions & 1 deletion v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,40 @@ func ListAllTables(db *sql.DB, database string) ([]string, error) {
return tables.data, nil
}

func ListAllDatabasesTables(db *sql.DB, databaseNames []string, tableType TableType) (DatabaseTables, error) {
var tableTypeStr string
switch tableType {
case TableTypeBase:
tableTypeStr = "BASE TABLE"
case TableTypeView:
tableTypeStr = "VIEW"
default:
return nil, errors.Errorf("unknown table type %v", tableType)
}

query := fmt.Sprintf("SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = '%s'", tableTypeStr)
dbTables := DatabaseTables{}
for _, schema := range databaseNames {
dbTables[schema] = make([]*TableInfo, 0)
}

if err := simpleQueryWithArgs(db, func(rows *sql.Rows) error {
var schema, table string
if err := rows.Scan(&schema, &table); err != nil {
return withStack(err)
}

// only append tables to schemas in databaseNames
if _, ok := dbTables[schema]; ok {
dbTables[schema] = append(dbTables[schema], &TableInfo{table, tableType})
}
return nil
}, query); err != nil {
return nil, errors.WithMessage(err, query)
}
return dbTables, nil
}

func ListAllViews(db *sql.DB, database string) ([]string, error) {
var views oneStrColumnTable
const query = "SELECT table_name FROM information_schema.tables WHERE table_schema = ? and table_type = 'VIEW'"
Expand Down Expand Up @@ -426,7 +460,7 @@ func simpleQueryWithArgs(db *sql.DB, handleOneRow func(*sql.Rows) error, sql str
return withStack(err)
}
}
return nil
return rows.Err()
}

func pickupPossibleField(dbName, tableName string, db *sql.DB, conf *Config) (string, error) {
Expand Down

0 comments on commit b18f65e

Please sign in to comment.