Merge branch 'stable-TestBlocked' of https://github.com/longfangsong/tidb into stable-TestBlocked

longfangsong committed Aug 24, 2021
2 parents 57ba0c3 + c847045 commit 2291450
Showing 77 changed files with 1,463 additions and 456 deletions.
32 changes: 32 additions & 0 deletions bindinfo/bind_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

@@ -2453,6 +2454,37 @@ func (s *testSerialSuite) TestOptimizeOnlyOnce(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/planner/checkOptimizeCountOne"), IsNil)
}

func (s *testSerialSuite) TestIssue26377(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_global_temporary_table = true")
tk.MustExec("set @@tidb_enable_noop_functions=1;")
tk.MustExec("drop table if exists t1,tmp1")
tk.MustExec("create table t1(a int(11))")
tk.MustExec("create global temporary table tmp1(a int(11), key idx_a(a)) on commit delete rows;")
tk.MustExec("create temporary table tmp2(a int(11), key idx_a(a));")

queries := []string{
"create global binding for select * from t1 inner join tmp1 on t1.a=tmp1.a using select * from t1 inner join tmp1 on t1.a=tmp1.a;",
"create global binding for select * from t1 where t1.a in (select a from tmp1) using select * from t1 where t1.a in (select a from tmp1 use index (idx_a));",
"create global binding for select a from t1 union select a from tmp1 using select a from t1 union select a from tmp1 use index (idx_a);",
"create global binding for select t1.a, (select a from tmp1 where tmp1.a=1) as t2 from t1 using select t1.a, (select a from tmp1 where tmp1.a=1) as t2 from t1;",
"create global binding for select * from (select * from tmp1) using select * from (select * from tmp1);",
}
genLocalTemporarySQL := func(sql string) string {
return strings.Replace(sql, "tmp1", "tmp2", -1)
}
for _, query := range queries {
localSQL := genLocalTemporarySQL(query)
queries = append(queries, localSQL)
}

for _, q := range queries {
tk.MustGetErrCode(q, errno.ErrOptOnTemporaryTable)
}
}

func (s *testSuite) TestCaptureFilter(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
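A note on the new TestIssue26377 above: the `for _, query := range queries` loop appends to the slice it is iterating. This is well-defined in Go because `range` evaluates the slice header once, so the loop visits only the original five statements and each appended `tmp2` variant is executed exactly once by the second loop. A minimal standalone sketch of the idiom (not from the patch):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	queries := []string{"select * from tmp1", "select a from tmp1"}
	for _, q := range queries {
		// Derive the local-temporary-table variant, as the test does.
		queries = append(queries, strings.Replace(q, "tmp1", "tmp2", -1))
	}
	// The range loop ran only over the original two entries.
	fmt.Println(len(queries)) // 4
}
```

Every statement, global and local variant alike, is then expected to fail with errno.ErrOptOnTemporaryTable, since bindings may not reference temporary tables.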
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -2755,8 +2755,8 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
l := len(w.writeBatch)
cnt := w.batchCount
var lastKey []byte
if len(w.writeBatch) > 0 {
lastKey = w.writeBatch[len(w.writeBatch)-1].Key
if cnt > 0 {
lastKey = w.writeBatch[cnt-1].Key
}
for _, pair := range kvs {
if w.isWriteBatchSorted && bytes.Compare(lastKey, pair.Key) > 0 {
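The appendRowsUnsorted fix above replaces `len(w.writeBatch)` with the cached count `cnt`. A hedged reading, inferred rather than stated in the diff: `writeBatch` is a reused buffer whose backing slice can be longer than the number of valid entries tracked in `batchCount`, so the last valid key must be found via the logical count. A sketch of that pattern under this assumption:

```go
package local

type kvPair struct{ Key []byte }

type writer struct {
	writeBatch []kvPair // reused buffer; may hold stale entries past batchCount
	batchCount int      // number of currently valid entries
}

// lastKey indexes by the logical count, not len(writeBatch), which is the
// point of the fix (assuming the buffer-reuse reading above is right).
func (w *writer) lastKey() []byte {
	if w.batchCount > 0 {
		return w.writeBatch[w.batchCount-1].Key
	}
	return nil
}
```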
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
@@ -349,7 +349,7 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
f.wg.Add(1)
go f.ingestSSTLoop()
sorted := needSort && !partitialSort
w, err := openLocalWriter(context.Background(), &backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1<<20)
w, err := openLocalWriter(context.Background(), &backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024)
c.Assert(err, IsNil)

ctx := context.Background()
10 changes: 1 addition & 9 deletions br/pkg/lightning/mydump/parquet_parser_test.go
@@ -5,7 +5,6 @@ import (
"io"
"path/filepath"
"strconv"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -82,13 +81,6 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {
}

func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
// those deprecated TIME/TIMESTAMP types depend on the local timezone!
prevTZ := time.Local
time.Local = time.FixedZone("UTC+8", 8*60*60)
defer func() {
time.Local = prevTZ
}()

type Test struct {
Date int32 `parquet:"name=date, type=DATE"`
TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
@@ -114,7 +106,7 @@

v := &Test{
Date: 18564, // 2020-10-29
TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!)
TimeMillis: 62775123, // 17:26:15.123
TimeMicros: 62775123456, // 17:26:15.123
TimestampMillis: 1603963672356, // 2020-10-29T09:27:52.356Z
TimestampMicros: 1603963672356956, // 2020-10-29T09:27:52.356956Z
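The deleted block above pinned `time.Local` to UTC+8 because the deprecated parquet TIME/TIMESTAMP types were converted through the process-local timezone; its removal (together with the dropped "note all time are in UTC+8!" remark) indicates the parser now treats these values as timezone-independent. The remaining comment can be checked directly:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// 62775123 ms after midnight is the TimeMillis literal in the test.
	d := time.Duration(62775123) * time.Millisecond
	fmt.Println(d) // 17h26m15.123s, matching the "17:26:15.123" comment
}
```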
5 changes: 3 additions & 2 deletions br/pkg/restore/backoff.go
@@ -56,7 +56,8 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
switch errors.Cause(err) { // nolint:errorlint
e := errors.Cause(err)
switch e { // nolint:errorlint
case berrors.ErrKVEpochNotMatch, berrors.ErrKVDownloadFailed, berrors.ErrKVIngestFailed:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
@@ -65,7 +66,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 0
bo.attempt = 0
default:
switch status.Code(err) {
switch status.Code(e) {
case codes.Unavailable, codes.Aborted:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
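The backoff.go change matters because `status.Code` in the grpc-go versions of this era does not look through foreign error wrappers: called on an error annotated by pingcap/errors it returns codes.Unknown, so transient Unavailable/Aborted failures were never classified as retryable. Unwrapping once with `errors.Cause` and feeding the cause to both switches fixes that. A small demonstration of the failure mode (assuming pre-unwrapping grpc-go behavior, as was current at the time of this commit):

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	rootErr := status.Error(codes.Unavailable, "tikv unavailable")
	wrapped := errors.Annotate(rootErr, "import failed")

	fmt.Println(status.Code(wrapped))               // Unknown: the wrapper hides the status
	fmt.Println(status.Code(errors.Cause(wrapped))) // Unavailable: the cause restores it
}
```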
4 changes: 4 additions & 0 deletions br/pkg/restore/import.go
@@ -317,6 +317,10 @@ func (importer *FileImporter) Import(
log.Debug("failpoint restore-storage-error injected.", zap.String("msg", msg))
e = errors.Annotate(e, msg)
})
failpoint.Inject("restore-gRPC-error", func(_ failpoint.Value) {
log.Warn("the connection to TiKV has been cut by a neko, meow :3")
e = status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3")
})
if e != nil {
remainFiles = remainFiles[i:]
return errors.Trace(e)
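The new failpoint above fabricates a gRPC Unavailable error so the retry classification fixed in backoff.go can be exercised end to end; br/tests/br_full/run.sh (further down) arms it with the term `1*return(true)`, meaning "trigger exactly once, then disarm". For reference, a hedged sketch of arming the same failpoint in-process with pingcap/failpoint (the tests here use the GO_FAILPOINTS environment variable instead):

```go
package main

import "github.com/pingcap/failpoint"

func main() {
	fp := "github.com/pingcap/tidb/br/pkg/restore/restore-gRPC-error"
	// "1*return(true)": fire on the first hit only.
	if err := failpoint.Enable(fp, `1*return(true)`); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()
	// ...run the code path containing failpoint.Inject("restore-gRPC-error", ...)
}
```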
111 changes: 75 additions & 36 deletions br/pkg/restore/split.go
@@ -17,6 +17,7 @@ import (
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/pd/pkg/codec"
Expand Down Expand Up @@ -294,6 +295,33 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi
}
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
redact.Key(startKey), redact.Key(endKey))
}

if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
redact.Key(startKey), redact.Key(regions[0].Region.StartKey))
} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < startKey, startKey: %s, regionStartKey: %s",
redact.Key(endKey), redact.Key(regions[len(regions)-1].Region.EndKey))
}

cur := regions[0]
for _, r := range regions[1:] {
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey))
}
cur = r
}

return nil
}

// PaginateScanRegion scan regions with a limit pagination and
// return all regions at once.
// It reduces max gRPC message size.
@@ -305,50 +333,61 @@ func PaginateScanRegion(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}

regions := []*RegionInfo{}
scanStartKey := startKey
for {
batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
}
regions = append(regions, batch...)
if len(batch) < limit {
// No more region
break
var regions []*RegionInfo
err := utils.WithRetry(ctx, func() error {
regions = []*RegionInfo{}
scanStartKey := startKey
for {
batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
return errors.Trace(err)
}
regions = append(regions, batch...)
if len(batch) < limit {
// No more region
break
}
scanStartKey = batch[len(batch)-1].Region.GetEndKey()
if len(scanStartKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
// All key space have scanned
break
}
}
scanStartKey = batch[len(batch)-1].Region.GetEndKey()
if len(scanStartKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
// All key space have scanned
break
if err := checkRegionConsistency(startKey, endKey, regions); err != nil {
log.Warn("failed to scan region, retrying", logutil.ShortError(err))
return err
}
}
return nil
}, newScanRegionBackoffer())

// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
return regions, err
}

if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(regions[0].Region.StartKey))
} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < startKey, startKey: %s, regionStartKey: %s",
hex.EncodeToString(endKey), hex.EncodeToString(regions[len(regions)-1].Region.EndKey))
type scanRegionBackoffer struct {
attempt int
}

func newScanRegionBackoffer() utils.Backoffer {
return &scanRegionBackoffer{
attempt: 3,
}
}

cur := regions[0]
for _, r := range regions[1:] {
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
hex.EncodeToString(cur.Region.EndKey), hex.EncodeToString(r.Region.StartKey))
}
cur = r
// NextBackoff returns a duration to wait before retrying again
func (b *scanRegionBackoffer) NextBackoff(err error) time.Duration {
if berrors.ErrPDBatchScanRegion.Equal(err) {
// 500ms * 3 could be enough for splitting remain regions in the hole.
b.attempt--
return 500 * time.Millisecond
}
b.attempt = 0
return 0
}

return regions, nil
// Attempt returns the remain attempt times
func (b *scanRegionBackoffer) Attempt() int {
return b.attempt
}

// getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
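Summarizing the split.go rework: the scan loop in PaginateScanRegion now runs inside utils.WithRetry, the old inline sanity checks moved into checkRegionConsistency (switching from hex.EncodeToString to redact.Key so keys respect log redaction), and an inconsistent scan — PD cannot guarantee a gap-free, ordered region list while regions are splitting — surfaces as ErrPDBatchScanRegion, which scanRegionBackoffer retries up to three times at 500 ms intervals while aborting immediately on any other error. Judging from the two methods implemented here, the utils.Backoffer contract looks roughly like this (an inference; the real interface lives in br/pkg/utils and is not shown in the diff):

```go
package utils

import "time"

// Backoffer drives utils.WithRetry (shape inferred from the methods
// scanRegionBackoffer implements, not quoted from the source).
type Backoffer interface {
	// NextBackoff reports how long to wait before the next attempt and may
	// decrement internal attempt counters based on the error it sees.
	NextBackoff(err error) time.Duration
	// Attempt reports how many attempts remain; retrying stops at zero.
	Attempt() int
}
```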
25 changes: 25 additions & 0 deletions br/pkg/storage/s3.go
@@ -26,6 +26,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/spf13/pflag"
"go.uber.org/zap"
)
@@ -43,6 +44,7 @@ const (
maxRetries = 7
// max number of retries when meets error
maxErrorRetries = 3
ec2MetaAddress = "169.254.169.254"

// the maximum number of byte to read for seek.
maxSkipOffsetByRead = 1 << 16 // 64KB
Expand Down Expand Up @@ -736,6 +738,29 @@ type retryerWithLog struct {
client.DefaultRetryer
}

func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
//
// If we want to unwrap the r.Error:
// 1. the err should be an awserr.Error (let it be awsErr)
// 2. awsErr.OrigErr() should be an *url.Error (let it be urlErr).
// 3. urlErr.Err should be a http.httpError (which is private).
//
// If we want to retrieve the error from the request context:
// The error of context in the HTTPRequest (i.e. r.HTTPRequest.Context().Err() ) is nil.
return strings.Contains(err.Error(), "context deadline exceeded")
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
if isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress {
// fast fail for unreachable linklocal address in EC2 containers.
log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error))
return false
}
return rl.DefaultRetryer.ShouldRetry(r)
}

func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
backoffTime := rl.DefaultRetryer.RetryRules(r)
if backoffTime > 0 {
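Context for the s3.go change: 169.254.169.254 is the link-local EC2 instance-metadata (IMDS) address. Outside EC2 — say, in a plain container — the SDK's credential probe against it can only end in a context-deadline timeout, and the default retryer would dutifully keep retrying it, which is what the fast-fail override prevents; the string match on "context deadline exceeded" is the pragmatic workaround the TODO comment apologizes for. Assembled into a self-contained sketch of the aws-sdk-go v1 override pattern (type and method names follow the diff; wiring the retryer into a session is left out):

```go
package storage

import (
	"strings"

	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
)

const ec2MetaAddress = "169.254.169.254"

type retryerWithLog struct {
	client.DefaultRetryer // supplies MaxRetries, RetryRules, and default ShouldRetry
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
	// Fail fast when the EC2 metadata endpoint is unreachable.
	if r.Error != nil &&
		strings.Contains(r.Error.Error(), "context deadline exceeded") &&
		r.HTTPRequest.URL.Host == ec2MetaAddress {
		return false
	}
	return rl.DefaultRetryer.ShouldRetry(r)
}

// Compile-time check that the embedding still satisfies the SDK interface.
var _ request.Retryer = retryerWithLog{}
```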
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
@@ -70,7 +70,7 @@ for ct in limit lz4 zstd; do

# restore full
echo "restore with $ct backup start..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/restore-storage-error=1*return(\"connection refused\")"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/restore-storage-error=1*return(\"connection refused\");github.com/pingcap/tidb/br/pkg/restore/restore-gRPC-error=1*return(true)"
run_br restore full -s "local://$TEST_DIR/$DB-$ct" --pd $PD_ADDR --ratelimit 1024
export GO_FAILPOINTS=""

13 changes: 13 additions & 0 deletions cmd/explaintest/r/explain_generate_column_substitute.result
@@ -496,6 +496,19 @@ desc format = 'brief' select b from t;
id estRows task access object operator info
IndexReader 10000.00 root index:IndexFullScan
└─IndexFullScan 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo
create table t01(a varchar(20));
insert into t01 values ("齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙");
alter table t01 add index eidx ((concat_ws('expression_index', a, 'test')));
select * from t01 use index (eidx) where (concat_ws('expression_index', a, 'test')) not like (concat_ws('expression_index', "齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙", 'test'));
a
insert into t01 values ("齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙");
select * from t01 use index (eidx) where (concat_ws('expression_index', a, 'test')) like (concat_ws('expression_index', "齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙", 'test'));
a
齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙
齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙
drop table if exists t1;
create table t1(a char, b varchar(20), c char, d varchar(20));
alter table t1 add index eidx ((export_set(3, a, c, ',', 5)));
create table t02 (a varchar(20));
insert into t02 values ('a'), ('b'), ('c');
select * from t02 where lower(a) < 'c';
10 changes: 10 additions & 0 deletions cmd/explaintest/t/explain_generate_column_substitute.test
@@ -217,6 +217,16 @@ create table t(a int, b int as (a+1), key((a+1)), key(b));
desc format = 'brief' select a+1 from t;
desc format = 'brief' select b from t;

create table t01(a varchar(20));
insert into t01 values ("齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙");
alter table t01 add index eidx ((concat_ws('expression_index', a, 'test')));
select * from t01 use index (eidx) where (concat_ws('expression_index', a, 'test')) not like (concat_ws('expression_index', "齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙", 'test'));
insert into t01 values ("齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙");
select * from t01 use index (eidx) where (concat_ws('expression_index', a, 'test')) like (concat_ws('expression_index', "齆斮聒蚆髙锐潊贩哨啅捸爖斥圱犳飁綴纜牖蚙", 'test'));

drop table if exists t1;
create table t1(a char, b varchar(20), c char, d varchar(20));
alter table t1 add index eidx ((export_set(3, a, c, ',', 5)));
create table t02 (a varchar(20));
insert into t02 values ('a'), ('b'), ('c');
select * from t02 where lower(a) < 'c';
3 changes: 3 additions & 0 deletions config/config.go
@@ -548,6 +548,8 @@ type IsolationRead struct {
type Experimental struct {
// Whether enable global kill.
EnableGlobalKill bool `toml:"enable-global-kill" json:"-"`
// Whether enable charset feature.
EnableNewCharset bool `toml:"enable-new-charset" json:"-"`
}

var defTiKVCfg = tikvcfg.DefaultConfig()
@@ -670,6 +672,7 @@ var defaultConf = Config{
},
Experimental: Experimental{
EnableGlobalKill: false,
EnableNewCharset: false,
},
EnableCollectExecutionInfo: true,
EnableTelemetry: true,
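The new Experimental.EnableNewCharset flag defaults to false, mirroring enable-global-kill, and the config_test.go change below pins that default. A hedged sketch of consulting it at runtime (GetGlobalConfig is the usual TiDB accessor; that this particular flag is read this way is an assumption — the diff only adds the field and its default):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	conf := config.GetGlobalConfig()
	fmt.Println(conf.Experimental.EnableNewCharset) // false by default
	// In a config file, following the toml tag (assumption):
	//   [experimental]
	//   enable-new-charset = true
}
```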
1 change: 1 addition & 0 deletions config/config_test.go
@@ -286,6 +286,7 @@ deadlock-history-collect-retryable = true
require.Equal(t, uint64(30), conf.StoresRefreshInterval)
require.Equal(t, uint(123), conf.PessimisticTxn.DeadlockHistoryCapacity)
require.True(t, conf.PessimisticTxn.DeadlockHistoryCollectRetryable)
require.False(t, conf.Experimental.EnableNewCharset)

_, err = f.WriteString(`
[log.file]
4 changes: 0 additions & 4 deletions ddl/db_integration_test.go
@@ -2951,10 +2951,6 @@ func (s *testIntegrationSuite3) TestCreateTemporaryTable(c *C) {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

// Considering snapshot, local temporary table is always visible.
tk.MustExec("set @@tidb_snapshot = '2016-01-01 15:04:05.999999'")
tk.MustExec("select * from overlap")
}

func (s *testIntegrationSuite3) TestAvoidCreateViewOnLocalTemporaryTable(c *C) {