Skip to content

Commit

Permalink
Merge branch 'master' into rm-serial-executor
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 15, 2021
2 parents 6530bad + 64f7309 commit 96bc5ab
Show file tree
Hide file tree
Showing 55 changed files with 614 additions and 343 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ linters:
- rowserrcheck
- unconvert
- makezero
- durationcheck

linters-settings:
staticcheck:
Expand Down
2 changes: 1 addition & 1 deletion bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan
}

// HandleEvolvePlanTask tries to evolve one plan task.
// It only handle one tasks once because we want each task could use the latest parameters.
// It only processes one task at a time because we want each task to use the latest parameters.
func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context, adminEvolve bool) error {
originalSQL, db, binding := h.getOnePendingVerifyJob()
if originalSQL == "" {
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand All @@ -284,7 +283,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
return errors.Annotate(err, "create storage failed")
}

// return expectedErr means at least meet one file
expectedErr := errors.New("Stop Iter")
walkErr := s.WalkDir(ctx, &storage.WalkOption{ListCount: 1}, func(string, int64) error {
// return an error when meet the first regular file to break the walk loop
return expectedErr
})
if !errors.ErrorEqual(walkErr, expectedErr) {
if walkErr == nil {
return errors.Errorf("data-source-dir '%s' doesn't exist or contains no files", taskCfg.Mydumper.SourceDir)
}
return errors.Annotatef(walkErr, "visit data-source-dir '%s' failed", taskCfg.Mydumper.SourceDir)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
var mdl *mydump.MDLoader
mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
Expand Down
35 changes: 29 additions & 6 deletions br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"io"
"net"
"net/http"
"net/http/pprof"
"net/url"
Expand Down Expand Up @@ -86,18 +87,40 @@ func NewCluster() (*Cluster, error) {

// Start runs a mock cluster.
func (mock *Cluster) Start() error {
statusURL, err := url.Parse(tempurl.Alloc())
if err != nil {
return errors.Trace(err)
var (
err error
statusURL *url.URL
addrURL *url.URL
)
for i := 0; i < 10; i++ {
// retry 10 times to get available port
statusURL, err = url.Parse(tempurl.Alloc())
if err != nil {
return errors.Trace(err)
}
listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%s", statusURL.Port()))
if err == nil {
// release port listening
listen.Close()
break
}
}
statusPort, err := strconv.ParseInt(statusURL.Port(), 10, 32)
if err != nil {
return errors.Trace(err)
}

addrURL, err := url.Parse(tempurl.Alloc())
if err != nil {
return errors.Trace(err)
for i := 0; i < 10; i++ {
addrURL, err = url.Parse(tempurl.Alloc())
if err != nil {
return errors.Trace(err)
}
listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%s", addrURL.Port()))
if err == nil {
// release port listening
listen.Close()
break
}
}
addrPort, err := strconv.ParseInt(addrURL.Port(), 10, 32)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,6 @@ func newGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
// so we need find sst in slash directory
gcs.Prefix += "//"
}
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
// check bucket exists
_, err = bucket.Attrs(ctx)
if err != nil {
return nil, errors.Annotatef(err, "gcs://%s/%s", gcs.Bucket, gcs.Prefix)
}
}
return &gcsStorage{gcs: gcs, bucket: bucket}, nil
}

Expand Down
8 changes: 0 additions & 8 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,6 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storag
}

c := s3.New(ses)
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
err = checkS3Bucket(c, &qs)
if err != nil {
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err)
}
}

if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") {
qs.Prefix += "/"
}
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func (s *s3Suite) TestS3Storage(c *C) {
_, err := New(ctx, s3, &ExternalStorageOptions{
SendCredentials: test.sendCredential,
CheckPermissions: test.hackPermission,
SkipCheckPath: true,
})
if test.errReturn {
c.Assert(err, NotNil)
Expand Down Expand Up @@ -414,7 +413,7 @@ func (s *s3Suite) TestS3Storage(c *C) {
func (s *s3Suite) TestS3URI(c *C) {
backend, err := ParseBackend("s3://bucket/prefix/", nil)
c.Assert(err, IsNil)
storage, err := New(context.Background(), backend, &ExternalStorageOptions{SkipCheckPath: true})
storage, err := New(context.Background(), backend, &ExternalStorageOptions{})
c.Assert(err, IsNil)
c.Assert(storage.URI(), Equals, "s3://bucket/prefix/")
}
Expand Down
13 changes: 0 additions & 13 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,6 @@ type ExternalStorageOptions struct {
// NoCredentials means that no cloud credentials are supplied to BR
NoCredentials bool

// SkipCheckPath marks whether to skip checking path's existence.
//
// This should only be set to true in testing, to avoid interacting with the
// real world.
// When this field is false (i.e. path checking is enabled), the New()
// function will ensure the path referred by the backend exists by
// recursively creating the folders. This will also throw an error if such
// operation is impossible (e.g. when the bucket storing the path is missing).

// deprecated: use checkPermissions and specify the checkPermission instead.
SkipCheckPath bool

// HTTPClient to use. The created storage may ignore this field if it is not
// directly using HTTP (e.g. the local storage).
HTTPClient *http.Client
Expand All @@ -148,7 +136,6 @@ type ExternalStorageOptions struct {
func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error) {
return New(ctx, backend, &ExternalStorageOptions{
SendCredentials: sendCreds,
SkipCheckPath: false,
HTTPClient: nil,
})
}
Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.SkipCheckPath, err = flags.GetBool(flagSkipCheckPath); err != nil {
return errors.Trace(err)
}
if cfg.SkipCheckPath {
log.L().Info("--skip-check-path is deprecated, need explicitly set it anymore")
}

if err = cfg.parseCipherInfo(flags); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -548,7 +551,6 @@ func storageOpts(cfg *Config) *storage.ExternalStorageOptions {
return &storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
}

Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
SkipCheckPath: cfg.SkipCheckPath,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/trace/tracing_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ func jobA(ctx context.Context) {
ctx = opentracing.ContextWithSpan(ctx, span1)
}
jobB(ctx)
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}

func jobB(ctx context.Context) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("jobB", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}

func TestSpan(t *testing.T) {
Expand All @@ -46,7 +46,7 @@ func TestSpan(t *testing.T) {
require.NoError(t, err)
s := string(content)
// possible result:
// "jobA 22:02:02.545296 20.621764ms\n"
// " └─jobB 22:02:02.545297 10.293444ms\n"
require.Regexp(t, `^jobA.*2[0-9]\.[0-9]+ms\n └─jobB.*1[0-9]\.[0-9]+ms\n$`, s)
// "jobA 22:02:02.545296 200.621764ms\n"
// " └─jobB 22:02:02.545297 100.293444ms\n"
require.Regexp(t, `^jobA.*20[0-9]\.[0-9]+ms\n └─jobB.*10[0-9]\.[0-9]+ms\n$`, s)
}
14 changes: 14 additions & 0 deletions br/tests/lightning_s3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ _EOF_
run_sql "DROP DATABASE IF EXISTS $DB;"
run_sql "DROP TABLE IF EXISTS $DB.$TABLE;"

# test not exist path
rm -f $TEST_DIR/lightning.log
SOURCE_DIR="s3://$BUCKET/not-exist-path?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
! run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
grep -Eq "data-source-dir .* doesn't exist or contains no files" $TEST_DIR/lightning.log

# test empty dir
rm -f $TEST_DIR/lightning.log
emptyPath=empty-bucket/empty-path
mkdir -p $DBPATH/$emptyPath
SOURCE_DIR="s3://$emptyPath/not-exist-path?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
! run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
grep -Eq "data-source-dir .* doesn't exist or contains no files" $TEST_DIR/lightning.log

SOURCE_DIR="s3://$BUCKET/?endpoint=http%3A//127.0.0.1%3A9900&access_key=$MINIO_ACCESS_KEY&secret_access_key=$MINIO_SECRET_KEY&force_path_style=true"
run_lightning -d $SOURCE_DIR --backend local 2> /dev/null
run_sql "SELECT count(*), sum(i) FROM \`$DB\`.$TABLE"
Expand Down
43 changes: 40 additions & 3 deletions cmd/explaintest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ type tester struct {
resultFD *os.File
// ctx is used for Compile sql statement
ctx sessionctx.Context

// outputLen, lastQuery, lastLine and lastResult are used for checking the correctness of last query.
// See https://github.com/pingcap/tidb/issues/29475 for details.
outputLen int
lastText string
lastQuery query
lastResult []byte
}

func newTester(name string) *tester {
Expand Down Expand Up @@ -172,6 +179,10 @@ LOOP:
}
}

if err = t.checkLastResult(); err != nil {
return errors.Annotate(err, fmt.Sprintf("sql:%v", t.lastQuery.Query))
}

return t.flushResult()
}

Expand Down Expand Up @@ -365,7 +376,7 @@ func (t *tester) execute(query query) error {
t.singleQuery = false

if err != nil {
return errors.Trace(errors.Errorf("run \"%v\" at line %d err %v", st.Text(), query.Line, err))
return errors.Trace(errors.Errorf("run \"%v\" at line %d err %v", qText, query.Line, err))
}

if !record && !create {
Expand All @@ -374,11 +385,15 @@ func (t *tester) execute(query query) error {

buf := make([]byte, t.buf.Len()-offset)
if _, err = t.resultFD.ReadAt(buf, int64(offset)); !(err == nil || err == io.EOF) {
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we got \n%s\nbut read result err %s", st.Text(), query.Line, gotBuf, err))
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we got \n%s\nbut read result err %s", qText, query.Line, gotBuf, err))
}
if !bytes.Equal(gotBuf, buf) {
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we need:\n%s\nbut got:\n%s\n", query.Query, query.Line, buf, gotBuf))
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we need:\n%s\nbut got:\n%s\n", qText, query.Line, buf, gotBuf))
}
t.outputLen = t.buf.Len()
t.lastText = qText
t.lastQuery = query
t.lastResult = gotBuf
}
}
return errors.Trace(err)
Expand Down Expand Up @@ -566,6 +581,28 @@ func (t *tester) resultFileName() string {
return fmt.Sprintf("./r/%s.result", t.name)
}

func (t *tester) checkLastResult() error {
if record || create || t.outputLen == 0 {
return nil
}
fi, err := t.resultFD.Stat()
if err != nil {
return err
}
size := fi.Size()
if size == int64(t.outputLen) {
return nil
}
buf := make([]byte, int(size)-t.outputLen+len(t.lastResult))
if _, err = t.resultFD.ReadAt(buf, int64(t.outputLen-len(t.lastResult))); !(err == nil || err == io.EOF) {
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we got \n%s\nbut read result err %s", t.lastText, t.lastQuery.Line, t.lastResult, err))
}
if !bytes.Equal(t.lastResult, buf) {
return errors.Trace(errors.Errorf("run \"%v\" at line %d err, we need:\n%s\nbut got:\n%s\n", t.lastText, t.lastQuery.Line, buf, t.lastResult))
}
return nil
}

func loadAllTests() ([]string, error) {
// tests must be in t folder
files, err := os.ReadDir("./t")
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
}
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.cancelFunc = nil
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
Expand Down Expand Up @@ -311,7 +312,6 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
func (e *IndexNestedLoopHashJoin) Close() error {
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
}
if e.resultCh != nil {
for range e.resultCh {
Expand Down
1 change: 1 addition & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.cancelFunc = nil
e.startWorkers(ctx)
return nil
}
Expand Down
Loading

0 comments on commit 96bc5ab

Please sign in to comment.