Skip to content

Commit

Permalink
Merge branch 'master' into zimuxia/ctc-p
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Dec 28, 2021
2 parents a6f36c1 + abb6582 commit 7f98a78
Show file tree
Hide file tree
Showing 23 changed files with 498 additions and 34 deletions.
9 changes: 8 additions & 1 deletion .golangci_br.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ linters:
- exhaustivestruct
- exhaustive
- godot
- gosec
- errorlint
- wrapcheck
- gomoddirectives
Expand Down Expand Up @@ -81,3 +80,11 @@ linters-settings:

issues:
exclude-rules:
- path: br/tests/
linters:
- gosec
- errcheck
- path: _test\.go
linters:
- gosec

4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error
for _, col := range cols {
if mysql.HasPriKeyFlag(col.Flag) {
incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits))
autoRandomBits := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63n(1<<meta.AutoRandomBits) << incrementalBits
autoRandomBits := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63n(1<<meta.AutoRandomBits) << incrementalBits // nolint:gosec
autoIDFn = func(id int64) int64 {
return autoRandomBits | id
}
break
}
}
} else if meta.ShardRowIDBits > 0 {
rd := rand.New(rand.NewSource(options.AutoRandomSeed))
rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec
mask := int64(1)<<meta.ShardRowIDBits - 1
shift := autoid.RowIDBitLength - meta.ShardRowIDBits - 1
autoIDFn = func(id int64) int64 {
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ func (cpdb *FileCheckpointsDB) save() error {
// because `os.WriteFile` is not atomic, directly write into it may reset the file
// to an empty file if write is not finished.
tmpPath := cpdb.path + ".tmp"
if err := os.WriteFile(tmpPath, serialized, 0o644); err != nil {
if err := os.WriteFile(tmpPath, serialized, 0o644); err != nil { // nolint:gosec
return errors.Trace(err)
}
if err := os.Rename(tmpPath, cpdb.path); err != nil {
Expand Down Expand Up @@ -1301,6 +1301,8 @@ func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[
// 1. table status is earlier than CheckpointStatusIndexImported, and
// 2. engine status is earlier than CheckpointStatusImported, and
// 3. chunk has been read

// nolint:gosec
query := fmt.Sprintf(`
SELECT DISTINCT t.table_name, c.engine_id
FROM %s.%s t, %s.%s c, %s.%s e
Expand Down Expand Up @@ -1386,7 +1388,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
colName = columnTableName
aliasedColName = "t.table_name"
}

// nolint:gosec
selectQuery := fmt.Sprintf(`
SELECT
t.table_name,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func ToTLSConfig(caPath, certPath, keyPath string) (*tls.Config, error) {
return nil, errors.New("failed to append ca certs")
}

return &tls.Config{
return &tls.Config{ // nolint:gosec
Certificates: certificates,
RootCAs: certPool,
NextProtos: []string{"h2", "http/1.1"}, // specify `h2` to let Go use HTTP/2.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ func CleanupMetas(ctx context.Context, cfg *config.Config, tableName string) err
func UnsafeCloseEngine(ctx context.Context, importer backend.Backend, engine string) (*backend.ClosedEngine, error) {
if index := strings.LastIndexByte(engine, ':'); index >= 0 {
tableName := engine[:index]
engineID, err := strconv.Atoi(engine[index+1:])
engineID, err := strconv.Atoi(engine[index+1:]) // nolint:gosec
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
11 changes: 6 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}
needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits()
err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) // nolint:gosec
rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -381,6 +381,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
needChecksum = true
needRemoteDupe = true
err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error {
// nolint:gosec
query := fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status, has_duplicates from %s WHERE table_id = ? FOR UPDATE", m.tableName)
rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID)
if err != nil {
Expand Down Expand Up @@ -593,7 +594,7 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
// avoid override existing metadata if the meta is already inserted.
exist := false
err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID)
query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) // nolint:gosec
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
Expand Down Expand Up @@ -635,7 +636,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
return errors.Annotate(err, "enable pessimistic transaction failed")
}
return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) // nolint:gosec
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task metas failed")
Expand Down Expand Up @@ -695,7 +696,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
paused := false
var pausedCfg storedCfgs
err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
Expand Down Expand Up @@ -821,7 +822,7 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool
switchBack := true
allFinished := finished
err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func waitUntilServerOnline(addr string, statusPort uint) string {
// connect http status
statusURL := fmt.Sprintf("http://127.0.0.1:%d/status", statusPort)
for retry = 0; retry < retryTime; retry++ {
resp, err := http.Get(statusURL) // nolint:noctx
resp, err := http.Get(statusURL) // nolint:noctx,gosec
if err == nil {
// Ignore errors.
_, _ = io.ReadAll(resp.Body)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/storage/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func dfsCommand(args ...string) (*exec.Cmd, error) {
}
cmd = append(cmd, bin, "dfs")
cmd = append(cmd, args...)
//nolint:gosec
return exec.Command(cmd[0], cmd[1:]...), nil
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/utils/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// #nosec
// register HTTP handler for /debug/pprof
"net/http"
_ "net/http/pprof"
_ "net/http/pprof" // nolint:gosec

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down
22 changes: 13 additions & 9 deletions cmd/explaintest/r/explain_indexmerge.result
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ create index td on t (d);
load stats 's/explain_indexmerge_stats_t.json';
explain format = 'brief' select * from t where a < 50 or b < 50;
id estRows task access object operator info
TableReader 98.00 root data:Selection
└─Selection 98.00 cop[tikv] or(lt(test.t.a, 50), lt(test.t.b, 50))
└─TableFullScan 5000000.00 cop[tikv] table:t keep order:false
IndexMerge 98.00 root
├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false
├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false
└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false
explain format = 'brief' select * from t where (a < 50 or b < 50) and f > 100;
id estRows task access object operator info
TableReader 98.00 root data:Selection
└─Selection 98.00 cop[tikv] gt(test.t.f, 100), or(lt(test.t.a, 50), lt(test.t.b, 50))
└─TableFullScan 5000000.00 cop[tikv] table:t keep order:false
IndexMerge 98.00 root
├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false
├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false
└─Selection(Probe) 98.00 cop[tikv] gt(test.t.f, 100)
└─TableRowIDScan 98.00 cop[tikv] table:t keep order:false
explain format = 'brief' select * from t where b < 50 or c < 50;
id estRows task access object operator info
TableReader 98.00 root data:Selection
└─Selection 98.00 cop[tikv] or(lt(test.t.b, 50), lt(test.t.c, 50))
└─TableFullScan 5000000.00 cop[tikv] table:t keep order:false
IndexMerge 98.00 root
├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false
├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tc(c) range:[-inf,50), keep order:false
└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false
set session tidb_enable_index_merge = on;
explain format = 'brief' select * from t where a < 50 or b < 50;
id estRows task access object operator info
Expand Down
19 changes: 19 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -89,6 +90,7 @@ type Domain struct {
cancel context.CancelFunc
indexUsageSyncLease time.Duration
planReplayer *planReplayer
expiredTimeStamp4PC types.Time

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -335,6 +337,22 @@ func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
return meta.NewSnapshotMeta(snapshot), nil
}

// ExpiredTimeStamp4PC gets expiredTimeStamp4PC from domain.
func (do *Domain) ExpiredTimeStamp4PC() types.Time {
do.m.Lock()
defer do.m.Unlock()

return do.expiredTimeStamp4PC
}

// SetExpiredTimeStamp4PC sets the expiredTimeStamp4PC from domain.
func (do *Domain) SetExpiredTimeStamp4PC(time types.Time) {
do.m.Lock()
defer do.m.Unlock()

do.expiredTimeStamp4PC = time
}

// DDL gets DDL from domain.
func (do *Domain) DDL() ddl.DDL {
return do.ddl
Expand Down Expand Up @@ -712,6 +730,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
Expand Down
35 changes: 34 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
case *ast.ShutdownStmt:
err = e.executeShutdown(x)
case *ast.AdminStmt:
err = e.executeAdminReloadStatistics(x)
err = e.executeAdmin(x)
}
e.done = true
return err
Expand Down Expand Up @@ -1659,6 +1660,16 @@ func asyncDelayShutdown(p *os.Process, delay time.Duration) {
}
}

func (e *SimpleExec) executeAdmin(s *ast.AdminStmt) error {
switch s.Tp {
case ast.AdminReloadStatistics:
return e.executeAdminReloadStatistics(s)
case ast.AdminFlushPlanCache:
return e.executeAdminFlushPlanCache(s)
}
return nil
}

func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error {
if s.Tp != ast.AdminReloadStatistics {
return errors.New("This AdminStmt is not ADMIN RELOAD STATS_EXTENDED")
Expand All @@ -1668,3 +1679,25 @@ func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error {
}
return domain.GetDomain(e.ctx).StatsHandle().ReloadExtendedStatistics()
}

func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
if s.Tp != ast.AdminFlushPlanCache {
return errors.New("This AdminStmt is not ADMIN FLUSH PLAN_CACHE")
}
if s.StatementScope == ast.StatementScopeGlobal {
return errors.New("Do not support the 'admin flush global scope.'")
}
if !plannercore.PreparedPlanCacheEnabled() {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The plan cache is disable. So there no need to flush the plan cache"))
return nil
}
now := types.NewTime(types.FromGoTime(time.Now().In(e.ctx.GetSessionVars().StmtCtx.TimeZone)), mysql.TypeTimestamp, 3)
e.ctx.GetSessionVars().LastUpdateTime4PC = now
e.ctx.PreparedPlanCache().DeleteAll()
if s.StatementScope == ast.StatementScopeInstance {
// Record the timestamp. When other sessions want to use the plan cache,
// it will check the timestamp first to decide whether the plan cache should be flushed.
domain.GetDomain(e.ctx).SetExpiredTimeStamp4PC(now)
}
return nil
}
10 changes: 10 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
prepared.SchemaVersion = is.SchemaMetaVersion()
}
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
// And update lastUpdateTime to the newest one.
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if prepared.UseCache && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.PreparedPlanCache().DeleteAll()
prepared.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
err = e.getPhysicalPlan(ctx, sctx, is, preparedObj)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2203,6 +2203,10 @@ func (s *testIntegrationSuite) TestOptimizeHintOnPartitionTable(c *C) {
partition p1 values less than(11),
partition p2 values less than(16));`)
tk.MustExec(`insert into t values (1,1,"1"), (2,2,"2"), (8,8,"8"), (11,11,"11"), (15,15,"15")`)
tk.MustExec("set @@tidb_enable_index_merge = off")
defer func() {
tk.MustExec("set @@tidb_enable_index_merge = on")
}()

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
Expand Down
2 changes: 2 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,8 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan,
return &AdminResetTelemetryID{}, nil
case ast.AdminReloadStatistics:
return &Simple{Statement: as}, nil
case ast.AdminFlushPlanCache:
return &Simple{Statement: as}, nil
default:
return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
}
Expand Down
Loading

0 comments on commit 7f98a78

Please sign in to comment.