Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
reduce database usage, fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Jul 5, 2021
1 parent cfb258a commit d3ba1e8
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 97 deletions.
84 changes: 47 additions & 37 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,9 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta
}
}

func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string, currentChunk, totalChunks int) error {
func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, meta TableMeta, taskChan chan<- Task, partition, orderByClause string, currentChunk, totalChunks int) error {
conf := d.conf
tableIR, err := SelectAllFromTable(conf, conn, meta, partition)
if err != nil {
return err
}
tableIR := SelectAllFromTable(conf, meta, partition, orderByClause)
task := NewTaskTableData(meta, tableIR, currentChunk, totalChunks)
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
Expand All @@ -432,7 +429,11 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()))
}
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1)
orderByClause, err := buildOrderByClause(conf, conn, meta.DatabaseName(), meta.TableName())
if err != nil {
return err
}
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
}

// concurrentDumpTable tries to split table into several chunks to dump
Expand All @@ -443,24 +444,31 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
conf.ServerInfo.ServerVersion != nil &&
(conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 ||
(conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0)) {
return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan)
err := d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan)
if err == nil {
return err
}
// don't retry on context error
if errors.ErrorEqual(err, context.Canceled) || errors.ErrorEqual(err, context.DeadlineExceeded) {
return err
}
tctx.L().Warn("fallback to concurrent dump tables using rows due to tidb error",
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
}

orderByClause, err := buildOrderByClause(conf, conn, db, tbl)
if err != nil {
return err
}

field, err := pickupPossibleField(db, tbl, conn, conf)
if err != nil || field == "" {
// skip split chunk logic if not found proper field
tctx.L().Warn("fallback to sequential dump due to no proper field",
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1)
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
}

min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field)
if err != nil {
return err
}
tctx.L().Debug("get int bounding values",
zap.String("lower", min.String()),
zap.String("upper", max.String()))

count := estimateCount(d.tctx, db, tbl, conn, field, conf)
tctx.L().Info("get estimated rows count",
zap.String("database", db),
Expand All @@ -473,9 +481,17 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
zap.Uint64("conf.rows", conf.Rows),
zap.String("database", db),
zap.String("table", tbl))
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1)
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
}

min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field)
if err != nil {
return err
}
tctx.L().Debug("get int bounding values",
zap.String("lower", min.String()),
zap.String("upper", max.String()))

// every chunk would have eventual adjustments
estimatedChunks := count / conf.Rows
estimatedStep := new(big.Int).Sub(max, min).Uint64()/estimatedChunks + 1
Expand All @@ -486,15 +502,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
totalChunks = new(big.Int).Sub(max, min).Uint64() + 1
}

selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert)
if err != nil {
return err
}

orderByClause, err := buildOrderByClause(conf, conn, db, tbl)
if err != nil {
return err
}
selectField, selectLen := meta.SelectedField(), meta.SelectedLen()

chunkIndex := 0
nullValueCondition := ""
Expand Down Expand Up @@ -595,7 +603,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn
if err != nil {
return err
}
return d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1)
return d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1)
}

func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partitions []string) error {
Expand All @@ -621,7 +629,7 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn
cachedHandleVals[i] = handleVals
}
for i, partition := range partitions {
err := d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk)
err := d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk)
if err != nil {
return err
}
Expand All @@ -631,17 +639,18 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn
}

func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context,
conn *sql.Conn, meta TableMeta, taskChan chan<- Task,
meta TableMeta, taskChan chan<- Task,
handleColNames []string, handleVals [][]string, partition string, startChunkIdx, totalChunk int) error {
db, tbl := meta.DatabaseName(), meta.TableName()
if len(handleVals) == 0 {
return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition, startChunkIdx, totalChunk)
if partition == "" {
// return error to make outside function try using rows method to dump data
return errors.Errorf("empty handleVals for TiDB table `%s`.`%s`", escapeString(db), escapeString(tbl))
}
return d.dumpWholeTableDirectly(tctx, meta, taskChan, partition, buildOrderByClauseString(handleColNames), startChunkIdx, totalChunk)
}
conf := d.conf
db, tbl := meta.DatabaseName(), meta.TableName()
selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert)
if err != nil {
return err
}
selectField, selectLen := meta.SelectedField(), meta.SelectedLen()
where := buildWhereClauses(handleColNames, handleVals)
orderByClause := buildOrderByClauseString(handleColNames)

Expand Down Expand Up @@ -813,7 +822,7 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t
}
}

return pkVals, err
return pkVals, nil
}

func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) {
Expand Down Expand Up @@ -849,7 +858,7 @@ func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn)

func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (TableMeta, error) {
tbl := table.Name
selectField, _, err := buildSelectField(conn, db, tbl, conf.CompleteInsert)
selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert)
if err != nil {
return nil, err
}
Expand All @@ -870,6 +879,7 @@ func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (T
table: tbl,
colTypes: colTypes,
selectedField: selectField,
selectedLen: selectLen,
specCmts: []string{
"/*!40101 SET NAMES binary*/;",
},
Expand Down
2 changes: 2 additions & 0 deletions v4/export/ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type TableMeta interface {
ColumnTypes() []string
ColumnNames() []string
SelectedField() string
SelectedLen() int
SpecialComments() StringIter
ShowCreateTable() string
ShowCreateView() string
Expand Down Expand Up @@ -98,6 +99,7 @@ func setTableMetaFromRows(rows *sql.Rows) (TableMeta, error) {
return &tableMeta{
colTypes: tps,
selectedField: strings.Join(nms, ","),
selectedLen: len(nms),
specCmts: []string{"/*!40101 SET NAMES binary*/;"},
}, nil
}
11 changes: 6 additions & 5 deletions v4/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package export

import (
"database/sql"
"fmt"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -254,6 +253,7 @@ type tableMeta struct {
table string
colTypes []*sql.ColumnType
selectedField string
selectedLen int
specCmts []string
showCreateTable string
showCreateView string
Expand Down Expand Up @@ -288,10 +288,11 @@ func (tm *tableMeta) ColumnCount() uint {
}

func (tm *tableMeta) SelectedField() string {
if tm.selectedField == "*" || tm.selectedField == "" {
return tm.selectedField
}
return fmt.Sprintf("(%s)", tm.selectedField)
return tm.selectedField
}

func (tm *tableMeta) SelectedLen() int {
return tm.selectedLen
}

func (tm *tableMeta) SpecialComments() StringIter {
Expand Down
16 changes: 4 additions & 12 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,23 +188,15 @@ func SelectVersion(db *sql.DB) (string, error) {
}

// SelectAllFromTable dumps data serialized from a specified table
func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta, partition string) (TableDataIR, error) {
func SelectAllFromTable(conf *Config, meta TableMeta, partition, orderByClause string) TableDataIR {
database, table := meta.DatabaseName(), meta.TableName()
selectedField, selectLen, err := buildSelectField(db, database, table, conf.CompleteInsert)
if err != nil {
return nil, err
}

orderByClause, err := buildOrderByClause(conf, db, database, table)
if err != nil {
return nil, err
}
selectedField, selectLen := meta.SelectedField(), meta.SelectedLen()
query := buildSelectQuery(database, table, selectedField, partition, buildWhereCondition(conf, ""), orderByClause)

return &tableData{
query: query,
colLen: selectLen,
}, nil
}
}

func buildSelectQuery(database, table, fields, partition, where, orderByClause string) string {
Expand Down Expand Up @@ -835,7 +827,7 @@ func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (
if conf.ServerInfo.ServerType == ServerTypeTiDB {
ok, err := SelectTiDBRowID(db, dbName, tableName)
if err != nil {
return "", nil
return "", err
}
if ok {
return "_tidb_rowid", nil
Expand Down
58 changes: 16 additions & 42 deletions v4/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) {
if len(handleColNames) > 0 {
taskChan := make(chan Task, 128)
quotaCols := make([]string, 0, len(handleColNames))
for _, col := range quotaCols {
for _, col := range handleColNames {
quotaCols = append(quotaCols, wrapBackTicks(col))
}
selectFields := strings.Join(quotaCols, ",")
Expand Down Expand Up @@ -688,16 +688,13 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) {
}
mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows)

rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"})
for _, handleCol := range handleColNames {
rows.AddRow(handleCol, "")
}
mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table).
WillReturnRows(rows)
// special case, no value found, will scan whole table and try build order clause
if len(handleVals) == 0 {
mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)).
WillReturnResult(sqlmock.NewResult(0, 0))
// mock second connection error
mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)).
WillReturnError(errors.New("bad connection"))
}

c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil)
Expand All @@ -717,13 +714,13 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) {
// special case, no value found
if len(handleVals) == 0 {
orderByClause = orderByTiDBRowID
query := buildSelectQuery(database, table, "*", "", "", orderByClause)
query := buildSelectQuery(database, table, selectFields, "", "", orderByClause)
checkQuery(0, query)
continue
}

for i, w := range testCase.expectedWhereClauses {
query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause)
query := buildSelectQuery(database, table, selectFields, "", buildWhereCondition(d.conf, w), orderByClause)
checkQuery(i, query)
}
}
Expand Down Expand Up @@ -806,6 +803,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) {
ServerType: ServerTypeTiDB,
ServerVersion: gcSafePointVersion,
}
d.conf.Rows = 200000
database := "foo"
table := "bar"

Expand Down Expand Up @@ -884,15 +882,11 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) {

// Test build tasks through table region
taskChan := make(chan Task, 128)
quotaCols := make([]string, 0, len(handleColNames))
for _, col := range quotaCols {
quotaCols = append(quotaCols, wrapBackTicks(col))
}
selectFields := strings.Join(quotaCols, ",")
meta := &tableMeta{
database: database,
table: table,
selectedField: selectFields,
selectedField: "*",
selectedLen: len(handleColNames),
specCmts: []string{
"/*!40101 SET NAMES binary*/;",
},
Expand Down Expand Up @@ -925,18 +919,16 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) {
mock.ExpectQuery("SELECT START_KEY,tidb_decode_key\\(START_KEY\\) from INFORMATION_SCHEMA.TIKV_REGION_STATUS").
WithArgs(database, table).WillReturnRows(rows)

rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"})
for _, handleCol := range handleColNames {
rows.AddRow(handleCol, "")
}
mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table).
WillReturnRows(rows)

orderByClause := buildOrderByClauseString(handleColNames)
// special case, no enough value to split chunks
if len(regionResults) <= 1 {
mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)).
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)).
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectQuery("EXPLAIN SELECT `_tidb_rowid`").
WillReturnRows(sqlmock.NewRows([]string{"id", "count", "task", "operator info"}).
AddRow("IndexReader_5", "0.00", "root", "index:IndexScan_4"))
orderByClause = orderByTiDBRowID
}
c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil)
Expand Down Expand Up @@ -1096,15 +1088,11 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) {

// Test build tasks through table region
taskChan := make(chan Task, 128)
quotaCols := make([]string, 0, len(handleColNames))
for _, col := range quotaCols {
quotaCols = append(quotaCols, wrapBackTicks(col))
}
selectFields := strings.Join(quotaCols, ",")
meta := &tableMeta{
database: database,
table: table,
selectedField: selectFields,
selectedField: "*",
selectedLen: len(handleColNames),
specCmts: []string{
"/*!40101 SET NAMES binary*/;",
},
Expand Down Expand Up @@ -1143,20 +1131,6 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) {
WillReturnRows(rows)
}

for range partitions {
rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"})
for _, handleCol := range handleColNames {
rows.AddRow(handleCol, "")
}
mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table).
WillReturnRows(rows)
// special case, dump whole table
if testCase.dumpWholeTable {
mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)).
WillReturnResult(sqlmock.NewResult(0, 0))
}
}

orderByClause := buildOrderByClauseString(handleColNames)
c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
Expand Down
Loading

0 comments on commit d3ba1e8

Please sign in to comment.