Merge remote-tracking branch 'upstream/master' into sessionctx-code-improvements

* upstream/master: (56 commits)
  expression: make a copy when retrieving json path expression from cache (pingcap#38285)
  executor: make unit test TestIssue21732 stable (pingcap#38278)
  variables: enable variable hook to access storage (pingcap#38227)
  docs: update roadmap description (pingcap#38253)
  systable: Adapt with TiFlash system table update (pingcap#38191)
  planner, util/ranger: restrict mem usage for index join inner ranges (pingcap#38129)
  planner: separate cost model ver1/ver2 into different files (part 2) (pingcap#38273)
  metrics: Add IOPs, Disk Throughput to the overview dashboard (pingcap#38270)
  planner: refine index join range display for clustered index (pingcap#38259)
  *: delete `mInFlashbackCluster` related codes (pingcap#38241)
  session: turn the transaction to pessimistic mode when retrying (pingcap#38130)
  executor: enable race for executor/aggfuncs (pingcap#38262)
  planner: reopen CTE in apply only when correlate (pingcap#38214)
  planner: enable revive (pingcap#36649)
  *: add foreign key constraint check when execute update statement (pingcap#38156)
  *: DATA RACE in the temptable.AttachLocalTemporaryTableInfoSchema (pingcap#38237)
  executor,sessionctx: enable coprocessor paging and make TestCoprocessorPagingSize stable (pingcap#38161)
  domain: fix data race in the Domain.bindHandle (pingcap#38223)
  test: add information if test failed (pingcap#38234)
  preprocessor, planner: inline processing by default for single-consumer cte (pingcap#37800)
  ...
morgo committed Oct 4, 2022
2 parents 6f889a6 + 51a6684 commit 06bcee9
Showing 186 changed files with 16,132 additions and 11,093 deletions.
19 changes: 17 additions & 2 deletions DEPS.bzl
@@ -1872,6 +1872,13 @@ def go_deps():
sum = "h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=",
version = "v3.2.0+incompatible",
)
+ go_repository(
+ name = "com_github_jarcoal_httpmock",
+ build_file_proto_mode = "disable",
+ importpath = "github.com/jarcoal/httpmock",
+ sum = "h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=",
+ version = "v1.2.0",
+ )

go_repository(
name = "com_github_jcmturner_aescts_v2",
@@ -2326,6 +2333,14 @@ def go_deps():
sum = "h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=",
version = "v1.0.1",
)
+ go_repository(
+ name = "com_github_maxatome_go_testdeep",
+ build_file_proto_mode = "disable",
+ importpath = "github.com/maxatome/go-testdeep",
+ sum = "h1:Tgh5efyCYyJFGUYiT0qxBSIDeXw0F5zSoatlou685kk=",
+ version = "v1.11.0",
+ )

go_repository(
name = "com_github_mbilski_exhaustivestruct",
build_file_proto_mode = "disable",
@@ -4453,8 +4468,8 @@ def go_deps():
name = "org_uber_go_goleak",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/goleak",
sum = "h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=",
version = "v1.1.12",
sum = "h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=",
version = "v1.2.0",
)
go_repository(
name = "org_uber_go_multierr",
2 changes: 1 addition & 1 deletion Makefile
@@ -31,7 +31,7 @@ dev: checklist check explaintest gogenerate br_unit_test test_part_parser_dev ut
# Install the check tools.
check-setup:tools/bin/revive

- check: check-parallel lint tidy testSuite errdoc bazel_all_build
+ check: parser_yacc check-parallel lint tidy testSuite errdoc bazel_all_build

fmt:
@echo "gofmt (simplify)"
4 changes: 2 additions & 2 deletions bindinfo/bind_test.go
@@ -352,8 +352,8 @@ func TestBindCTEMerge(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int)")
- require.True(t, tk.HasPlan("with cte as (select * from t1) select * from cte", "CTEFullScan"))
- require.False(t, tk.HasPlan("with cte as (select /*+ MERGE() */ * from t1) select * from cte", "CTEFullScan"))
+ require.True(t, tk.HasPlan("with cte as (select * from t1) select * from cte a, cte b", "CTEFullScan"))
+ require.False(t, tk.HasPlan("with cte as (select /*+ MERGE() */ * from t1) select * from cte a, cte b", "CTEFullScan"))
tk.MustExec(`
create global binding for
with cte as (select * from t1) select * from cte
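(Note on the widened queries above: the switch to `select * from cte a, cte b` presumably tracks pingcap#37800 from the merged commit list, which inlines single-consumer CTEs by default. A query that reads the CTE only once is now inlined and produces no CTEFullScan operator, so the test must reference the CTE twice to keep it materialized.)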
4 changes: 2 additions & 2 deletions bindinfo/capture_test.go
@@ -563,9 +563,9 @@ func TestIssue25505(t *testing.T) {
spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` > ? ) select * from `cte`"] =
"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_b`)*/ * FROM `cte`"
spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` > ? and `b` > ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_ab`), use_index(@`sel_3` `test`.`t2` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_ab`), use_index(@`sel_1` `test`.`t1` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` = ? and `b` = ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_a`), use_index(@`sel_3` `test`.`t2` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_a`), use_index(@`sel_1` `test`.`t1` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"

tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
31 changes: 19 additions & 12 deletions bindinfo/handle.go
@@ -108,20 +108,27 @@ type bindRecordUpdate struct {
// NewBindHandle creates a new BindHandle.
func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle := &BindHandle{}
- handle.sctx.Context = ctx
- handle.bindInfo.Value.Store(newBindCache())
- handle.bindInfo.parser = parser.New()
- handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
- handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
- return handle.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
- }
- handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
- handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
+ handle.Reset(ctx)
+ return handle
+ }
+
+ // Reset is to reset the BindHandle and clean old info.
+ func (h *BindHandle) Reset(ctx sessionctx.Context) {
+ h.bindInfo.Lock()
+ defer h.bindInfo.Unlock()
+ h.sctx.Context = ctx
+ h.bindInfo.Value.Store(newBindCache())
+ h.bindInfo.parser = parser.New()
+ h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
+ h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
+ return h.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
+ }
+ h.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
+ h.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
// BindSQL has already been validated when coming here, so we use nil sctx parameter.
- return handle.AddBindRecord(nil, record)
+ return h.AddBindRecord(nil, record)
}
- variable.RegisterStatistics(handle)
- return handle
+ variable.RegisterStatistics(h)
}

// Update updates the global sql bind cache.
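The rewrite above extracts the body of NewBindHandle into a new Reset method, so a BindHandle can be re-initialized against a fresh session context instead of being reallocated. A minimal sketch of the same constructor-delegates-to-Reset pattern, with illustrative names rather than the real TiDB types:

package main

import "sync"

// Handle caches state derived from a context; Reset rebuilds that state
// so the same Handle can be reused with a new context.
type Handle struct {
	mu    sync.Mutex
	ctx   any
	cache map[string]string
}

// NewHandle delegates all initialization to Reset, keeping the construction
// path and the re-initialization path identical.
func NewHandle(ctx any) *Handle {
	h := &Handle{}
	h.Reset(ctx)
	return h
}

// Reset rebinds the context and drops any previously cached state, all under
// the lock, mirroring how BindHandle.Reset rebuilds its bind cache and hooks.
func (h *Handle) Reset(ctx any) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ctx = ctx
	h.cache = make(map[string]string)
}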
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/kv/session.go
@@ -253,8 +253,11 @@ func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
}

func newSession(options *SessionOptions, logger log.Logger) *session {
s := &session{
values: make(map[fmt.Stringer]interface{}, 1),
}
sqlMode := options.SQLMode
vars := variable.NewSessionVars()
vars := variable.NewSessionVars(s)
vars.SkipUTF8Check = true
vars.StmtCtx.InInsertStmt = true
vars.StmtCtx.BatchCheck = true
@@ -289,10 +292,7 @@ func newSession(options *SessionOptions, logger log.Logger) *session {
log.ShortError(err))
}
vars.TxnCtx = nil
s := &session{
vars: vars,
values: make(map[fmt.Stringer]interface{}, 1),
}
s.vars = vars
s.txn.kvPairs = &KvPairs{}

return s
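The reordering above follows from variable.NewSessionVars now taking its owning session as an argument (see `variables: enable variable hook to access storage`, pingcap#38227, in the merged list): the session struct must be allocated before its vars, and the vars are wired back onto it afterwards. A minimal sketch of this two-phase initialization for a pair of mutually referencing values, with illustrative types:

package main

// SessionVars holds per-session settings and keeps a back-reference to its
// owner so that variable hooks can reach session-level storage.
type SessionVars struct {
	owner *session
}

func NewSessionVars(owner *session) *SessionVars {
	return &SessionVars{owner: owner}
}

type session struct {
	vars *SessionVars
}

// newSession allocates the session first, hands it to the vars constructor,
// and only then attaches the vars: two-phase initialization breaks the
// session <-> vars reference cycle.
func newSession() *session {
	s := &session{}
	s.vars = NewSessionVars(s)
	return s
}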
1 change: 1 addition & 0 deletions br/pkg/lightning/config/BUILD.bazel
@@ -42,6 +42,7 @@ go_test(
flaky = True,
deps = [
":config",
"//br/pkg/lightning/common",
"//parser/mysql",
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
13 changes: 10 additions & 3 deletions br/pkg/lightning/config/config.go
@@ -98,7 +98,7 @@ const (
var (
supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs", "gs"}

- DefaultFilter = []string{
+ defaultFilter = []string{
"*.*",
"!mysql.*",
"!sys.*",
@@ -109,6 +109,13 @@
}
)

+ // GetDefaultFilter gets the default table filter used in Lightning.
+ // It clones the original default filter,
+ // so that the original value won't be changed when the returned slice's element is changed.
+ func GetDefaultFilter() []string {
+ return append([]string{}, defaultFilter...)
+ }

type DBStore struct {
Host string `toml:"host" json:"host"`
Port int `toml:"port" json:"port"`
@@ -715,7 +722,7 @@ func NewConfig() *Config {
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
- Filter: DefaultFilter,
+ Filter: GetDefaultFilter(),
DataCharacterSet: defaultCSVDataCharacterSet,
DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace),
},
@@ -890,7 +897,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
// mydumper.filter and black-white-list cannot co-exist.
if cfg.HasLegacyBlackWhiteList() {
log.L().Warn("the config `black-white-list` has been deprecated, please replace with `mydumper.filter`")
- if !common.StringSliceEqual(cfg.Mydumper.Filter, DefaultFilter) {
+ if !common.StringSliceEqual(cfg.Mydumper.Filter, defaultFilter) {
return common.ErrInvalidConfig.GenWithStack("`mydumper.filter` and `black-white-list` cannot be simultaneously defined")
}
}
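Replacing the exported DefaultFilter slice with a GetDefaultFilter() accessor closes an aliasing hole: Go slices share their backing array, so a caller that kept and mutated DefaultFilter would silently change the default seen by every later config. A standalone sketch of the hazard and the clone-on-read fix (illustrative, not the Lightning code):

package main

import "fmt"

var defaultFilter = []string{"*.*", "!mysql.*"}

// getDefaultFilter returns a fresh copy so callers can never reach the
// shared package-level default through the returned slice.
func getDefaultFilter() []string {
	return append([]string{}, defaultFilter...)
}

func main() {
	aliased := defaultFilter // shares the backing array with the default
	aliased[0] = "db1.tbl1"
	fmt.Println(defaultFilter[0]) // "db1.tbl1": the default was corrupted

	defaultFilter[0] = "*.*" // restore the default for the second half

	cloned := getDefaultFilter() // independent backing array
	cloned[0] = "db1.tbl1"
	fmt.Println(defaultFilter[0]) // still "*.*": the default is intact
}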
43 changes: 41 additions & 2 deletions br/pkg/lightning/config/config_test.go
@@ -30,6 +30,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
@@ -750,7 +751,7 @@ func TestCronEncodeDecode(t *testing.T) {
func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
- require.Equal(t, config.DefaultFilter, cfg.Mydumper.Filter)
+ require.Equal(t, config.GetDefaultFilter(), cfg.Mydumper.Filter)
require.False(t, cfg.HasLegacyBlackWhiteList())

ctx := context.Background()
@@ -762,7 +763,7 @@ func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
cfg.BWList.DoDBs = []string{"test"}
require.EqualError(t, cfg.Adjust(ctx), "[Lightning:Config:ErrInvalidConfig]`mydumper.filter` and `black-white-list` cannot be simultaneously defined")

- cfg.Mydumper.Filter = config.DefaultFilter
+ cfg.Mydumper.Filter = config.GetDefaultFilter()
require.NoError(t, cfg.Adjust(ctx))
require.True(t, cfg.HasLegacyBlackWhiteList())
}
@@ -955,3 +956,41 @@ func TestCheckAndAdjustForLocalBackend(t *testing.T) {
cfg.TikvImporter.SortedKVDir = base
require.NoError(t, cfg.CheckAndAdjustForLocalBackend())
}

+ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
+ originalDefaultCfg := append([]string{}, config.GetDefaultFilter()...)
+ cfg1 := config.NewConfig()
+ require.NoError(t, cfg1.LoadFromTOML([]byte(`
+ [mydumper]
+ filter = ["db1.tbl1", "db2.*", "!db2.tbl1"]
+ `)))
+ require.Equal(t, 3, len(cfg1.Mydumper.Filter))
+ require.True(t, common.StringSliceEqual(
+ cfg1.Mydumper.Filter,
+ []string{"db1.tbl1", "db2.*", "!db2.tbl1"},
+ ))
+ require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+ cfg2 := config.NewConfig()
+ require.True(t, common.StringSliceEqual(
+ cfg2.Mydumper.Filter,
+ originalDefaultCfg,
+ ))
+ require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+ gCfg1, err := config.LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil)
+ require.NoError(t, err)
+ require.True(t, common.StringSliceEqual(
+ gCfg1.Mydumper.Filter,
+ []string{"db1.tbl1", "db2.*", "!db2.tbl1"},
+ ))
+ require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+ gCfg2, err := config.LoadGlobalConfig([]string{}, nil)
+ require.NoError(t, err)
+ require.True(t, common.StringSliceEqual(
+ gCfg2.Mydumper.Filter,
+ originalDefaultCfg,
+ ))
+ require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+ }
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/global.go
@@ -100,7 +100,7 @@ func NewGlobalConfig() *GlobalConfig {
LogLevel: "error",
},
Mydumper: GlobalMydumper{
- Filter: DefaultFilter,
+ Filter: GetDefaultFilter(),
},
TikvImporter: GlobalImporter{
Backend: "",
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -59,6 +59,8 @@ go_test(
"tsheap_test.go",
],
flaky = True,
race = "on",
shard_count = 20,
deps = [
":streamhelper",
"//br/pkg/errors",
34 changes: 17 additions & 17 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -12,6 +12,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"testing"

backup "github.com/pingcap/kvproto/pkg/brpb"
@@ -61,7 +62,7 @@ type region struct {
leader uint64
epoch uint64
id uint64
- checkpoint uint64
+ checkpoint atomic.Uint64

fsim flushSimulator
}
@@ -93,13 +94,13 @@ func overlaps(a, b kv.KeyRange) bool {

func (r *region) splitAt(newID uint64, k string) *region {
newRegion := &region{
- rng: kv.KeyRange{StartKey: []byte(k), EndKey: r.rng.EndKey},
- leader: r.leader,
- epoch: r.epoch + 1,
- id: newID,
- checkpoint: r.checkpoint,
- fsim: r.fsim.fork(),
+ rng: kv.KeyRange{StartKey: []byte(k), EndKey: r.rng.EndKey},
+ leader: r.leader,
+ epoch: r.epoch + 1,
+ id: newID,
+ fsim: r.fsim.fork(),
}
+ newRegion.checkpoint.Store(r.checkpoint.Load())
r.rng.EndKey = []byte(k)
r.epoch += 1
r.fsim = r.fsim.fork()
@@ -151,7 +152,7 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
continue
}
resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
- Checkpoint: region.checkpoint,
+ Checkpoint: region.checkpoint.Load(),
Region: &logbackup.RegionIdentity{
Id: region.id,
EpochVersion: region.epoch,
@@ -315,9 +316,9 @@ func (f *fakeCluster) advanceCheckpoints() uint64 {
f.updateRegion(r.id, func(r *region) {
// The current implementation assumes that the server never returns checkpoint with value 0.
// This assumption is true for the TiKV implementation, simulating it here.
- r.checkpoint += rand.Uint64()%256 + 1
- if r.checkpoint < minCheckpoint {
- minCheckpoint = r.checkpoint
+ cp := r.checkpoint.Add(rand.Uint64()%256 + 1)
+ if cp < minCheckpoint {
+ minCheckpoint = cp
}
r.fsim.flushedEpoch = 0
})
@@ -340,11 +341,10 @@ func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
stores = append(stores, s)
}
initialRegion := &region{
- rng: kv.KeyRange{},
- leader: stores[0].id,
- epoch: 0,
- id: c.idAlloc(),
- checkpoint: 0,
+ rng: kv.KeyRange{},
+ leader: stores[0].id,
+ epoch: 0,
+ id: c.idAlloc(),
fsim: flushSimulator{
enabled: simEnabled,
},
@@ -367,7 +367,7 @@ func (r *region) String() string {
r.epoch,
hex.EncodeToString(r.rng.StartKey),
hex.EncodeToString(r.rng.EndKey),
- r.checkpoint,
+ r.checkpoint.Load(),
r.leader,
r.fsim.flushedEpoch)
}
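The checkpoint field changes from a plain uint64 to atomic.Uint64 (the struct type added to sync/atomic in Go 1.19) because these tests now run under the race detector (race = "on" in the BUILD file above). Two details follow from the type: Add returns the updated value, so advanceCheckpoints can use the result instead of issuing a second, racy read, and atomic values must not be copied, which is why splitAt now builds the new region without a checkpoint field and transfers the value with Store(Load()). A minimal sketch under those assumptions:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type region struct {
	checkpoint atomic.Uint64 // safe for concurrent Load/Store/Add
}

func main() {
	r := &region{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Add returns the new value, so a caller can consume it
			// without a separate Load that could race with other writers.
			_ = r.checkpoint.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(r.checkpoint.Load()) // always 8, race-free
}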
29 changes: 29 additions & 0 deletions br/pkg/streamhelper/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "daemon",
srcs = [
"interface.go",
"owner_daemon.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/streamhelper/daemon",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//owner",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "daemon_test",
srcs = ["owner_daemon_test.go"],
flaky = True,
deps = [
":daemon",
"//owner",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
],
)
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
@@ -36,6 +36,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/streamhelper",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/daemon",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
...
