Skip to content

Commit

Permalink
Merge branch 'master' into model2-update
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 6, 2022
2 parents 8361589 + 3ee869e commit db1bc53
Show file tree
Hide file tree
Showing 40 changed files with 11,733 additions and 10,475 deletions.
8 changes: 2 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=",
version = "v2.0.3-0.20221129032117-857772dd0907",
sum = "h1:Nr2EhvqkOE9xFyU7LV9c9EbsgN3OzVALdbfobK7Fmn4=",
version = "v2.0.3-0.20221205084317-ad59ca833a78",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down Expand Up @@ -4132,10 +4132,6 @@ def go_deps():
name = "io_etcd_go_etcd_raft_v3",
build_file_proto_mode = "disable_global",
importpath = "go.etcd.io/etcd/raft/v3",
patch_args = ["-p1"],
patches = [
"//build/patches:io_etcd_go_etcd_raft_v3.patch",
],
sum = "h1:uCC37qOXqBvKqTGHGyhASsaCsnTuJugl1GvneJNwHWo=",
version = "v3.5.2",
)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
var wg sync.WaitGroup
errCh := r.startCheckpointRunner(cctx, &wg)
ticker := time.NewTicker(tickDuration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
mock.MatchExpectationsInOrder(false)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -510,13 +510,13 @@ func TestCheckTableEmpty(t *testing.T) {
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&gmysql.MySQLError{Number: errno.ErrPDServerTimeout})
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand All @@ -532,11 +532,11 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test2`.`tbl1` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
targetInfoGetter.targetDBGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// only need to check the one that is not in checkpoint
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test1`.`tbl2` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/restore/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ func (g *TargetInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName stri
}
var dump int
err = exec.QueryRow(ctx, "check table empty",
fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", common.UniqueTable(schemaName, tableName)),
// Here we use the `USE INDEX()` hint to skip fetch the record from index.
// In Lightning, if previous importing is halted half-way, it is possible that
// the data is partially imported, but the index data has not been imported.
// In this situation, if no hint is added, the SQL executor might fetch the record from index,
// which is empty. This will result in missing check.
fmt.Sprintf("SELECT 1 FROM %s USE INDEX() LIMIT 1", common.UniqueTable(schemaName, tableName)),
&dump,
)

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(&mysql_sql_driver.MySQLError{
Number: errno.ErrNoSuchTable,
Message: "Table 'test_db.test_tbl' doesn't exist",
Expand All @@ -772,7 +772,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).
RowError(0, sql.ErrNoRows),
Expand All @@ -782,7 +782,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, true, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnRows(
sqlmock.NewRows([]string{"1"}).AddRow(1),
)
Expand All @@ -791,7 +791,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NotNil(t, pIsEmpty)
require.Equal(t, false, *pIsEmpty)

mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` LIMIT 1").
mock.ExpectQuery("SELECT 1 FROM `test_db`.`test_tbl` USE INDEX\\(\\) LIMIT 1").
WillReturnError(errors.New("some dummy error"))
_, err = targetGetter.IsTableEmpty(ctx, "test_db", "test_tbl")
require.Error(t, err)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
if cp.Status < checkpoints.CheckpointStatusIndexImported {
var err error
if indexEngineCp.Status < checkpoints.CheckpointStatusImported {
failpoint.Inject("FailBeforeStartImportingIndexEngine", func() {
errMsg := "fail before importing index KV data"
tr.logger.Warn(errMsg)
failpoint.Return(errors.New(errMsg))
})
err = tr.importKV(ctx, closedIndexEngine, rc, indexEngineID)
failpoint.Inject("FailBeforeIndexEngineImported", func() {
finished := rc.status.FinishedFileSize.Load()
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_check_partial_imported/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
backend = "local"

[mydumper.csv]
header = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE tbl01 (
`id` INTEGER,
`val` VARCHAR(64),
`aaa` CHAR(66) DEFAULT NULL,
`bbb` CHAR(10) NOT NULL,
`ccc` CHAR(42) DEFAULT NULL,
`ddd` CHAR(42) DEFAULT NULL,
`eee` CHAR(66) DEFAULT NULL,
`fff` VARCHAR(128) DEFAULT NULL,
KEY `aaa` (`aaa`),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_check_partial_imported/data/db01.tbl01.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val,aaa,bbb,ccc,ddd,eee,fff
1,"v01","a01","b01","c01","d01","e01","f01"
2,"v02","a02","b02","c02","d02","e02","f02"
3,"v03","a03","b03","c03","d03","e03","f03"
4,"v04","a04","b04","c04","d04","e04","f04"
5,"v05","a05","b05","c05","d05","e05","f05"
47 changes: 47 additions & 0 deletions br/tests/lightning_check_partial_imported/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

MYDIR=$(dirname "${BASH_SOURCE[0]}")
set -eux

check_cluster_version 4 0 0 'local backend' || exit 0

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/FailBeforeStartImportingIndexEngine=return"
set +e
if run_lightning; then
echo "The first import doesn't fail as expected" >&2
exit 1
fi
set -e

data_records=$(tail -n +2 "${MYDIR}/data/db01.tbl01.csv" | wc -l | xargs echo )
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"

export GO_FAILPOINTS=""
set +e
if run_lightning --check-requirements=1; then
echo "The pre-check doesn't find out the non-empty table problem"
exit 2
fi
set -e

run_sql "TRUNCATE TABLE db01.tbl01;"
run_lightning --check-requirements=1
run_sql "SELECT COUNT(*) FROM db01.tbl01;"
check_contains "${data_records}"
run_sql "SELECT COUNT(*) FROM db01.tbl01 USE INDEX();"
check_contains "${data_records}"
22 changes: 0 additions & 22 deletions build/patches/io_etcd_go_etcd_raft_v3.patch

This file was deleted.

1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ go_test(
"explainfor_test.go",
"grant_test.go",
"hash_table_test.go",
"historical_stats_test.go",
"hot_regions_history_table_test.go",
"index_advise_test.go",
"index_lookup_join_test.go",
Expand Down
102 changes: 0 additions & 102 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package analyzetest

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -2163,107 +2162,6 @@ func TestAnalyzeColumnsErrorAndWarning(t *testing.T) {
}
}

func TestRecordHistoryStatsAfterAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_analyze_version = 2")
tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b varchar(10))")

h := dom.StatsHandle()
is := dom.InfoSchema()
tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// 1. switch off the tidb_enable_historical_stats, and there is no records in table `mysql.stats_history`
rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows()
num, _ := strconv.Atoi(rows[0][0].(string))
require.Equal(t, num, 0)

tk.MustExec("analyze table t with 2 topn")
rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows()
num, _ = strconv.Atoi(rows[0][0].(string))
require.Equal(t, num, 0)

// 2. switch on the tidb_enable_historical_stats and do analyze
tk.MustExec("set global tidb_enable_historical_stats = 1")
defer tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("analyze table t with 2 topn")
// dump historical stats
hsWorker := dom.GetHistoricalStatsWorker()
tblID := hsWorker.GetOneHistoricalStatsTable()
err = hsWorker.DumpHistoricalStats(tblID, h)
require.Nil(t, err)
rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows()
num, _ = strconv.Atoi(rows[0][0].(string))
require.GreaterOrEqual(t, num, 1)

// 3. dump current stats json
dumpJSONTable, err := h.DumpStatsToJSON("test", tableInfo.Meta(), nil, true)
require.NoError(t, err)
jsOrigin, _ := json.Marshal(dumpJSONTable)

// 4. get the historical stats json
rows = tk.MustQuery(fmt.Sprintf("select * from mysql.stats_history where table_id = '%d' and create_time = ("+
"select create_time from mysql.stats_history where table_id = '%d' order by create_time desc limit 1) "+
"order by seq_no", tableInfo.Meta().ID, tableInfo.Meta().ID)).Rows()
num = len(rows)
require.GreaterOrEqual(t, num, 1)
data := make([][]byte, num)
for i, row := range rows {
data[i] = []byte(row[1].(string))
}
jsonTbl, err := handle.BlocksToJSONTable(data)
require.NoError(t, err)
jsCur, err := json.Marshal(jsonTbl)
require.NoError(t, err)
// 5. historical stats must be equal to the current stats
require.JSONEq(t, string(jsOrigin), string(jsCur))
}

func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_analyze_version = 2")
tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("analyze table test.t")

h := dom.StatsHandle()
is := dom.InfoSchema()
tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// 1. switch off the tidb_enable_historical_stats, and there is no record in table `mysql.stats_meta_history`
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0"))
// insert demo tuples, and there is no record either.
insertNums := 5
for i := 0; i < insertNums; i++ {
tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)")
err := h.DumpStatsDeltaToKV(handle.DumpDelta)
require.NoError(t, err)
}
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0"))

// 2. switch on the tidb_enable_historical_stats and insert tuples to produce count/modifyCount delta change.
tk.MustExec("set global tidb_enable_historical_stats = 1")
defer tk.MustExec("set global tidb_enable_historical_stats = 0")

for i := 0; i < insertNums; i++ {
tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)")
err := h.DumpStatsDeltaToKV(handle.DumpDelta)
require.NoError(t, err)
}
tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time", tableInfo.Meta().ID)).Sort().Check(
testkit.Rows("18 18", "21 21", "24 24", "27 27", "30 30"))
}

func checkAnalyzeStatus(t *testing.T, tk *testkit.TestKit, jobInfo, status, failReason, comment string, timeLimit int64) {
rows := tk.MustQuery("show analyze status where table_schema = 'test' and table_name = 't' and partition_name = ''").Rows()
require.Equal(t, 1, len(rows), comment)
Expand Down
Loading

0 comments on commit db1bc53

Please sign in to comment.