Skip to content

Commit

Permalink
import: check table empty skips using index (#39604)
Browse files Browse the repository at this point in the history
close #39477
  • Loading branch information
dsdashun authored Dec 5, 2022
1 parent 5127ad2 commit 35819ee
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 16 deletions.
22 changes: 11 additions & 11 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
mock.MatchExpectationsInOrder(false)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -510,13 +510,13 @@ func TestCheckTableEmpty(t *testing.T) {
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&gmysql.MySQLError{Number: errno.ErrPDServerTimeout})
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -532,11 +532,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// only need to check the one that is not in checkpoint
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/restore/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ func (g *TargetInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName stri
}
var dump int
err = exec.QueryRow(ctx, "check table empty",
fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", common.UniqueTable(schemaName, tableName)),
// Here we use the `USE INDEX()` hint to skip fetch the record from index.
// In Lightning, if previous importing is halted half-way, it is possible that
// the data is partially imported, but the index data has not been imported.
// In this situation, if no hint is added, the SQL executor might fetch the record from index,
// which is empty. This will result in missing check.
fmt.Sprintf("SELECT 1 FROM %s USE INDEX() LIMIT 1", common.UniqueTable(schemaName, tableName)),
&dump,
)

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&mysql_sql_driver.MySQLError{
Number: errno.ErrNoSuchTable,
Message: "Table 'test_db.test_tbl' doesn't exist",
Expand All @@ -772,7 +772,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).
RowError(0, sql.ErrNoRows),
Expand All @@ -782,7 +782,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).AddRow(1),
)
Expand All @@ -791,7 +791,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, false, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(errors.New("some dummy error"))
_, err = targetGetter.IsTableEmpty(ctx, "test_db", "test_tbl")
require.Error(t, err)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
if cp.Status < checkpoints.CheckpointStatusIndexImported {
var err error
if indexEngineCp.Status < checkpoints.CheckpointStatusImported {
failpoint.Inject("FailBeforeStartImportingIndexEngine", func() {
errMsg := "fail before importing index KV data"
tr.logger.Warn(errMsg)
failpoint.Return(errors.New(errMsg))
})
err = tr.importKV(ctx, closedIndexEngine, rc, indexEngineID)
failpoint.Inject("FailBeforeIndexEngineImported", func() {
finished := rc.status.FinishedFileSize.Load()
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_check_partial_imported/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
backend = "local"

[mydumper.csv]
header = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE tbl01 (
`id` INTEGER,
`val` VARCHAR(64),
`aaa` CHAR(66) DEFAULT NULL,
`bbb` CHAR(10) NOT NULL,
`ccc` CHAR(42) DEFAULT NULL,
`ddd` CHAR(42) DEFAULT NULL,
`eee` CHAR(66) DEFAULT NULL,
`fff` VARCHAR(128) DEFAULT NULL,
KEY `aaa` (`aaa`),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_check_partial_imported/data/db01.tbl01.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val,aaa,bbb,ccc,ddd,eee,fff
1,"v01","a01","b01","c01","d01","e01","f01"
2,"v02","a02","b02","c02","d02","e02","f02"
3,"v03","a03","b03","c03","d03","e03","f03"
4,"v04","a04","b04","c04","d04","e04","f04"
5,"v05","a05","b05","c05","d05","e05","f05"
47 changes: 47 additions & 0 deletions br/tests/lightning_check_partial_imported/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

MYDIR=$(dirname "${BASH_SOURCE[0]}")
set -eux

check_cluster_version 4 0 0 'local backend' || exit 0

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/FailBeforeStartImportingIndexEngine=return"
set +e
if run_lightning; then
echo "The first import doesn't fail as expected" >&2
exit 1
fi
set -e

data_records=$(tail -n +2 "${MYDIR}/data/db01.tbl01.csv" | wc -l | xargs echo )
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"

export GO_FAILPOINTS=""
set +e
if run_lightning --check-requirements=1; then
echo "The pre-check doesn't find out the non-empty table problem"
exit 2
fi
set -e

run_sql "TRUNCATE TABLE db01.tbl01;"
run_lightning --check-requirements=1
run_sql "SELECT COUNT(*) FROM db01.tbl01;"
check_contains "${data_records}"
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"

0 comments on commit 35819ee

Please sign in to comment.