From a8821c8b06987d42b2955d660a356975cce91f53 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Fri, 20 Jul 2018 00:08:41 +0800 Subject: [PATCH 01/23] tidb tracing prototype --- distsql/distsql.go | 6 ++ executor/aggregate.go | 26 ++++++- executor/builder.go | 2 + executor/index_lookup_join.go | 6 ++ executor/join.go | 7 ++ executor/projection.go | 7 ++ executor/table_reader.go | 5 +- executor/trace.go | 123 ++++++++++++++++++++++++++++++++++ plan/planbuilder.go | 2 + plan/trace.go | 39 +++++++++++ session/session.go | 10 --- store/tikv/backoff.go | 7 +- util/tracing/tracer.go | 67 ++++++++++++++++++ util/tracing/tracer_test.go | 78 +++++++++++++++++++++ 14 files changed, 371 insertions(+), 14 deletions(-) create mode 100644 executor/trace.go create mode 100644 plan/trace.go create mode 100644 util/tracing/tracer.go create mode 100644 util/tracing/tracer_test.go diff --git a/distsql/distsql.go b/distsql/distsql.go index 45f8665fac2a7..1dc51d8667e43 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/tracing" "golang.org/x/net/context" ) @@ -35,9 +36,13 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie hook.(func(*kv.Request))(kvReq) } + child := tracing.ChildSpanFromContxt(ctx, "distsql_select") + defer child.Finish() + if !sctx.GetSessionVars().EnableStreaming { kvReq.Streaming = false } + resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars) if resp == nil { err := errors.New("client returns nil response") @@ -54,6 +59,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie }, nil } + child.LogKV("event", "finished sending rpc calls") return &selectResult{ label: "dag", resp: resp, diff --git a/executor/aggregate.go b/executor/aggregate.go index 5a1613a31768e..572587edcdd26 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -16,7 +16,10 @@ package executor import ( "sync" + "fmt" + "github.com/juju/errors" + "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -27,6 +30,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/mvmap" + "github.com/pingcap/tidb/util/tracing" "github.com/spaolacci/murmur3" "golang.org/x/net/context" ) @@ -64,6 +68,8 @@ type HashAggPartialWorker struct { // chk stores the input data from child, // and is reused by childExec and partial worker. chk *chunk.Chunk + + trace opentracing.Span } // HashAggFinalWorker indicates the final workers of parallel hash agg execution, @@ -79,6 +85,8 @@ type HashAggFinalWorker struct { inputCh chan *HashAggIntermData outputCh chan *AfFinalResult finalResultHolderCh chan *chunk.Chunk + + trace opentracing.Span } // AfFinalResult indicates aggregation functions final result. 
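The hunks above establish the pattern this series applies to each executor: derive a child span from the caller's context, annotate it with LogKV, and Finish it when the operator is done, falling back to a span that discards everything when no trace is active. Below is a minimal, self-contained sketch of that pattern; childSpanFromContext mirrors the ChildSpanFromContxt helper added later in this patch, and the operation name "example_exec" is purely illustrative.

package main

import (
	"fmt"

	"github.com/opentracing/opentracing-go"
	"golang.org/x/net/context"
)

// childSpanFromContext mirrors util/tracing.ChildSpanFromContxt: start a
// child of the span carried by ctx, or fall back to a noop span.
func childSpanFromContext(ctx context.Context, opName string) opentracing.Span {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		if _, ok := sp.Tracer().(opentracing.NoopTracer); !ok {
			return opentracing.StartSpan(opName, opentracing.ChildOf(sp.Context()))
		}
	}
	return (opentracing.NoopTracer{}).StartSpan("DefaultSpan")
}

func main() {
	// Without a recording span in the context this stays a noop, which is the
	// normal (non-TRACE) execution path; its overhead is benchmarked in a
	// later patch of this series.
	child := childSpanFromContext(context.Background(), "example_exec")
	defer child.Finish()
	child.LogKV("event", "one batch produced")
	fmt.Println("done")
}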
@@ -351,12 +359,14 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG if needShuffle { w.shuffleIntermData(sc, finalConcurrency) } + w.trace.Finish() waitGroup.Done() }() for { if !w.getChildInput() { return } + w.trace.LogKV("event", "update partial result") if err := w.updatePartialResult(ctx, sc, w.chk, len(w.aggCtxsMap)); err != nil { w.globalOutputCh <- &AfFinalResult{err: errors.Trace(err)} return @@ -369,6 +379,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) { inputIter := chunk.NewIterator4Chunk(chk) + w.trace.LogKV("event", "iterating chunk's data") for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() { groupKey, err := w.getGroupKey(sc, row) if err != nil { @@ -388,6 +399,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s // We only support parallel execution for single-machine, so process of encode and decode can be skipped. func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) { groupKeysSlice := make([][][]byte, finalConcurrency) + w.trace.LogKV("event", "shuffling interm data") for groupKey := range w.aggCtxsMap { groupKeyBytes := []byte(groupKey) finalWorkerIdx := int(murmur3.Sum32(groupKeyBytes)) % finalConcurrency @@ -457,6 +469,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er ok bool intermDataRowsBuffer [][]types.Datum ) + w.trace.LogKV("event", "consuming interm data") for { if input, ok = w.getPartialInput(); !ok { return nil @@ -527,6 +540,7 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) { defer func() { + w.trace.Finish() waitGroup.Done() }() sc := ctx.GetSessionVars().StmtCtx @@ -539,10 +553,14 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro // Next implements the Executor Next interface. 
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() + child := tracing.ChildSpanFromContxt(ctx, "hash_agg_exec") + defer child.Finish() + if e.isUnparallelExec { return errors.Trace(e.unparallelExec(ctx, chk)) } - return errors.Trace(e.parallelExec(ctx, chk)) + err := e.parallelExec(ctx, chk) + return errors.Trace(err) } func (e *HashAggExec) fetchChildData(ctx context.Context) { @@ -596,7 +614,9 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) { partialWorkerWaitGroup := &sync.WaitGroup{} partialWorkerWaitGroup.Add(len(e.partialWorkers)) + for i := range e.partialWorkers { + e.partialWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_partial_worker_%d", i)) go e.partialWorkers[i].run(e.ctx, partialWorkerWaitGroup, len(e.finalWorkers)) } go e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup) @@ -604,6 +624,7 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) { finalWorkerWaitGroup := &sync.WaitGroup{} finalWorkerWaitGroup.Add(len(e.finalWorkers)) for i := range e.finalWorkers { + e.finalWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_final_worker_%d", i)) go e.finalWorkers[i].run(e.ctx, finalWorkerWaitGroup) } go e.waitFinalWorkerAndCloseFinalOutput(finalWorkerWaitGroup) @@ -793,6 +814,8 @@ func (e *StreamAggExec) Close() error { func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() + child := tracing.ChildSpanFromContxt(ctx, "stream_agg_exec") + defer child.Finish() for !e.executed && chk.NumRows() < e.maxChunkSize { err := e.consumeOneGroup(ctx, chk) if err != nil { @@ -800,6 +823,7 @@ func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { return errors.Trace(err) } } + child.LogKV("event", "StreamAggExec is finished") return nil } diff --git a/executor/builder.go b/executor/builder.go index 7122e5f7a33a1..eadc782b1c1ba 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -89,6 +89,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor { return b.buildDelete(v) case *plan.Execute: return b.buildExecute(v) + case *plan.Trace: + return b.buildTrace(v) case *plan.Explain: return b.buildExplain(v) case *plan.PointGetPlan: diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 92378d80a1e07..1470843d03743 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" "github.com/pingcap/tidb/util/ranger" + "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -189,6 +190,11 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. 
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error { + sp := tracing.ChildSpanFromContxt(ctx, "index_lookup_join") + defer func() { + sp.LogKV("event", "index lookup join is finished.") + sp.Finish() + }() chk.Reset() e.joinResult.Reset() for { diff --git a/executor/join.go b/executor/join.go index 8eb1b953c5691..475bdc00713f0 100644 --- a/executor/join.go +++ b/executor/join.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" + "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -184,7 +185,10 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB // fetchOuterChunks get chunks from fetches chunks from the big table in a background goroutine // and sends the chunks to multiple channels which will be read by multiple join workers. func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) { + sp := tracing.SpanFromContext(ctx) defer func() { + sp.LogKV("event", "finishing fetchOuterChunks") + sp.Finish() for i := range e.outerResultChs { close(e.outerResultChs[i]) } @@ -471,6 +475,8 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu // step 1. fetch data from inner child and build a hash table; // step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers. func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { + sp := tracing.ChildSpanFromContxt(ctx, "hash_join_exec") + defer sp.Finish() if !e.prepared { e.innerFinished = make(chan error, 1) go e.fetchInnerAndBuildHashTable(ctx) @@ -491,6 +497,7 @@ func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { } chk.SwapColumns(result.chk) result.src <- result.chk + sp.LogKV("event", "hash join exec is finished") return nil } diff --git a/executor/projection.go b/executor/projection.go index 8df62681283a3..4aca4f6875828 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/tracing" "golang.org/x/net/context" ) @@ -61,6 +62,7 @@ type ProjectionExec struct { // Open implements the Executor Open interface. 
func (e *ProjectionExec) Open(ctx context.Context) error { + _ = tracing.ChildSpanFromContxt(ctx, "projection_exec") if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } @@ -139,6 +141,11 @@ func (e *ProjectionExec) Open(ctx context.Context) error { // +------------------------------+ +----------------------+ // func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { + sp := tracing.ChildSpanFromContxt(ctx, "projection_exec") + defer func() { + sp.LogKV("event", "projection_exec is finished") + sp.Finish() + }() chk.Reset() if e.isUnparallelExec() { return errors.Trace(e.unParallelExecute(ctx, chk)) diff --git a/executor/table_reader.go b/executor/table_reader.go index a04b5897213c8..2278d739740df 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" + "github.com/pingcap/tidb/util/tracing" tipb "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) @@ -57,8 +58,8 @@ type TableReaderExecutor struct { // Open initialzes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { - span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open") - defer span.Finish() + child := tracing.ChildSpanFromContxt(ctx, "table_reader_exec") + defer child.Finish() var err error if e.corColInFilter { diff --git a/executor/trace.go b/executor/trace.go new file mode 100644 index 0000000000000..52051b07613c6 --- /dev/null +++ b/executor/trace.go @@ -0,0 +1,123 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "fmt" + "time" + + "github.com/juju/errors" + "github.com/opentracing/basictracer-go" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/tracing" + "golang.org/x/net/context" +) + +var traceColumns = append([]*types.FieldType{}) + +// TraceExec represents a root executor of trace query. +type TraceExec struct { + baseExecutor + // CollectedSpans collects all span during execution. Span is appended via + // callback method which passes into tracer implementation. + CollectedSpans []basictracer.RawSpan + // exhausted being true means there is no more result. + exhausted bool + // plan is the real query plan and it is used for building real query's executor. + plan plan.Plan + // rootTrace represents root span which is father of all other span. + rootTrace opentracing.Span + + childrenResults []*chunk.Chunk +} + +// buildTrace builds a TraceExec for future executing. This method will be called +// at build(). 
+func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { + e := &TraceExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + } + + pp, _ := v.StmtPlan.(plan.PhysicalPlan) + e.children = make([]Executor, 0, len(pp.Children())) + for _, child := range pp.Children() { + switch p := child.(type) { + case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg: + e.children = append(e.children, b.build(p)) + default: + panic(fmt.Sprintf("%v is not supported", child)) + } + + } + + return e +} + +// Open opens a trace executor and it will create a root trace span which will be +// used for the following span in a relationship of `ChildOf` or `FollowFrom`. +// for more details, you could refer to http://opentracing.io +func (e *TraceExec) Open(ctx context.Context) error { + e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) { + e.CollectedSpans = append(e.CollectedSpans, sp) + }) + ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) + for _, child := range e.children { + err := child.Open(ctx) + if err != nil { + return errors.Trace(err) + } + } + e.childrenResults = make([]*chunk.Chunk, 0, len(e.children)) + for _, child := range e.children { + e.childrenResults = append(e.childrenResults, child.newChunk()) + } + + return nil +} + +// Next executes real query and collects span later. +func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + if e.exhausted { + return nil + } + + ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) + if len(e.children) > 0 { + if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { + return errors.Trace(err) + } + } + + e.rootTrace.LogKV("event", "tracing completed") + e.rootTrace.Finish() + var timeVal string + for i, sp := range e.CollectedSpans { + spanStartTime := sp.Start + for _, entry := range sp.Logs { + chk.AppendString(0, entry.Timestamp.Format(time.RFC3339)) + timeVal = entry.Timestamp.Sub(spanStartTime).String() + chk.AppendString(1, timeVal) + chk.AppendString(2, sp.Operation) + chk.AppendInt64(3, int64(i)) + chk.AppendString(4, entry.Fields[0].String()) + } + } + e.exhausted = true + + return nil +} diff --git a/plan/planbuilder.go b/plan/planbuilder.go index a24fcfe6bedb4..9fa2d20a70462 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -141,6 +141,8 @@ func (b *planBuilder) build(node ast.Node) (Plan, error) { return b.buildExecute(x) case *ast.ExplainStmt: return b.buildExplain(x) + case *ast.TraceStmt: + return b.buildTrace(x) case *ast.InsertStmt: return b.buildInsert(x) case *ast.LoadDataStmt: diff --git a/plan/trace.go b/plan/trace.go new file mode 100644 index 0000000000000..9774dd1f31ec4 --- /dev/null +++ b/plan/trace.go @@ -0,0 +1,39 @@ +package plan + +import ( + "github.com/juju/errors" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/mysql" +) + +// Trace represents a trace plan. +type Trace struct { + baseSchemaProducer + + StmtPlan Plan +} + +// buildTrace builds a trace plan. Inside this method, it first optimize the +// underlying query and then constructs a schema, which will be used to constructs +// rows result. 
+func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { + if _, ok := trace.Stmt.(*ast.SelectStmt); !ok { + return nil, errors.New("trace only supports select query") + } + optimizedP, err := Optimize(b.ctx, trace.Stmt, b.is) + if err != nil { + return nil, errors.New("fail to optimize during build trace") + } + p := &Trace{StmtPlan: optimizedP} + + retFields := []string{"timestamp", "duration", "pos", "operation", "event"} + schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) + schema.Append(buildColumn("", "timestamp", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "pos", mysql.TypeInt24, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "log", mysql.TypeString, mysql.MaxBlobWidth)) + p.SetSchema(schema) + return p, nil +} diff --git a/session/session.go b/session/session.go index d76313e581abd..b0e6b918d760b 100644 --- a/session/session.go +++ b/session/session.go @@ -375,11 +375,6 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { } func (s *session) CommitTxn(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("session.CommitTxn", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - err := s.doCommitWithRetry(ctx) label := metrics.LblOK if err != nil { @@ -390,11 +385,6 @@ func (s *session) CommitTxn(ctx context.Context) error { } func (s *session) RollbackTxn(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("session.RollbackTxn", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - var err error if s.txn.Valid() { terror.Log(s.txn.Rollback()) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 84a4478fa0837..62e2f25abdc15 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/terror" + "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -211,6 +212,8 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { default: } + child := tracing.ChildSpanFromContxt(b.ctx, "backoffer") + defer child.Finish() metrics.TiKVBackoffCounter.WithLabelValues(typ.String()).Inc() // Lazy initialize. if b.fn == nil { @@ -225,7 +228,9 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { b.totalSleep += f(b.ctx) b.types = append(b.types, typ) - log.Debugf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep) + log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms)", err, b.totalSleep, b.maxSleep) + child.LogKV("error", err.Error()) + child.LogKV("retry", fmt.Sprintf("retry later(totalsleep %dms, maxsleep %dms)", b.totalSleep, b.maxSleep)) b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) if b.maxSleep > 0 && b.totalSleep >= b.maxSleep { diff --git a/util/tracing/tracer.go b/util/tracing/tracer.go new file mode 100644 index 0000000000000..abab0cc599f6c --- /dev/null +++ b/util/tracing/tracer.go @@ -0,0 +1,67 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "github.com/opentracing/basictracer-go" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" +) + +// TiDBTrace is set as Baggage on traces which are used for tidb tracing. +const TiDBTrace = "tr" + +// A CallbackRecorder immediately invokes itself on received trace spans. +type CallbackRecorder func(sp basictracer.RawSpan) + +// RecordSpan implements basictracer.SpanRecorder. +func (cr CallbackRecorder) RecordSpan(sp basictracer.RawSpan) { + cr(sp) +} + +// NewRecordedTrace returns a Span which records directly via the specified +// callback. +func NewRecordedTrace(opName string, callback func(sp basictracer.RawSpan)) opentracing.Span { + tr := basictracer.New(CallbackRecorder(callback)) + opentracing.SetGlobalTracer(tr) + sp := tr.StartSpan(opName) + sp.SetBaggageItem(TiDBTrace, "1") + return sp +} + +// noopSpan returns a Span which discards all operations. +func noopSpan() opentracing.Span { + return (opentracing.NoopTracer{}).StartSpan("DefaultSpan") +} + +// SpanFromContext returns the span obtained from the context or, if none is found, a new one started through tracer. +func SpanFromContext(ctx context.Context) (sp opentracing.Span) { + if sp = opentracing.SpanFromContext(ctx); sp == nil { + return noopSpan() + } + return sp +} + +// ChildSpanFromContxt return a non-nil span. If span can be got from ctx, then returned span is +// a child of such span. Otherwise, returned span is a noop span. +func ChildSpanFromContxt(ctx context.Context, opName string) (sp opentracing.Span) { + if sp := opentracing.SpanFromContext(ctx); sp != nil { + if _, ok := sp.Tracer().(opentracing.NoopTracer); !ok { + child := opentracing.StartSpan(opName, opentracing.ChildOf(sp.Context())) + opentracing.ContextWithSpan(ctx, child) + return child + } + } + return noopSpan() +} diff --git a/util/tracing/tracer_test.go b/util/tracing/tracer_test.go new file mode 100644 index 0000000000000..150aab02ba61b --- /dev/null +++ b/util/tracing/tracer_test.go @@ -0,0 +1,78 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing_test + +import ( + "testing" + + . 
"github.com/pingcap/check" + + basictracer "github.com/opentracing/basictracer-go" + "github.com/opentracing/opentracing-go" + "github.com/pingcap/tidb/util/tracing" + "golang.org/x/net/context" +) + +var _ = Suite(&testTraceSuite{}) + +func TestT(t *testing.T) { + TestingT(t) +} + +type testTraceSuite struct { +} + +func (s *testTraceSuite) TestSpanFromContext(c *C) { + ctx := context.Background() + noopSp := tracing.SpanFromContext(ctx) + // test noop span + _, ok := noopSp.Tracer().(opentracing.NoopTracer) + c.Assert(ok, IsTrue) + + // test tidb tracing + collectedSpan := make([]basictracer.RawSpan, 1) + sp := tracing.NewRecordedTrace("test", func(sp basictracer.RawSpan) { + collectedSpan = append(collectedSpan, sp) + }) + sp.Finish() + opentracing.ContextWithSpan(ctx, sp) + child := tracing.SpanFromContext(ctx) + child.Finish() + + // verify second span's operation is not nil, this way we can ensure + // callback logic works. + c.Assert(collectedSpan[0].Operation, NotNil) +} + +func (s *testTraceSuite) TestChildSpanFromContext(c *C) { + ctx := context.Background() + noopSp := tracing.ChildSpanFromContxt(ctx, "") + _, ok := noopSp.Tracer().(opentracing.NoopTracer) + c.Assert(ok, IsTrue) + + // test tidb tracing + collectedSpan := make([]basictracer.RawSpan, 1) + sp := tracing.NewRecordedTrace("test", func(sp basictracer.RawSpan) { + collectedSpan = append(collectedSpan, sp) + }) + sp.Finish() + opentracing.ContextWithSpan(ctx, sp) + child := tracing.ChildSpanFromContxt(ctx, "test_child") + child.Finish() + + // verify second span's operation is not nil, this way we can ensure + // callback logic works. + c.Assert(collectedSpan[1].Operation, NotNil) + +} From 92d17644badaa49c25b030f268efbbc0be5a4bcc Mon Sep 17 00:00:00 2001 From: zhexuany Date: Fri, 17 Aug 2018 14:02:35 +0800 Subject: [PATCH 02/23] add benchmark for noop --- util/tracing/noop_bench_test.go | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 util/tracing/noop_bench_test.go diff --git a/util/tracing/noop_bench_test.go b/util/tracing/noop_bench_test.go new file mode 100644 index 0000000000000..4216d501c4a69 --- /dev/null +++ b/util/tracing/noop_bench_test.go @@ -0,0 +1,50 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + +package tracing + +import ( + "testing" + "context" +) + + +func BenchmarkNoopLogKV(b *testing.B) { + sp := noopSpan() + for i := 0; i < b.N; i++ { + sp.LogKV("event", "noop is finished") + } +} + +func BenchmarkNoopLogKVWithF(b *testing.B) { + sp := noopSpan() + for i := 0; i < b.N; i++ { + sp.LogKV("event", "this is format %s", "noop is finished") + } +} + + +func BenchmarkSpanFromContext(b *testing.B) { + ctx := context.TODO() + for i := 0 ; i < b.N; i++ { + SpanFromContext(ctx) + } +} + +func BenchmarkChildFromContext(b *testing.B) { + ctx := context.TODO() + for i := 0 ; i < b.N; i++ { + ChildSpanFromContxt(ctx, "child") + } +} From 3e61f1b6e79d2ffab53b13827bc2f3572f286c88 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Tue, 21 Aug 2018 13:50:49 +0800 Subject: [PATCH 03/23] remove old span system --- executor/distsql.go | 19 ---------- executor/trace.go | 63 ++++++++++++++++++++++++++------- plan/trace.go | 8 ++--- session/session.go | 16 --------- session/tidb.go | 12 ++----- session/txn.go | 6 +--- store/tikv/2pc.go | 15 -------- store/tikv/coprocessor.go | 6 ---- util/tracing/noop_bench_test.go | 3 +- util/tracing/trace_span.go | 17 +++++++++ 10 files changed, 77 insertions(+), 88 deletions(-) create mode 100644 util/tracing/trace_span.go diff --git a/executor/distsql.go b/executor/distsql.go index 373a5a81ca8a4..e71595a7954cc 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -23,7 +23,6 @@ import ( "unsafe" "github.com/juju/errors" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -210,18 +209,6 @@ func splitRanges(ranges []*ranger.Range, keepOrder bool) ([]*ranger.Range, []*ra return signedRanges, unsignedRanges } -// startSpanFollowContext is similar to opentracing.StartSpanFromContext, but the span reference use FollowsFrom option. -func startSpanFollowsContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) { - span := opentracing.SpanFromContext(ctx) - if span != nil { - span = opentracing.StartSpan(operationName, opentracing.FollowsFrom(span.Context())) - } else { - span = opentracing.StartSpan(operationName) - } - - return span, opentracing.ContextWithSpan(ctx, span) -} - // rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range // by substitute correlated column with the constant. 
func rebuildIndexRanges(ctx sessionctx.Context, is *plan.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) { @@ -298,9 +285,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { } func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { - span, ctx := startSpanFollowsContext(ctx, "executor.IndexReader.Open") - defer span.Finish() - var err error if e.corColInFilter { e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) @@ -403,9 +387,6 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupReader) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - span, ctx := startSpanFollowsContext(ctx, "executor.IndexLookUp.Open") - defer span.Finish() - e.finished = make(chan struct{}) e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) diff --git a/executor/trace.go b/executor/trace.go index 52051b07613c6..3363358169b6d 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -15,8 +15,8 @@ package executor import ( "fmt" - "time" + "bytes" "github.com/juju/errors" "github.com/opentracing/basictracer-go" opentracing "github.com/opentracing/opentracing-go" @@ -89,6 +89,17 @@ func (e *TraceExec) Open(ctx context.Context) error { return nil } +func getPrefix(idx int, suffix string, opName string) string { + var buf bytes.Buffer + for i := 0; i < 2*idx; i++ { + buf.WriteString(" ") + } + + buf.WriteString(suffix) + buf.WriteString(opName) + return buf.String() +} + // Next executes real query and collects span later. func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() @@ -105,19 +116,47 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { e.rootTrace.LogKV("event", "tracing completed") e.rootTrace.Finish() - var timeVal string - for i, sp := range e.CollectedSpans { - spanStartTime := sp.Start - for _, entry := range sp.Logs { - chk.AppendString(0, entry.Timestamp.Format(time.RFC3339)) - timeVal = entry.Timestamp.Sub(spanStartTime).String() - chk.AppendString(1, timeVal) - chk.AppendString(2, sp.Operation) - chk.AppendInt64(3, int64(i)) - chk.AppendString(4, entry.Fields[0].String()) + var rootSpan basictracer.RawSpan + treeSpans := make(map[uint64][]basictracer.RawSpan) + for _, sp := range e.CollectedSpans { + if spans, ok := treeSpans[sp.ParentSpanID]; ok { + treeSpans[sp.ParentSpanID] = append(spans, sp) + } else { + treeSpans[sp.ParentSpanID] = make([]basictracer.RawSpan, 0) + } + if sp.Context.SpanID == 0 { + rootSpan = sp } } - e.exhausted = true + // add root span here + dfsTree(rootSpan, treeSpans, "", chk) + e.exhausted = true return nil } + +func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, chk *chunk.Chunk) { + // each span has a operation name, start time and duration. 
+ // add two empty string to prefix + suffix := "" + hasChild := false + spans := tree[span.Context.SpanID] + if len(spans) > 0 { + hasChild = true + // prefix += "| " + suffix = "├─" + } else { + suffix = "└─" + } + fmt.Println("len of spans is ", len(spans)) + for _, sp := range spans { + chk.AppendString(0, prefix+suffix+sp.Operation) + chk.AppendString(1, sp.Duration.String()) + chk.AppendInt64(2, int64(sp.Context.SpanID)) + if hasChild { + dfsTree(sp, tree, prefix+"| ", chk) + } else { + dfsTree(sp, tree, prefix+" ", chk) + } + } +} diff --git a/plan/trace.go b/plan/trace.go index 9774dd1f31ec4..4a092f56ad155 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -27,13 +27,11 @@ func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { } p := &Trace{StmtPlan: optimizedP} - retFields := []string{"timestamp", "duration", "pos", "operation", "event"} + retFields := []string{"operation", "duration", "spanID"} schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) - schema.Append(buildColumn("", "timestamp", mysql.TypeString, mysql.MaxBlobWidth)) - schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth)) - schema.Append(buildColumn("", "pos", mysql.TypeInt24, mysql.MaxBlobWidth)) - schema.Append(buildColumn("", "log", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "spanID", mysql.TypeLong, mysql.MaxBlobWidth)) p.SetSchema(schema) return p, nil } diff --git a/session/session.go b/session/session.go index b0e6b918d760b..11f5641553d9a 100644 --- a/session/session.go +++ b/session/session.go @@ -29,7 +29,6 @@ import ( "github.com/juju/errors" "github.com/ngaut/pools" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" @@ -441,9 +440,6 @@ func (s *session) isRetryableError(err error) bool { } func (s *session) retry(ctx context.Context, maxCnt uint) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "retry") - defer span.Finish() - connID := s.sessionVars.ConnectionID if s.sessionVars.TxnCtx.ForUpdate { return errForUpdateCantRetry.GenByArgs(connID) @@ -535,10 +531,7 @@ func (s *session) sysSessionPool() *pools.ResourcePool { // Unlike normal Exec, it doesn't reset statement status, doesn't commit or rollback the current transaction // and doesn't write binlog. func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) { - var span opentracing.Span ctx := context.TODO() - span, ctx = opentracing.StartSpanFromContext(ctx, "session.ExecRestrictedSQL") - defer span.Finish() // Use special session to execute the sql. 
tmp, err := s.sysSessionPool().Get() @@ -702,10 +695,6 @@ func (s *session) SetGlobalSysVar(name, value string) error { } func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, error) { - if span := opentracing.SpanFromContext(ctx); span != nil { - span1 := opentracing.StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } s.parser.SetSQLMode(s.sessionVars.SQLMode) return s.parser.Parse(sql, charset, collation) } @@ -760,11 +749,6 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.Rec } func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) { - if span := opentracing.SpanFromContext(ctx); span != nil { - span, ctx = opentracing.StartSpanFromContext(ctx, "session.Execute") - defer span.Finish() - } - s.PrepareTxnCtx(ctx) connID := s.sessionVars.ConnectionID err = s.loadCommonGlobalVariablesIfNeeded() diff --git a/session/tidb.go b/session/tidb.go index df6ce605d2a68..0a50470813ab3 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -24,7 +24,6 @@ import ( "time" "github.com/juju/errors" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -143,15 +142,10 @@ func Compile(ctx context.Context, sctx sessionctx.Context, stmtNode ast.StmtNode // runStmt executes the ast.Statement and commit or rollback the current transaction. func runStmt(ctx context.Context, sctx sessionctx.Context, s ast.Statement) (ast.RecordSet, error) { - span, ctx1 := opentracing.StartSpanFromContext(ctx, "runStmt") - span.LogKV("sql", s.OriginText()) - defer span.Finish() - var err error var rs ast.RecordSet se := sctx.(*session) rs, err = s.Exec(ctx) - span.SetTag("txn.id", se.sessionVars.TxnCtx.StartTS) // All the history should be added here. se.GetSessionVars().TxnCtx.StatementCount++ if !s.IsReadOnly() { @@ -169,17 +163,17 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s ast.Statement) (ast if !se.sessionVars.InTxn() { if err != nil { log.Info("RollbackTxn for ddl/autocommit error.") - err1 := se.RollbackTxn(ctx1) + err1 := se.RollbackTxn(ctx) terror.Log(errors.Trace(err1)) } else { - err = se.CommitTxn(ctx1) + err = se.CommitTxn(ctx) } } else { // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. 
history := GetHistory(sctx) if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { - err1 := se.RollbackTxn(ctx1) + err1 := se.RollbackTxn(ctx) terror.Log(errors.Trace(err1)) return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", history.Count(), sctx.GetSessionVars().IsAutocommit()) diff --git a/session/txn.go b/session/txn.go index 85a64cd77836e..575596b09eb89 100644 --- a/session/txn.go +++ b/session/txn.go @@ -15,7 +15,6 @@ package session import ( "github.com/juju/errors" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -232,12 +231,10 @@ func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) { type txnFuture struct { future oracle.Future store kv.Storage - span opentracing.Span } func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() - tf.span.Finish() if err == nil { return tf.store.BeginWithStartTS(startTS) } @@ -247,10 +244,9 @@ func (tf *txnFuture) wait() (kv.Transaction, error) { } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { - span, ctx := opentracing.StartSpanFromContext(ctx, "session.getTxnFuture") oracleStore := s.store.GetOracle() tsFuture := oracleStore.GetTimestampAsync(ctx) - return &txnFuture{tsFuture, s.store, span} + return &txnFuture{tsFuture, s.store} } // StmtCommit implements the sessionctx.Context interface. diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 71e5cff4cf114..3fa61f1b405aa 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -21,7 +21,6 @@ import ( "time" "github.com/juju/errors" - "github.com/opentracing/opentracing-go" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" @@ -585,20 +584,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { } }() - span := opentracing.SpanFromContext(ctx) - if span != nil { - span = opentracing.StartSpan("twoPhaseCommit.execute", opentracing.ChildOf(span.Context())) - } else { - // If we lost the trace information, make a new one for 2PC commit. - span = opentracing.StartSpan("twoPhaseCommit.execute") - } - defer span.Finish() - - // I'm not sure is it safe to cancel 2pc commit process at any time, - // So use a new Background() context instead of inherit the ctx, this is by design, - // to avoid the cancel signal from parent context. - ctx = opentracing.ContextWithSpan(context.Background(), span) - binlogChan := c.prewriteBinlog() err := c.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars), c.keys) if binlogChan != nil { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 0549d619fcd36..f8f5fe78cc7dd 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -24,7 +24,6 @@ import ( "time" "github.com/juju/errors" - "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" @@ -406,11 +405,6 @@ const minLogCopTaskTime = 300 * time.Millisecond // run is a worker function that get a copTask from channel, handle it and // send the result back. 
func (worker *copIteratorWorker) run(ctx context.Context) { - if span := opentracing.SpanFromContext(ctx); span != nil { - span, ctx = opentracing.StartSpanFromContext(ctx, "copIteratorWorker.run") - defer span.Finish() - } - defer worker.wg.Done() for task := range worker.taskCh { respCh := worker.respChan diff --git a/util/tracing/noop_bench_test.go b/util/tracing/noop_bench_test.go index 4216d501c4a69..98394d511e804 100644 --- a/util/tracing/noop_bench_test.go +++ b/util/tracing/noop_bench_test.go @@ -17,6 +17,7 @@ package tracing import ( "testing" "context" + "fmt" ) @@ -30,7 +31,7 @@ func BenchmarkNoopLogKV(b *testing.B) { func BenchmarkNoopLogKVWithF(b *testing.B) { sp := noopSpan() for i := 0; i < b.N; i++ { - sp.LogKV("event", "this is format %s", "noop is finished") + sp.LogKV("event", fmt.Sprintf("this is format %s", "noop is finished")) } } diff --git a/util/tracing/trace_span.go b/util/tracing/trace_span.go new file mode 100644 index 0000000000000..261158da329ba --- /dev/null +++ b/util/tracing/trace_span.go @@ -0,0 +1,17 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +type spanGroup struct { +} From 93b420afc794786913db5487b3d64ea4f23a450b Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 23 Aug 2018 13:54:09 +0800 Subject: [PATCH 04/23] rename file and add tree relationship tesdt --- util/tracing/{tracer.go => util.go} | 7 ++-- util/tracing/{tracer_test.go => util_test.go} | 37 +++++++++++++++++-- 2 files changed, 36 insertions(+), 8 deletions(-) rename util/tracing/{tracer.go => util.go} (92%) rename util/tracing/{tracer_test.go => util_test.go} (65%) diff --git a/util/tracing/tracer.go b/util/tracing/util.go similarity index 92% rename from util/tracing/tracer.go rename to util/tracing/util.go index abab0cc599f6c..f22be09d6f67d 100644 --- a/util/tracing/tracer.go +++ b/util/tracing/util.go @@ -55,13 +55,12 @@ func SpanFromContext(ctx context.Context) (sp opentracing.Span) { // ChildSpanFromContxt return a non-nil span. If span can be got from ctx, then returned span is // a child of such span. Otherwise, returned span is a noop span. 
-func ChildSpanFromContxt(ctx context.Context, opName string) (sp opentracing.Span) { +func ChildSpanFromContxt(ctx context.Context, opName string) (opentracing.Span, context.Context) { if sp := opentracing.SpanFromContext(ctx); sp != nil { if _, ok := sp.Tracer().(opentracing.NoopTracer); !ok { child := opentracing.StartSpan(opName, opentracing.ChildOf(sp.Context())) - opentracing.ContextWithSpan(ctx, child) - return child + return child, opentracing.ContextWithSpan(ctx, child) } } - return noopSpan() + return noopSpan(), ctx } diff --git a/util/tracing/tracer_test.go b/util/tracing/util_test.go similarity index 65% rename from util/tracing/tracer_test.go rename to util/tracing/util_test.go index 150aab02ba61b..4e6472b571fca 100644 --- a/util/tracing/tracer_test.go +++ b/util/tracing/util_test.go @@ -34,7 +34,7 @@ type testTraceSuite struct { } func (s *testTraceSuite) TestSpanFromContext(c *C) { - ctx := context.Background() + ctx := context.TODO() noopSp := tracing.SpanFromContext(ctx) // test noop span _, ok := noopSp.Tracer().(opentracing.NoopTracer) @@ -56,8 +56,8 @@ func (s *testTraceSuite) TestSpanFromContext(c *C) { } func (s *testTraceSuite) TestChildSpanFromContext(c *C) { - ctx := context.Background() - noopSp := tracing.ChildSpanFromContxt(ctx, "") + ctx := context.TODO() + noopSp, _ := tracing.ChildSpanFromContxt(ctx, "") _, ok := noopSp.Tracer().(opentracing.NoopTracer) c.Assert(ok, IsTrue) @@ -68,7 +68,7 @@ func (s *testTraceSuite) TestChildSpanFromContext(c *C) { }) sp.Finish() opentracing.ContextWithSpan(ctx, sp) - child := tracing.ChildSpanFromContxt(ctx, "test_child") + child, _ := tracing.ChildSpanFromContxt(ctx, "test_child") child.Finish() // verify second span's operation is not nil, this way we can ensure @@ -76,3 +76,32 @@ func (s *testTraceSuite) TestChildSpanFromContext(c *C) { c.Assert(collectedSpan[1].Operation, NotNil) } + +func (s *testTraceSuite) TestTreeRelationship(c *C) { + var collectedSpans []basictracer.RawSpan + ctx := context.TODO() + // first start a root span + sp1 := tracing.NewRecordedTrace("test", func(sp basictracer.RawSpan) { + collectedSpans = append(collectedSpans, sp) + }) + + // then store such root span into context + ctx = opentracing.ContextWithSpan(ctx, sp1) + + // create children span from context + sp2, ctx := tracing.ChildSpanFromContxt(ctx, "parent") + sp3, _ := tracing.ChildSpanFromContxt(ctx, "child") + + // notify span that we are about to reach end of journey. 
+ sp1.Finish() + sp2.Finish() + sp3.Finish() + + // ensure length of collectedSpans is greather than 0 + c.Assert(len(collectedSpans), Greater, 0) + if len(collectedSpans) > 0 { + c.Assert(collectedSpans[0].Operation, Equals, "test") + c.Assert(collectedSpans[1].Operation, Equals, "parent") + c.Assert(collectedSpans[2].Operation, Equals, "child") + } +} From b8abe7e6e63621337210edd6e29967049d70c5ce Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 23 Aug 2018 18:33:30 +0800 Subject: [PATCH 05/23] tree format --- distsql/distsql.go | 4 -- executor/aggregate.go | 25 +----------- executor/index_lookup_join.go | 6 --- executor/join.go | 7 ---- executor/projection.go | 7 ---- executor/show.go | 24 +++++------ executor/table_reader.go | 12 ++---- executor/trace.go | 71 ++++++++++++++------------------- plan/trace.go | 4 +- store/tikv/backoff.go | 4 +- util/tracing/noop_bench_test.go | 18 +++++---- util/tracing/trace_span.go | 17 -------- util/tracing/util_test.go | 4 ++ 13 files changed, 64 insertions(+), 139 deletions(-) delete mode 100644 util/tracing/trace_span.go diff --git a/distsql/distsql.go b/distsql/distsql.go index 1dc51d8667e43..1c74fba7b5ff4 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/tracing" "golang.org/x/net/context" ) @@ -36,9 +35,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie hook.(func(*kv.Request))(kvReq) } - child := tracing.ChildSpanFromContxt(ctx, "distsql_select") - defer child.Finish() - if !sctx.GetSessionVars().EnableStreaming { kvReq.Streaming = false } diff --git a/executor/aggregate.go b/executor/aggregate.go index 572587edcdd26..a2b0c21c46ff3 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -16,10 +16,7 @@ package executor import ( "sync" - "fmt" - "github.com/juju/errors" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -30,7 +27,6 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/mvmap" - "github.com/pingcap/tidb/util/tracing" "github.com/spaolacci/murmur3" "golang.org/x/net/context" ) @@ -68,8 +64,6 @@ type HashAggPartialWorker struct { // chk stores the input data from child, // and is reused by childExec and partial worker. chk *chunk.Chunk - - trace opentracing.Span } // HashAggFinalWorker indicates the final workers of parallel hash agg execution, @@ -85,8 +79,6 @@ type HashAggFinalWorker struct { inputCh chan *HashAggIntermData outputCh chan *AfFinalResult finalResultHolderCh chan *chunk.Chunk - - trace opentracing.Span } // AfFinalResult indicates aggregation functions final result. 
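TestTreeRelationship above exercises what the new ChildSpanFromContxt signature (span plus derived context) is for: each finished span reaches the recorder as a basictracer.RawSpan whose ParentSpanID equals the SpanID of its parent, with 0 marking the root. A standalone sketch of that relationship follows, using basictracer's in-memory recorder in place of the CallbackRecorder from util/tracing; the operation names are illustrative only.

package main

import (
	"fmt"

	basictracer "github.com/opentracing/basictracer-go"
	"github.com/opentracing/opentracing-go"
)

func main() {
	// Collect finished spans in memory; util/tracing does the same job with a
	// callback-based recorder.
	recorder := basictracer.NewInMemoryRecorder()
	tracer := basictracer.New(recorder)

	root := tracer.StartSpan("trace_exec")
	parent := tracer.StartSpan("parent", opentracing.ChildOf(root.Context()))
	child := tracer.StartSpan("child", opentracing.ChildOf(parent.Context()))

	// Finish order does not matter: every RawSpan carries its own
	// ParentSpanID, and the root keeps ParentSpanID == 0.
	child.Finish()
	parent.Finish()
	root.Finish()

	for _, sp := range recorder.GetSpans() {
		fmt.Printf("op=%-10s span=%d parent=%d\n",
			sp.Operation, sp.Context.SpanID, sp.ParentSpanID)
	}
}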
@@ -359,14 +351,12 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG if needShuffle { w.shuffleIntermData(sc, finalConcurrency) } - w.trace.Finish() waitGroup.Done() }() for { if !w.getChildInput() { return } - w.trace.LogKV("event", "update partial result") if err := w.updatePartialResult(ctx, sc, w.chk, len(w.aggCtxsMap)); err != nil { w.globalOutputCh <- &AfFinalResult{err: errors.Trace(err)} return @@ -379,7 +369,6 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) { inputIter := chunk.NewIterator4Chunk(chk) - w.trace.LogKV("event", "iterating chunk's data") for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() { groupKey, err := w.getGroupKey(sc, row) if err != nil { @@ -399,7 +388,6 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s // We only support parallel execution for single-machine, so process of encode and decode can be skipped. func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) { groupKeysSlice := make([][][]byte, finalConcurrency) - w.trace.LogKV("event", "shuffling interm data") for groupKey := range w.aggCtxsMap { groupKeyBytes := []byte(groupKey) finalWorkerIdx := int(murmur3.Sum32(groupKeyBytes)) % finalConcurrency @@ -469,7 +457,6 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er ok bool intermDataRowsBuffer [][]types.Datum ) - w.trace.LogKV("event", "consuming interm data") for { if input, ok = w.getPartialInput(); !ok { return nil @@ -540,7 +527,6 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) { defer func() { - w.trace.Finish() waitGroup.Done() }() sc := ctx.GetSessionVars().StmtCtx @@ -553,14 +539,11 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro // Next implements the Executor Next interface. 
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - child := tracing.ChildSpanFromContxt(ctx, "hash_agg_exec") - defer child.Finish() if e.isUnparallelExec { return errors.Trace(e.unparallelExec(ctx, chk)) } - err := e.parallelExec(ctx, chk) - return errors.Trace(err) + return errors.Trace(e.parallelExec(ctx, chk)) } func (e *HashAggExec) fetchChildData(ctx context.Context) { @@ -614,9 +597,7 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) { partialWorkerWaitGroup := &sync.WaitGroup{} partialWorkerWaitGroup.Add(len(e.partialWorkers)) - for i := range e.partialWorkers { - e.partialWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_partial_worker_%d", i)) go e.partialWorkers[i].run(e.ctx, partialWorkerWaitGroup, len(e.finalWorkers)) } go e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup) @@ -624,7 +605,6 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) { finalWorkerWaitGroup := &sync.WaitGroup{} finalWorkerWaitGroup.Add(len(e.finalWorkers)) for i := range e.finalWorkers { - e.finalWorkers[i].trace = tracing.ChildSpanFromContxt(ctx, fmt.Sprintf("hash_agg_exec_final_worker_%d", i)) go e.finalWorkers[i].run(e.ctx, finalWorkerWaitGroup) } go e.waitFinalWorkerAndCloseFinalOutput(finalWorkerWaitGroup) @@ -814,8 +794,6 @@ func (e *StreamAggExec) Close() error { func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - child := tracing.ChildSpanFromContxt(ctx, "stream_agg_exec") - defer child.Finish() for !e.executed && chk.NumRows() < e.maxChunkSize { err := e.consumeOneGroup(ctx, chk) if err != nil { @@ -823,7 +801,6 @@ func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { return errors.Trace(err) } } - child.LogKV("event", "StreamAggExec is finished") return nil } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 1470843d03743..92378d80a1e07 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" "github.com/pingcap/tidb/util/ranger" - "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -190,11 +189,6 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork // Next implements the Executor interface. func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error { - sp := tracing.ChildSpanFromContxt(ctx, "index_lookup_join") - defer func() { - sp.LogKV("event", "index lookup join is finished.") - sp.Finish() - }() chk.Reset() e.joinResult.Reset() for { diff --git a/executor/join.go b/executor/join.go index 475bdc00713f0..8eb1b953c5691 100644 --- a/executor/join.go +++ b/executor/join.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" - "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -185,10 +184,7 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB // fetchOuterChunks get chunks from fetches chunks from the big table in a background goroutine // and sends the chunks to multiple channels which will be read by multiple join workers. 
func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) { - sp := tracing.SpanFromContext(ctx) defer func() { - sp.LogKV("event", "finishing fetchOuterChunks") - sp.Finish() for i := range e.outerResultChs { close(e.outerResultChs[i]) } @@ -475,8 +471,6 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu // step 1. fetch data from inner child and build a hash table; // step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers. func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { - sp := tracing.ChildSpanFromContxt(ctx, "hash_join_exec") - defer sp.Finish() if !e.prepared { e.innerFinished = make(chan error, 1) go e.fetchInnerAndBuildHashTable(ctx) @@ -497,7 +491,6 @@ func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) { } chk.SwapColumns(result.chk) result.src <- result.chk - sp.LogKV("event", "hash join exec is finished") return nil } diff --git a/executor/projection.go b/executor/projection.go index 4aca4f6875828..8df62681283a3 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/tracing" "golang.org/x/net/context" ) @@ -62,7 +61,6 @@ type ProjectionExec struct { // Open implements the Executor Open interface. func (e *ProjectionExec) Open(ctx context.Context) error { - _ = tracing.ChildSpanFromContxt(ctx, "projection_exec") if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } @@ -141,11 +139,6 @@ func (e *ProjectionExec) Open(ctx context.Context) error { // +------------------------------+ +----------------------+ // func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - sp := tracing.ChildSpanFromContxt(ctx, "projection_exec") - defer func() { - sp.LogKV("event", "projection_exec is finished") - sp.Finish() - }() chk.Reset() if e.isUnparallelExec() { return errors.Trace(e.unParallelExecute(ctx, chk)) diff --git a/executor/show.go b/executor/show.go index 6f074811dedd9..ac7de894ba0ab 100644 --- a/executor/show.go +++ b/executor/show.go @@ -356,19 +356,19 @@ func (e *ShowExec) fetchShowIndex() error { subPart = col.Length } e.appendRow([]interface{}{ - tb.Meta().Name.O, // Table - nonUniq, // Non_unique - idx.Meta().Name.O, // Key_name - i + 1, // Seq_in_index - col.Name.O, // Column_name - "A", // Collation - 0, // Cardinality - subPart, // Sub_part - nil, // Packed - "YES", // Null + tb.Meta().Name.O, // Table + nonUniq, // Non_unique + idx.Meta().Name.O, // Key_name + i + 1, // Seq_in_index + col.Name.O, // Column_name + "A", // Collation + 0, // Cardinality + subPart, // Sub_part + nil, // Packed + "YES", // Null idx.Meta().Tp.String(), // Index_type - "", // Comment - idx.Meta().Comment, // Index_comment + "", // Comment + idx.Meta().Comment, // Index_comment }) } } diff --git a/executor/table_reader.go b/executor/table_reader.go index 2278d739740df..45d977121701f 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" - "github.com/pingcap/tidb/util/tracing" tipb "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) @@ -58,9 +57,6 @@ type TableReaderExecutor struct { // Open initialzes necessary variables for using this executor. 
func (e *TableReaderExecutor) Open(ctx context.Context) error { - child := tracing.ChildSpanFromContxt(ctx, "table_reader_exec") - defer child.Finish() - var err error if e.corColInFilter { e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) @@ -102,11 +98,11 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error { - err := e.resultHandler.nextChunk(ctx, chk) - if err != nil { + if err := e.resultHandler.nextChunk(ctx, chk); err != nil { e.feedback.Invalidate() + return err } - return errors.Trace(err) + return errors.Trace(nil) } // Close implements the Executor Close interface. @@ -116,7 +112,7 @@ func (e *TableReaderExecutor) Close() error { return errors.Trace(err) } -// buildResp first build request and send it to tikv using distsql.Select. It uses SelectResut returned by the callee +// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder diff --git a/executor/trace.go b/executor/trace.go index 3363358169b6d..f3ad95dffd135 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -15,8 +15,8 @@ package executor import ( "fmt" + "time" - "bytes" "github.com/juju/errors" "github.com/opentracing/basictracer-go" opentracing "github.com/opentracing/opentracing-go" @@ -27,8 +27,6 @@ import ( "golang.org/x/net/context" ) -var traceColumns = append([]*types.FieldType{}) - // TraceExec represents a root executor of trace query. type TraceExec struct { baseExecutor @@ -53,10 +51,11 @@ func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { } pp, _ := v.StmtPlan.(plan.PhysicalPlan) + fmt.Println(pp.ExplainInfo()) e.children = make([]Executor, 0, len(pp.Children())) for _, child := range pp.Children() { switch p := child.(type) { - case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg: + case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: e.children = append(e.children, b.build(p)) default: panic(fmt.Sprintf("%v is not supported", child)) @@ -74,7 +73,8 @@ func (e *TraceExec) Open(ctx context.Context) error { e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) { e.CollectedSpans = append(e.CollectedSpans, sp) }) - ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) + // we actually don't care when underlying executor started. We only care how + // much time was spent for _, child := range e.children { err := child.Open(ctx) if err != nil { @@ -89,17 +89,6 @@ func (e *TraceExec) Open(ctx context.Context) error { return nil } -func getPrefix(idx int, suffix string, opName string) string { - var buf bytes.Buffer - for i := 0; i < 2*idx; i++ { - buf.WriteString(" ") - } - - buf.WriteString(suffix) - buf.WriteString(opName) - return buf.String() -} - // Next executes real query and collects span later. 
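TraceExec.Open populates CollectedSpans by handing tracing.NewRecordedTrace a callback that receives every finished basictracer.RawSpan. With basictracer-go, the natural way to build such a tracer is a custom SpanRecorder. The sketch below shows one plausible shape; it assumes basictracer's Options struct with Recorder and ShouldSample fields, and is not claimed to be util/tracing's actual implementation:

    import (
        basictracer "github.com/opentracing/basictracer-go"
        opentracing "github.com/opentracing/opentracing-go"
    )

    // fnRecorder adapts a plain function to basictracer's SpanRecorder interface.
    type fnRecorder func(sp basictracer.RawSpan)

    func (f fnRecorder) RecordSpan(sp basictracer.RawSpan) { f(sp) }

    // newRecordedSpan starts a root span whose finished descendants are all
    // delivered to cb (basictracer Options fields assumed).
    func newRecordedSpan(op string, cb func(sp basictracer.RawSpan)) opentracing.Span {
        tr := basictracer.NewWithOptions(basictracer.Options{
            Recorder:     fnRecorder(cb),
            ShouldSample: func(traceID uint64) bool { return true }, // keep every span
        })
        return tr.StartSpan(op)
    }

Every span later started from this root through the same tracer lands in the callback once it is finished, which is what lets Next render the whole tree after the traced statement has run.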
func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() @@ -107,6 +96,7 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { return nil } + // store span into context ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) if len(e.children) > 0 { if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { @@ -117,46 +107,43 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { e.rootTrace.LogKV("event", "tracing completed") e.rootTrace.Finish() var rootSpan basictracer.RawSpan + treeSpans := make(map[uint64][]basictracer.RawSpan) for _, sp := range e.CollectedSpans { - if spans, ok := treeSpans[sp.ParentSpanID]; ok { - treeSpans[sp.ParentSpanID] = append(spans, sp) - } else { - treeSpans[sp.ParentSpanID] = make([]basictracer.RawSpan, 0) - } - if sp.Context.SpanID == 0 { + treeSpans[sp.ParentSpanID] = append(treeSpans[sp.ParentSpanID], sp) + // if a span's parentSpanID is 0, then it is root span + // this is by design + if sp.ParentSpanID == 0 { rootSpan = sp } } - // add root span here - dfsTree(rootSpan, treeSpans, "", chk) + dfsTree(rootSpan, treeSpans, "", false, chk) e.exhausted = true return nil } -func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, chk *chunk.Chunk) { - // each span has a operation name, start time and duration. - // add two empty string to prefix +func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) { suffix := "" - hasChild := false spans := tree[span.Context.SpanID] - if len(spans) > 0 { - hasChild = true - // prefix += "| " - suffix = "├─" + var newPrefix string + if span.ParentSpanID == 0 { + newPrefix = prefix } else { - suffix = "└─" - } - fmt.Println("len of spans is ", len(spans)) - for _, sp := range spans { - chk.AppendString(0, prefix+suffix+sp.Operation) - chk.AppendString(1, sp.Duration.String()) - chk.AppendInt64(2, int64(sp.Context.SpanID)) - if hasChild { - dfsTree(sp, tree, prefix+"| ", chk) + if len(tree[span.ParentSpanID]) > 0 && !isLast { + suffix = "├─" + newPrefix = prefix + "│ " } else { - dfsTree(sp, tree, prefix+" ", chk) + suffix = "└─" + newPrefix = prefix + " " } } + + chk.AppendString(0, prefix+suffix+span.Operation) + chk.AppendString(1, span.Start.Format(time.StampNano)) + chk.AppendString(2, span.Duration.String()) + + for i, sp := range spans { + dfsTree(sp, tree, newPrefix, i == (len(spans))-1 /*last element of array*/, chk) + } } diff --git a/plan/trace.go b/plan/trace.go index 4a092f56ad155..adb9641d1fde9 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -21,6 +21,7 @@ func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { if _, ok := trace.Stmt.(*ast.SelectStmt); !ok { return nil, errors.New("trace only supports select query") } + optimizedP, err := Optimize(b.ctx, trace.Stmt, b.is) if err != nil { return nil, errors.New("fail to optimize during build trace") @@ -30,8 +31,9 @@ func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { retFields := []string{"operation", "duration", "spanID"} schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) 
schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth)) + + schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth)) schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) - schema.Append(buildColumn("", "spanID", mysql.TypeLong, mysql.MaxBlobWidth)) p.SetSchema(schema) return p, nil } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 62e2f25abdc15..57ee87d9fcdb3 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/terror" - "github.com/pingcap/tidb/util/tracing" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -212,8 +211,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { default: } - child := tracing.ChildSpanFromContxt(b.ctx, "backoffer") - defer child.Finish() + b.ctx = ctx metrics.TiKVBackoffCounter.WithLabelValues(typ.String()).Inc() // Lazy initialize. if b.fn == nil { diff --git a/util/tracing/noop_bench_test.go b/util/tracing/noop_bench_test.go index 98394d511e804..76b940231c195 100644 --- a/util/tracing/noop_bench_test.go +++ b/util/tracing/noop_bench_test.go @@ -11,41 +11,43 @@ // See the License for the specific language governing permissions and // limitations under the License. - package tracing import ( - "testing" "context" "fmt" + "testing" ) - +// BenchmarkNoopLogKV benchs the cost of noop's `LogKV`. func BenchmarkNoopLogKV(b *testing.B) { sp := noopSpan() - for i := 0; i < b.N; i++ { + for i := 0; i < b.N; i++ { sp.LogKV("event", "noop is finished") } } +// BenchmarkNoopLogKVWithF benchs the the cosst of noop's `LogKV` when +// used with `fmt.Sprintf` func BenchmarkNoopLogKVWithF(b *testing.B) { sp := noopSpan() - for i := 0; i < b.N; i++ { + for i := 0; i < b.N; i++ { sp.LogKV("event", fmt.Sprintf("this is format %s", "noop is finished")) } } - +// BenchmarkSpanFromContext benchs the cost of `SpanFromContext`. func BenchmarkSpanFromContext(b *testing.B) { ctx := context.TODO() - for i := 0 ; i < b.N; i++ { + for i := 0; i < b.N; i++ { SpanFromContext(ctx) } } +// BenchmarkChildFromContext benchs the cost of `ChildSpanFromContxt`. func BenchmarkChildFromContext(b *testing.B) { ctx := context.TODO() - for i := 0 ; i < b.N; i++ { + for i := 0; i < b.N; i++ { ChildSpanFromContxt(ctx, "child") } } diff --git a/util/tracing/trace_span.go b/util/tracing/trace_span.go deleted file mode 100644 index 261158da329ba..0000000000000 --- a/util/tracing/trace_span.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package tracing - -type spanGroup struct { -} diff --git a/util/tracing/util_test.go b/util/tracing/util_test.go index 4e6472b571fca..ede39506407d4 100644 --- a/util/tracing/util_test.go +++ b/util/tracing/util_test.go @@ -103,5 +103,9 @@ func (s *testTraceSuite) TestTreeRelationship(c *C) { c.Assert(collectedSpans[0].Operation, Equals, "test") c.Assert(collectedSpans[1].Operation, Equals, "parent") c.Assert(collectedSpans[2].Operation, Equals, "child") + // check tree relationship + // sp1 is parent of sp2. And sp2 is parent of sp3. + c.Assert(collectedSpans[1].ParentSpanID, Equals, collectedSpans[0].Context.SpanID) + c.Assert(collectedSpans[2].ParentSpanID, Equals, collectedSpans[1].Context.SpanID) } } From 7a9d6db86211bc52ae580a97a932089c24f601fc Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 15:40:47 +0800 Subject: [PATCH 06/23] remvoe useless code --- distsql/distsql.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 1c74fba7b5ff4..45f8665fac2a7 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -38,7 +38,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie if !sctx.GetSessionVars().EnableStreaming { kvReq.Streaming = false } - resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars) if resp == nil { err := errors.New("client returns nil response") @@ -55,7 +54,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie }, nil } - child.LogKV("event", "finished sending rpc calls") return &selectResult{ label: "dag", resp: resp, From b46a7278db5c50f24a2524ee230333f7e8be7ddb Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 15:46:33 +0800 Subject: [PATCH 07/23] fix fmt issue --- executor/aggregate.go | 1 - executor/trace.go | 1 - infoschema/tables.go | 56 +++--- parser/misc.go | 392 +++++++++++++++++++++--------------------- store/tikv/backoff.go | 3 - 5 files changed, 224 insertions(+), 229 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index a2b0c21c46ff3..5a1613a31768e 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -539,7 +539,6 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro // Next implements the Executor Next interface. 
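Back in dfsTree and in the TestTreeRelationship assertions above, the collected spans form a tree purely through ParentSpanID: every RawSpan names its parent, and only the root carries a zero parent ID. Separated from the chunk and prefix rendering, the reconstruction amounts to the following sketch (buildTree and walk are illustrative names, not part of the patch):

    import basictracer "github.com/opentracing/basictracer-go"

    // buildTree groups collected spans by parent ID and returns the lookup
    // map plus the root span, the only span whose ParentSpanID is zero.
    func buildTree(spans []basictracer.RawSpan) (map[uint64][]basictracer.RawSpan, basictracer.RawSpan) {
        children := make(map[uint64][]basictracer.RawSpan, len(spans))
        var root basictracer.RawSpan
        for _, sp := range spans {
            children[sp.ParentSpanID] = append(children[sp.ParentSpanID], sp)
            if sp.ParentSpanID == 0 {
                root = sp
            }
        }
        return children, root
    }

    // walk visits the tree depth-first, handing each span and its depth to visit.
    func walk(sp basictracer.RawSpan, children map[uint64][]basictracer.RawSpan,
        depth int, visit func(basictracer.RawSpan, int)) {
        visit(sp, depth)
        for _, child := range children[sp.Context.SpanID] {
            walk(child, children, depth+1, visit)
        }
    }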
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() - if e.isUnparallelExec { return errors.Trace(e.unparallelExec(ctx, chk)) } diff --git a/executor/trace.go b/executor/trace.go index f3ad95dffd135..6c2c8101e1a1d 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -21,7 +21,6 @@ import ( "github.com/opentracing/basictracer-go" opentracing "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/plan" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/tracing" "golang.org/x/net/context" diff --git a/infoschema/tables.go b/infoschema/tables.go index 02346da318ec8..20fa0958436c7 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -837,19 +837,19 @@ func dataForColumnsInTable(schema *model.DBInfo, tbl *model.TableInfo) [][]types columnDefault, // COLUMN_DEFAULT columnDesc.Null, // IS_NULLABLE types.TypeToStr(col.Tp, col.Charset), // DATA_TYPE - colLen, // CHARACTER_MAXIMUM_LENGTH - colLen, // CHARACTER_OCTET_LENGTH - decimal, // NUMERIC_PRECISION - 0, // NUMERIC_SCALE - 0, // DATETIME_PRECISION - col.Charset, // CHARACTER_SET_NAME - col.Collate, // COLLATION_NAME - columnType, // COLUMN_TYPE - columnDesc.Key, // COLUMN_KEY - columnDesc.Extra, // EXTRA - "select,insert,update,references", // PRIVILEGES - columnDesc.Comment, // COLUMN_COMMENT - col.GeneratedExprString, // GENERATION_EXPRESSION + colLen, // CHARACTER_MAXIMUM_LENGTH + colLen, // CHARACTER_OCTET_LENGTH + decimal, // NUMERIC_PRECISION + 0, // NUMERIC_SCALE + 0, // DATETIME_PRECISION + col.Charset, // CHARACTER_SET_NAME + col.Collate, // COLLATION_NAME + columnType, // COLUMN_TYPE + columnDesc.Key, // COLUMN_KEY + columnDesc.Extra, // EXTRA + "select,insert,update,references", // PRIVILEGES + columnDesc.Comment, // COLUMN_COMMENT + col.GeneratedExprString, // GENERATION_EXPRESSION ) // In mysql, 'character_set_name' and 'collation_name' are setted to null when column type is non-varchar or non-blob in information_schema. 
if col.Tp != mysql.TypeVarchar && col.Tp != mysql.TypeBlob { @@ -993,24 +993,24 @@ func dataForTableConstraints(schemas []*model.DBInfo) [][]types.Datum { func dataForPseudoProfiling() [][]types.Datum { var rows [][]types.Datum row := types.MakeDatums( - 0, // QUERY_ID - 0, // SEQ - "", // STATE + 0, // QUERY_ID + 0, // SEQ + "", // STATE types.NewDecFromInt(0), // DURATION types.NewDecFromInt(0), // CPU_USER types.NewDecFromInt(0), // CPU_SYSTEM - 0, // CONTEXT_VOLUNTARY - 0, // CONTEXT_INVOLUNTARY - 0, // BLOCK_OPS_IN - 0, // BLOCK_OPS_OUT - 0, // MESSAGES_SENT - 0, // MESSAGES_RECEIVED - 0, // PAGE_FAULTS_MAJOR - 0, // PAGE_FAULTS_MINOR - 0, // SWAPS - 0, // SOURCE_FUNCTION - 0, // SOURCE_FILE - 0, // SOURCE_LINE + 0, // CONTEXT_VOLUNTARY + 0, // CONTEXT_INVOLUNTARY + 0, // BLOCK_OPS_IN + 0, // BLOCK_OPS_OUT + 0, // MESSAGES_SENT + 0, // MESSAGES_RECEIVED + 0, // PAGE_FAULTS_MAJOR + 0, // PAGE_FAULTS_MINOR + 0, // SWAPS + 0, // SOURCE_FUNCTION + 0, // SOURCE_FILE + 0, // SOURCE_LINE ) rows = append(rows, row) return rows diff --git a/parser/misc.go b/parser/misc.go index e0183805bccc0..e4eb3a2c441ee 100644 --- a/parser/misc.go +++ b/parser/misc.go @@ -133,202 +133,202 @@ func init() { } var tokenMap = map[string]int{ - "ACTION": action, - "ADD": add, - "ADDDATE": addDate, - "ADMIN": admin, - "AFTER": after, - "ALL": all, - "ALGORITHM": algorithm, - "ALTER": alter, - "ALWAYS": always, - "ANALYZE": analyze, - "AND": and, - "ANY": any, - "AS": as, - "ASC": asc, - "ASCII": ascii, - "AUTO_INCREMENT": autoIncrement, - "AVG": avg, - "AVG_ROW_LENGTH": avgRowLength, - "BEGIN": begin, - "BETWEEN": between, - "BIGINT": bigIntType, - "BINARY": binaryType, - "BINLOG": binlog, - "BIT": bitType, - "BIT_AND": bitAnd, - "BIT_OR": bitOr, - "BIT_XOR": bitXor, - "BLOB": blobType, - "BOOL": boolType, - "BOOLEAN": booleanType, - "BOTH": both, - "BTREE": btree, - "BY": by, - "BYTE": byteType, - "CANCEL": cancel, - "CASCADE": cascade, - "CASCADED": cascaded, - "CASE": caseKwd, - "CAST": cast, - "CHANGE": change, - "CHAR": charType, - "CHARACTER": character, - "CHARSET": charsetKwd, - "CHECK": check, - "CHECKSUM": checksum, - "CLEANUP": cleanup, - "CLIENT": client, - "COALESCE": coalesce, - "COLLATE": collate, - "COLLATION": collation, - "COLUMN": column, - "COLUMNS": columns, - "COMMENT": comment, - "COMMIT": commit, - "COMMITTED": committed, - "COMPACT": compact, - "COMPRESSED": compressed, - "COMPRESSION": compression, - "CONNECTION": connection, - "CONSISTENT": consistent, - "CONSTRAINT": constraint, - "CONVERT": convert, - "COPY": copyKwd, - "COUNT": count, - "CREATE": create, - "CROSS": cross, - "CURRENT_DATE": currentDate, - "CURRENT_TIME": currentTime, - "CURRENT_TIMESTAMP": currentTs, - "CURRENT_USER": currentUser, - "CURTIME": curTime, - "DATA": data, - "DATABASE": database, - "DATABASES": databases, - "DATE": dateType, - "DATE_ADD": dateAdd, - "DATE_SUB": dateSub, - "DATETIME": datetimeType, - "DAY": day, - "DAY_HOUR": dayHour, - "DAY_MICROSECOND": dayMicrosecond, - "DAY_MINUTE": dayMinute, - "DAY_SECOND": daySecond, - "DDL": ddl, - "DEALLOCATE": deallocate, - "DEC": decimalType, - "DECIMAL": decimalType, - "DEFAULT": defaultKwd, - "DEFINER": definer, - "DELAY_KEY_WRITE": delayKeyWrite, - "DELAYED": delayed, - "DELETE": deleteKwd, - "DESC": desc, - "DESCRIBE": describe, - "DISABLE": disable, - "DISTINCT": distinct, - "DISTINCTROW": distinct, - "DIV": div, - "DO": do, - "DOUBLE": doubleType, - "DROP": drop, - "DUAL": dual, - "DUPLICATE": duplicate, - "DYNAMIC": dynamic, - "ELSE": elseKwd, - "ENABLE": 
enable, - "ENCLOSED": enclosed, - "END": end, - "ENGINE": engine, - "ENGINES": engines, - "ENUM": enum, - "ESCAPE": escape, - "ESCAPED": escaped, - "EVENT": event, - "EVENTS": events, - "EXCLUSIVE": exclusive, - "EXECUTE": execute, - "EXISTS": exists, - "EXPLAIN": explain, - "EXTRACT": extract, - "FALSE": falseKwd, - "FIELDS": fields, - "FIRST": first, - "FIXED": fixed, - "FLOAT": floatType, - "FLUSH": flush, - "FOR": forKwd, - "FORCE": force, - "FOREIGN": foreign, - "FORMAT": format, - "FROM": from, - "FULL": full, - "FULLTEXT": fulltext, - "FUNCTION": function, - "GENERATED": generated, - "GET_FORMAT": getFormat, - "GLOBAL": global, - "GRANT": grant, - "GRANTS": grants, - "GROUP": group, - "GROUP_CONCAT": groupConcat, - "HASH": hash, - "HAVING": having, - "HIGH_PRIORITY": highPriority, - "HOUR": hour, - "HOUR_MICROSECOND": hourMicrosecond, - "HOUR_MINUTE": hourMinute, - "HOUR_SECOND": hourSecond, - "IDENTIFIED": identified, - "IF": ifKwd, - "IGNORE": ignore, - "IN": in, - "INDEX": index, - "INDEXES": indexes, - "INFILE": infile, - "INNER": inner, - "INPLACE": inplace, - "INSERT": insert, - "INT": intType, - "INT1": int1Type, - "INT2": int2Type, - "INT3": int3Type, - "INT4": int4Type, - "INT8": int8Type, - "INTEGER": integerType, - "INTERVAL": interval, - "INTO": into, - "INVOKER": invoker, - "IS": is, - "ISOLATION": isolation, - "JOBS": jobs, - "JOB": job, - "JOIN": join, - "JSON": jsonType, - "KEY": key, - "KEY_BLOCK_SIZE": keyBlockSize, - "KEYS": keys, - "KILL": kill, - "LEADING": leading, - "LEFT": left, - "LESS": less, - "LEVEL": level, - "LIKE": like, - "LIMIT": limit, - "LINES": lines, - "LOAD": load, - "LOCAL": local, - "LOCALTIME": localTime, - "LOCALTIMESTAMP": localTs, - "LOCK": lock, - "LONG": long, - "LONGBLOB": longblobType, - "LONGTEXT": longtextType, - "LOW_PRIORITY": lowPriority, - "MASTER": master, - "MAX": max, + "ACTION": action, + "ADD": add, + "ADDDATE": addDate, + "ADMIN": admin, + "AFTER": after, + "ALL": all, + "ALGORITHM": algorithm, + "ALTER": alter, + "ALWAYS": always, + "ANALYZE": analyze, + "AND": and, + "ANY": any, + "AS": as, + "ASC": asc, + "ASCII": ascii, + "AUTO_INCREMENT": autoIncrement, + "AVG": avg, + "AVG_ROW_LENGTH": avgRowLength, + "BEGIN": begin, + "BETWEEN": between, + "BIGINT": bigIntType, + "BINARY": binaryType, + "BINLOG": binlog, + "BIT": bitType, + "BIT_AND": bitAnd, + "BIT_OR": bitOr, + "BIT_XOR": bitXor, + "BLOB": blobType, + "BOOL": boolType, + "BOOLEAN": booleanType, + "BOTH": both, + "BTREE": btree, + "BY": by, + "BYTE": byteType, + "CANCEL": cancel, + "CASCADE": cascade, + "CASCADED": cascaded, + "CASE": caseKwd, + "CAST": cast, + "CHANGE": change, + "CHAR": charType, + "CHARACTER": character, + "CHARSET": charsetKwd, + "CHECK": check, + "CHECKSUM": checksum, + "CLEANUP": cleanup, + "CLIENT": client, + "COALESCE": coalesce, + "COLLATE": collate, + "COLLATION": collation, + "COLUMN": column, + "COLUMNS": columns, + "COMMENT": comment, + "COMMIT": commit, + "COMMITTED": committed, + "COMPACT": compact, + "COMPRESSED": compressed, + "COMPRESSION": compression, + "CONNECTION": connection, + "CONSISTENT": consistent, + "CONSTRAINT": constraint, + "CONVERT": convert, + "COPY": copyKwd, + "COUNT": count, + "CREATE": create, + "CROSS": cross, + "CURRENT_DATE": currentDate, + "CURRENT_TIME": currentTime, + "CURRENT_TIMESTAMP": currentTs, + "CURRENT_USER": currentUser, + "CURTIME": curTime, + "DATA": data, + "DATABASE": database, + "DATABASES": databases, + "DATE": dateType, + "DATE_ADD": dateAdd, + "DATE_SUB": dateSub, + "DATETIME": 
datetimeType, + "DAY": day, + "DAY_HOUR": dayHour, + "DAY_MICROSECOND": dayMicrosecond, + "DAY_MINUTE": dayMinute, + "DAY_SECOND": daySecond, + "DDL": ddl, + "DEALLOCATE": deallocate, + "DEC": decimalType, + "DECIMAL": decimalType, + "DEFAULT": defaultKwd, + "DEFINER": definer, + "DELAY_KEY_WRITE": delayKeyWrite, + "DELAYED": delayed, + "DELETE": deleteKwd, + "DESC": desc, + "DESCRIBE": describe, + "DISABLE": disable, + "DISTINCT": distinct, + "DISTINCTROW": distinct, + "DIV": div, + "DO": do, + "DOUBLE": doubleType, + "DROP": drop, + "DUAL": dual, + "DUPLICATE": duplicate, + "DYNAMIC": dynamic, + "ELSE": elseKwd, + "ENABLE": enable, + "ENCLOSED": enclosed, + "END": end, + "ENGINE": engine, + "ENGINES": engines, + "ENUM": enum, + "ESCAPE": escape, + "ESCAPED": escaped, + "EVENT": event, + "EVENTS": events, + "EXCLUSIVE": exclusive, + "EXECUTE": execute, + "EXISTS": exists, + "EXPLAIN": explain, + "EXTRACT": extract, + "FALSE": falseKwd, + "FIELDS": fields, + "FIRST": first, + "FIXED": fixed, + "FLOAT": floatType, + "FLUSH": flush, + "FOR": forKwd, + "FORCE": force, + "FOREIGN": foreign, + "FORMAT": format, + "FROM": from, + "FULL": full, + "FULLTEXT": fulltext, + "FUNCTION": function, + "GENERATED": generated, + "GET_FORMAT": getFormat, + "GLOBAL": global, + "GRANT": grant, + "GRANTS": grants, + "GROUP": group, + "GROUP_CONCAT": groupConcat, + "HASH": hash, + "HAVING": having, + "HIGH_PRIORITY": highPriority, + "HOUR": hour, + "HOUR_MICROSECOND": hourMicrosecond, + "HOUR_MINUTE": hourMinute, + "HOUR_SECOND": hourSecond, + "IDENTIFIED": identified, + "IF": ifKwd, + "IGNORE": ignore, + "IN": in, + "INDEX": index, + "INDEXES": indexes, + "INFILE": infile, + "INNER": inner, + "INPLACE": inplace, + "INSERT": insert, + "INT": intType, + "INT1": int1Type, + "INT2": int2Type, + "INT3": int3Type, + "INT4": int4Type, + "INT8": int8Type, + "INTEGER": integerType, + "INTERVAL": interval, + "INTO": into, + "INVOKER": invoker, + "IS": is, + "ISOLATION": isolation, + "JOBS": jobs, + "JOB": job, + "JOIN": join, + "JSON": jsonType, + "KEY": key, + "KEY_BLOCK_SIZE": keyBlockSize, + "KEYS": keys, + "KILL": kill, + "LEADING": leading, + "LEFT": left, + "LESS": less, + "LEVEL": level, + "LIKE": like, + "LIMIT": limit, + "LINES": lines, + "LOAD": load, + "LOCAL": local, + "LOCALTIME": localTime, + "LOCALTIMESTAMP": localTs, + "LOCK": lock, + "LONG": long, + "LONGBLOB": longblobType, + "LONGTEXT": longtextType, + "LOW_PRIORITY": lowPriority, + "MASTER": master, + "MAX": max, "MAX_CONNECTIONS_PER_HOUR": maxConnectionsPerHour, "MAX_EXECUTION_TIME": maxExecutionTime, "MAX_QUERIES_PER_HOUR": maxQueriesPerHour, diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 57ee87d9fcdb3..e02c87ef1235e 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -211,7 +211,6 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { default: } - b.ctx = ctx metrics.TiKVBackoffCounter.WithLabelValues(typ.String()).Inc() // Lazy initialize. 
if b.fn == nil { @@ -227,8 +226,6 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { b.types = append(b.types, typ) log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms)", err, b.totalSleep, b.maxSleep) - child.LogKV("error", err.Error()) - child.LogKV("retry", fmt.Sprintf("retry later(totalsleep %dms, maxsleep %dms)", b.totalSleep, b.maxSleep)) b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) if b.maxSleep > 0 && b.totalSleep >= b.maxSleep { From 84b9d78e279968fd737858bc7955d6deb978c21b Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 15:51:56 +0800 Subject: [PATCH 08/23] iterate all data until chk's row count is zero --- executor/trace.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/trace.go b/executor/trace.go index 6c2c8101e1a1d..f41bfd2cf71aa 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -50,7 +50,6 @@ func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { } pp, _ := v.StmtPlan.(plan.PhysicalPlan) - fmt.Println(pp.ExplainInfo()) e.children = make([]Executor, 0, len(pp.Children())) for _, child := range pp.Children() { switch p := child.(type) { @@ -98,8 +97,10 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { // store span into context ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) if len(e.children) > 0 { - if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { - return errors.Trace(err) + for e.childrenResults[0].NumRows() != 0 { + if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { + return errors.Trace(err) + } } } From 3fe695126c7158b2a61c67b8f23b8d56da7fb50d Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 16:02:04 +0800 Subject: [PATCH 09/23] chk has 0 row count, it will not enter for loop --- executor/trace.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executor/trace.go b/executor/trace.go index f41bfd2cf71aa..5964a7ffbf431 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -97,10 +97,13 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { // store span into context ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) if len(e.children) > 0 { - for e.childrenResults[0].NumRows() != 0 { + for { if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { return errors.Trace(err) } + if e.childrenResults[0].NumRows() != 0 { + break + } } } From 31961e643411ca5e3d3477b9d9f05c26439aaef0 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 20:13:00 +0800 Subject: [PATCH 10/23] undo format by go1.11 --- executor/show.go | 24 +-- infoschema/tables.go | 56 +++---- parser/misc.go | 392 +++++++++++++++++++++---------------------- 3 files changed, 236 insertions(+), 236 deletions(-) diff --git a/executor/show.go b/executor/show.go index ac7de894ba0ab..6f074811dedd9 100644 --- a/executor/show.go +++ b/executor/show.go @@ -356,19 +356,19 @@ func (e *ShowExec) fetchShowIndex() error { subPart = col.Length } e.appendRow([]interface{}{ - tb.Meta().Name.O, // Table - nonUniq, // Non_unique - idx.Meta().Name.O, // Key_name - i + 1, // Seq_in_index - col.Name.O, // Column_name - "A", // Collation - 0, // Cardinality - subPart, // Sub_part - nil, // Packed - "YES", // Null + tb.Meta().Name.O, // Table + nonUniq, // Non_unique + idx.Meta().Name.O, // Key_name + i + 1, // Seq_in_index + col.Name.O, // Column_name + "A", // Collation + 0, // Cardinality + subPart, // Sub_part + nil, // Packed + "YES", // 
Null idx.Meta().Tp.String(), // Index_type - "", // Comment - idx.Meta().Comment, // Index_comment + "", // Comment + idx.Meta().Comment, // Index_comment }) } } diff --git a/infoschema/tables.go b/infoschema/tables.go index 20fa0958436c7..02346da318ec8 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -837,19 +837,19 @@ func dataForColumnsInTable(schema *model.DBInfo, tbl *model.TableInfo) [][]types columnDefault, // COLUMN_DEFAULT columnDesc.Null, // IS_NULLABLE types.TypeToStr(col.Tp, col.Charset), // DATA_TYPE - colLen, // CHARACTER_MAXIMUM_LENGTH - colLen, // CHARACTER_OCTET_LENGTH - decimal, // NUMERIC_PRECISION - 0, // NUMERIC_SCALE - 0, // DATETIME_PRECISION - col.Charset, // CHARACTER_SET_NAME - col.Collate, // COLLATION_NAME - columnType, // COLUMN_TYPE - columnDesc.Key, // COLUMN_KEY - columnDesc.Extra, // EXTRA - "select,insert,update,references", // PRIVILEGES - columnDesc.Comment, // COLUMN_COMMENT - col.GeneratedExprString, // GENERATION_EXPRESSION + colLen, // CHARACTER_MAXIMUM_LENGTH + colLen, // CHARACTER_OCTET_LENGTH + decimal, // NUMERIC_PRECISION + 0, // NUMERIC_SCALE + 0, // DATETIME_PRECISION + col.Charset, // CHARACTER_SET_NAME + col.Collate, // COLLATION_NAME + columnType, // COLUMN_TYPE + columnDesc.Key, // COLUMN_KEY + columnDesc.Extra, // EXTRA + "select,insert,update,references", // PRIVILEGES + columnDesc.Comment, // COLUMN_COMMENT + col.GeneratedExprString, // GENERATION_EXPRESSION ) // In mysql, 'character_set_name' and 'collation_name' are setted to null when column type is non-varchar or non-blob in information_schema. if col.Tp != mysql.TypeVarchar && col.Tp != mysql.TypeBlob { @@ -993,24 +993,24 @@ func dataForTableConstraints(schemas []*model.DBInfo) [][]types.Datum { func dataForPseudoProfiling() [][]types.Datum { var rows [][]types.Datum row := types.MakeDatums( - 0, // QUERY_ID - 0, // SEQ - "", // STATE + 0, // QUERY_ID + 0, // SEQ + "", // STATE types.NewDecFromInt(0), // DURATION types.NewDecFromInt(0), // CPU_USER types.NewDecFromInt(0), // CPU_SYSTEM - 0, // CONTEXT_VOLUNTARY - 0, // CONTEXT_INVOLUNTARY - 0, // BLOCK_OPS_IN - 0, // BLOCK_OPS_OUT - 0, // MESSAGES_SENT - 0, // MESSAGES_RECEIVED - 0, // PAGE_FAULTS_MAJOR - 0, // PAGE_FAULTS_MINOR - 0, // SWAPS - 0, // SOURCE_FUNCTION - 0, // SOURCE_FILE - 0, // SOURCE_LINE + 0, // CONTEXT_VOLUNTARY + 0, // CONTEXT_INVOLUNTARY + 0, // BLOCK_OPS_IN + 0, // BLOCK_OPS_OUT + 0, // MESSAGES_SENT + 0, // MESSAGES_RECEIVED + 0, // PAGE_FAULTS_MAJOR + 0, // PAGE_FAULTS_MINOR + 0, // SWAPS + 0, // SOURCE_FUNCTION + 0, // SOURCE_FILE + 0, // SOURCE_LINE ) rows = append(rows, row) return rows diff --git a/parser/misc.go b/parser/misc.go index e4eb3a2c441ee..e0183805bccc0 100644 --- a/parser/misc.go +++ b/parser/misc.go @@ -133,202 +133,202 @@ func init() { } var tokenMap = map[string]int{ - "ACTION": action, - "ADD": add, - "ADDDATE": addDate, - "ADMIN": admin, - "AFTER": after, - "ALL": all, - "ALGORITHM": algorithm, - "ALTER": alter, - "ALWAYS": always, - "ANALYZE": analyze, - "AND": and, - "ANY": any, - "AS": as, - "ASC": asc, - "ASCII": ascii, - "AUTO_INCREMENT": autoIncrement, - "AVG": avg, - "AVG_ROW_LENGTH": avgRowLength, - "BEGIN": begin, - "BETWEEN": between, - "BIGINT": bigIntType, - "BINARY": binaryType, - "BINLOG": binlog, - "BIT": bitType, - "BIT_AND": bitAnd, - "BIT_OR": bitOr, - "BIT_XOR": bitXor, - "BLOB": blobType, - "BOOL": boolType, - "BOOLEAN": booleanType, - "BOTH": both, - "BTREE": btree, - "BY": by, - "BYTE": byteType, - "CANCEL": cancel, - "CASCADE": cascade, - "CASCADED": 
cascaded, - "CASE": caseKwd, - "CAST": cast, - "CHANGE": change, - "CHAR": charType, - "CHARACTER": character, - "CHARSET": charsetKwd, - "CHECK": check, - "CHECKSUM": checksum, - "CLEANUP": cleanup, - "CLIENT": client, - "COALESCE": coalesce, - "COLLATE": collate, - "COLLATION": collation, - "COLUMN": column, - "COLUMNS": columns, - "COMMENT": comment, - "COMMIT": commit, - "COMMITTED": committed, - "COMPACT": compact, - "COMPRESSED": compressed, - "COMPRESSION": compression, - "CONNECTION": connection, - "CONSISTENT": consistent, - "CONSTRAINT": constraint, - "CONVERT": convert, - "COPY": copyKwd, - "COUNT": count, - "CREATE": create, - "CROSS": cross, - "CURRENT_DATE": currentDate, - "CURRENT_TIME": currentTime, - "CURRENT_TIMESTAMP": currentTs, - "CURRENT_USER": currentUser, - "CURTIME": curTime, - "DATA": data, - "DATABASE": database, - "DATABASES": databases, - "DATE": dateType, - "DATE_ADD": dateAdd, - "DATE_SUB": dateSub, - "DATETIME": datetimeType, - "DAY": day, - "DAY_HOUR": dayHour, - "DAY_MICROSECOND": dayMicrosecond, - "DAY_MINUTE": dayMinute, - "DAY_SECOND": daySecond, - "DDL": ddl, - "DEALLOCATE": deallocate, - "DEC": decimalType, - "DECIMAL": decimalType, - "DEFAULT": defaultKwd, - "DEFINER": definer, - "DELAY_KEY_WRITE": delayKeyWrite, - "DELAYED": delayed, - "DELETE": deleteKwd, - "DESC": desc, - "DESCRIBE": describe, - "DISABLE": disable, - "DISTINCT": distinct, - "DISTINCTROW": distinct, - "DIV": div, - "DO": do, - "DOUBLE": doubleType, - "DROP": drop, - "DUAL": dual, - "DUPLICATE": duplicate, - "DYNAMIC": dynamic, - "ELSE": elseKwd, - "ENABLE": enable, - "ENCLOSED": enclosed, - "END": end, - "ENGINE": engine, - "ENGINES": engines, - "ENUM": enum, - "ESCAPE": escape, - "ESCAPED": escaped, - "EVENT": event, - "EVENTS": events, - "EXCLUSIVE": exclusive, - "EXECUTE": execute, - "EXISTS": exists, - "EXPLAIN": explain, - "EXTRACT": extract, - "FALSE": falseKwd, - "FIELDS": fields, - "FIRST": first, - "FIXED": fixed, - "FLOAT": floatType, - "FLUSH": flush, - "FOR": forKwd, - "FORCE": force, - "FOREIGN": foreign, - "FORMAT": format, - "FROM": from, - "FULL": full, - "FULLTEXT": fulltext, - "FUNCTION": function, - "GENERATED": generated, - "GET_FORMAT": getFormat, - "GLOBAL": global, - "GRANT": grant, - "GRANTS": grants, - "GROUP": group, - "GROUP_CONCAT": groupConcat, - "HASH": hash, - "HAVING": having, - "HIGH_PRIORITY": highPriority, - "HOUR": hour, - "HOUR_MICROSECOND": hourMicrosecond, - "HOUR_MINUTE": hourMinute, - "HOUR_SECOND": hourSecond, - "IDENTIFIED": identified, - "IF": ifKwd, - "IGNORE": ignore, - "IN": in, - "INDEX": index, - "INDEXES": indexes, - "INFILE": infile, - "INNER": inner, - "INPLACE": inplace, - "INSERT": insert, - "INT": intType, - "INT1": int1Type, - "INT2": int2Type, - "INT3": int3Type, - "INT4": int4Type, - "INT8": int8Type, - "INTEGER": integerType, - "INTERVAL": interval, - "INTO": into, - "INVOKER": invoker, - "IS": is, - "ISOLATION": isolation, - "JOBS": jobs, - "JOB": job, - "JOIN": join, - "JSON": jsonType, - "KEY": key, - "KEY_BLOCK_SIZE": keyBlockSize, - "KEYS": keys, - "KILL": kill, - "LEADING": leading, - "LEFT": left, - "LESS": less, - "LEVEL": level, - "LIKE": like, - "LIMIT": limit, - "LINES": lines, - "LOAD": load, - "LOCAL": local, - "LOCALTIME": localTime, - "LOCALTIMESTAMP": localTs, - "LOCK": lock, - "LONG": long, - "LONGBLOB": longblobType, - "LONGTEXT": longtextType, - "LOW_PRIORITY": lowPriority, - "MASTER": master, - "MAX": max, + "ACTION": action, + "ADD": add, + "ADDDATE": addDate, + "ADMIN": admin, + "AFTER": after, + 
"ALL": all, + "ALGORITHM": algorithm, + "ALTER": alter, + "ALWAYS": always, + "ANALYZE": analyze, + "AND": and, + "ANY": any, + "AS": as, + "ASC": asc, + "ASCII": ascii, + "AUTO_INCREMENT": autoIncrement, + "AVG": avg, + "AVG_ROW_LENGTH": avgRowLength, + "BEGIN": begin, + "BETWEEN": between, + "BIGINT": bigIntType, + "BINARY": binaryType, + "BINLOG": binlog, + "BIT": bitType, + "BIT_AND": bitAnd, + "BIT_OR": bitOr, + "BIT_XOR": bitXor, + "BLOB": blobType, + "BOOL": boolType, + "BOOLEAN": booleanType, + "BOTH": both, + "BTREE": btree, + "BY": by, + "BYTE": byteType, + "CANCEL": cancel, + "CASCADE": cascade, + "CASCADED": cascaded, + "CASE": caseKwd, + "CAST": cast, + "CHANGE": change, + "CHAR": charType, + "CHARACTER": character, + "CHARSET": charsetKwd, + "CHECK": check, + "CHECKSUM": checksum, + "CLEANUP": cleanup, + "CLIENT": client, + "COALESCE": coalesce, + "COLLATE": collate, + "COLLATION": collation, + "COLUMN": column, + "COLUMNS": columns, + "COMMENT": comment, + "COMMIT": commit, + "COMMITTED": committed, + "COMPACT": compact, + "COMPRESSED": compressed, + "COMPRESSION": compression, + "CONNECTION": connection, + "CONSISTENT": consistent, + "CONSTRAINT": constraint, + "CONVERT": convert, + "COPY": copyKwd, + "COUNT": count, + "CREATE": create, + "CROSS": cross, + "CURRENT_DATE": currentDate, + "CURRENT_TIME": currentTime, + "CURRENT_TIMESTAMP": currentTs, + "CURRENT_USER": currentUser, + "CURTIME": curTime, + "DATA": data, + "DATABASE": database, + "DATABASES": databases, + "DATE": dateType, + "DATE_ADD": dateAdd, + "DATE_SUB": dateSub, + "DATETIME": datetimeType, + "DAY": day, + "DAY_HOUR": dayHour, + "DAY_MICROSECOND": dayMicrosecond, + "DAY_MINUTE": dayMinute, + "DAY_SECOND": daySecond, + "DDL": ddl, + "DEALLOCATE": deallocate, + "DEC": decimalType, + "DECIMAL": decimalType, + "DEFAULT": defaultKwd, + "DEFINER": definer, + "DELAY_KEY_WRITE": delayKeyWrite, + "DELAYED": delayed, + "DELETE": deleteKwd, + "DESC": desc, + "DESCRIBE": describe, + "DISABLE": disable, + "DISTINCT": distinct, + "DISTINCTROW": distinct, + "DIV": div, + "DO": do, + "DOUBLE": doubleType, + "DROP": drop, + "DUAL": dual, + "DUPLICATE": duplicate, + "DYNAMIC": dynamic, + "ELSE": elseKwd, + "ENABLE": enable, + "ENCLOSED": enclosed, + "END": end, + "ENGINE": engine, + "ENGINES": engines, + "ENUM": enum, + "ESCAPE": escape, + "ESCAPED": escaped, + "EVENT": event, + "EVENTS": events, + "EXCLUSIVE": exclusive, + "EXECUTE": execute, + "EXISTS": exists, + "EXPLAIN": explain, + "EXTRACT": extract, + "FALSE": falseKwd, + "FIELDS": fields, + "FIRST": first, + "FIXED": fixed, + "FLOAT": floatType, + "FLUSH": flush, + "FOR": forKwd, + "FORCE": force, + "FOREIGN": foreign, + "FORMAT": format, + "FROM": from, + "FULL": full, + "FULLTEXT": fulltext, + "FUNCTION": function, + "GENERATED": generated, + "GET_FORMAT": getFormat, + "GLOBAL": global, + "GRANT": grant, + "GRANTS": grants, + "GROUP": group, + "GROUP_CONCAT": groupConcat, + "HASH": hash, + "HAVING": having, + "HIGH_PRIORITY": highPriority, + "HOUR": hour, + "HOUR_MICROSECOND": hourMicrosecond, + "HOUR_MINUTE": hourMinute, + "HOUR_SECOND": hourSecond, + "IDENTIFIED": identified, + "IF": ifKwd, + "IGNORE": ignore, + "IN": in, + "INDEX": index, + "INDEXES": indexes, + "INFILE": infile, + "INNER": inner, + "INPLACE": inplace, + "INSERT": insert, + "INT": intType, + "INT1": int1Type, + "INT2": int2Type, + "INT3": int3Type, + "INT4": int4Type, + "INT8": int8Type, + "INTEGER": integerType, + "INTERVAL": interval, + "INTO": into, + "INVOKER": invoker, + "IS": is, + 
"ISOLATION": isolation, + "JOBS": jobs, + "JOB": job, + "JOIN": join, + "JSON": jsonType, + "KEY": key, + "KEY_BLOCK_SIZE": keyBlockSize, + "KEYS": keys, + "KILL": kill, + "LEADING": leading, + "LEFT": left, + "LESS": less, + "LEVEL": level, + "LIKE": like, + "LIMIT": limit, + "LINES": lines, + "LOAD": load, + "LOCAL": local, + "LOCALTIME": localTime, + "LOCALTIMESTAMP": localTs, + "LOCK": lock, + "LONG": long, + "LONGBLOB": longblobType, + "LONGTEXT": longtextType, + "LOW_PRIORITY": lowPriority, + "MASTER": master, + "MAX": max, "MAX_CONNECTIONS_PER_HOUR": maxConnectionsPerHour, "MAX_EXECUTION_TIME": maxExecutionTime, "MAX_QUERIES_PER_HOUR": maxQueriesPerHour, From 70f0d9b861be79a20ea00a02b3aa7e2f7ae12a9d Mon Sep 17 00:00:00 2001 From: zhexuany Date: Wed, 29 Aug 2018 23:30:54 +0800 Subject: [PATCH 11/23] address comments --- executor/trace.go | 2 +- plan/trace.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/trace.go b/executor/trace.go index 5964a7ffbf431..1199c5be544e3 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -56,7 +56,7 @@ func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: e.children = append(e.children, b.build(p)) default: - panic(fmt.Sprintf("%v is not supported", child)) + b.err = errors.Errorf("%v is not supported", child) } } diff --git a/plan/trace.go b/plan/trace.go index adb9641d1fde9..d7db81eb23636 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -24,7 +24,7 @@ func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { optimizedP, err := Optimize(b.ctx, trace.Stmt, b.is) if err != nil { - return nil, errors.New("fail to optimize during build trace") + return nil, err } p := &Trace{StmtPlan: optimizedP} From cb366aa20f219260f34ac54854cd8c0db57eff23 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 11:02:28 +0800 Subject: [PATCH 12/23] remove unecessary import --- executor/trace.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/trace.go b/executor/trace.go index 1199c5be544e3..f39f2db4f7660 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -14,7 +14,6 @@ package executor import ( - "fmt" "time" "github.com/juju/errors" From 46e681e1ff9e3e5d46f2dca5e2a41d6825830320 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 14:07:02 +0800 Subject: [PATCH 13/23] add more tests --- executor/trace_test.go | 33 +++++++++++++++++++++++++++++++++ util/tracing/util_test.go | 16 ++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 executor/trace_test.go diff --git a/executor/trace_test.go b/executor/trace_test.go new file mode 100644 index 0000000000000..7a9d2b9d68965 --- /dev/null +++ b/executor/trace_test.go @@ -0,0 +1,33 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + . 
"github.com/pingcap/check" + "github.com/pingcap/tidb/util/testkit" +) + +type testTraceExec struct{} + +func (s *testTraceExec) SetupSuite(c *C) { + +} + +func (s *testSuite) TestTraceExec(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + testSQL := `create table trace (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1);` + tk.MustExec(testSQL) + tk.MustExec("trace select * from trace where id = 0;") +} diff --git a/util/tracing/util_test.go b/util/tracing/util_test.go index ede39506407d4..229334d661ecb 100644 --- a/util/tracing/util_test.go +++ b/util/tracing/util_test.go @@ -77,6 +77,22 @@ func (s *testTraceSuite) TestChildSpanFromContext(c *C) { } +func (s *testTraceSuite) TestCreateSapnBeforeSetupGlobalTracer(c *C) { + var collectedSpans []basictracer.RawSpan + sp := opentracing.StartSpan("before") + sp.Finish() + + // first start a root span + sp1 := tracing.NewRecordedTrace("test", func(sp basictracer.RawSpan) { + collectedSpans = append(collectedSpans, sp) + }) + sp1.Finish() + + // sp is a span started before we setup global tracer; hence such span will be + // droped. + c.Assert(len(collectedSpans), Equals, 1) +} + func (s *testTraceSuite) TestTreeRelationship(c *C) { var collectedSpans []basictracer.RawSpan ctx := context.TODO() From e0d7cbacc87158f94a9d3f8e9b2f5d80defe2bb2 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 14:08:39 +0800 Subject: [PATCH 14/23] add a todo --- executor/trace_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/trace_test.go b/executor/trace_test.go index 7a9d2b9d68965..b7da14e511463 100644 --- a/executor/trace_test.go +++ b/executor/trace_test.go @@ -29,5 +29,6 @@ func (s *testSuite) TestTraceExec(c *C) { tk.MustExec("use test") testSQL := `create table trace (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1);` tk.MustExec(testSQL) + // TODO: check result later in another PR. tk.MustExec("trace select * from trace where id = 0;") } From 227626bbe3ac5534d650911e8a90bb707f2d9424 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 17:17:09 +0800 Subject: [PATCH 15/23] address comments --- executor/trace.go | 1 - plan/planbuilder.go | 24 ++++++++++++++++++++++++ plan/trace.go | 24 ------------------------ 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/executor/trace.go b/executor/trace.go index f39f2db4f7660..99b3776bfbea9 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -57,7 +57,6 @@ func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { default: b.err = errors.Errorf("%v is not supported", child) } - } return e diff --git a/plan/planbuilder.go b/plan/planbuilder.go index c1daa61fb5460..6b62ce1d5292b 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -1361,6 +1361,30 @@ func (b *planBuilder) buildDDL(node ast.DDLNode) Plan { return p } +// buildTrace builds a trace plan. Inside this method, it first optimize the +// underlying query and then constructs a schema, which will be used to constructs +// rows result. +func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { + if _, ok := trace.Stmt.(*ast.SelectStmt); !ok { + return nil, errors.New("trace only supports select query") + } + + stmtPlan, err := Optimize(b.ctx, trace.Stmt, b.is) + if err != nil { + return nil, err + } + p := &Trace{StmtPlan: stmtPlan} + + retFields := []string{"operation", "duration", "spanID"} + schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) 
+ schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth)) + + schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth)) + schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) + p.SetSchema(schema) + return p, nil +} + func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) { if show, ok := explain.Stmt.(*ast.ShowStmt); ok { return b.buildShow(show) diff --git a/plan/trace.go b/plan/trace.go index d7db81eb23636..50401714364b5 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -13,27 +13,3 @@ type Trace struct { StmtPlan Plan } - -// buildTrace builds a trace plan. Inside this method, it first optimize the -// underlying query and then constructs a schema, which will be used to constructs -// rows result. -func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { - if _, ok := trace.Stmt.(*ast.SelectStmt); !ok { - return nil, errors.New("trace only supports select query") - } - - optimizedP, err := Optimize(b.ctx, trace.Stmt, b.is) - if err != nil { - return nil, err - } - p := &Trace{StmtPlan: optimizedP} - - retFields := []string{"operation", "duration", "spanID"} - schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) - schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth)) - - schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth)) - schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth)) - p.SetSchema(schema) - return p, nil -} From 03c3f7abe343bd70668a1e720f4ab126827e2977 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 17:18:46 +0800 Subject: [PATCH 16/23] remvoe extra line --- executor/trace_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/trace_test.go b/executor/trace_test.go index b7da14e511463..9f8d96f5d8537 100644 --- a/executor/trace_test.go +++ b/executor/trace_test.go @@ -21,7 +21,6 @@ import ( type testTraceExec struct{} func (s *testTraceExec) SetupSuite(c *C) { - } func (s *testSuite) TestTraceExec(c *C) { From 507b567477b922ca88bcfd49b8ce25398ee3d524 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 17:20:20 +0800 Subject: [PATCH 17/23] move buildTrace to builder.go --- executor/builder.go | 21 +++++++++++++++++++++ executor/trace.go | 21 --------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d65abe6a61e87..76dc61a3f0469 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -621,6 +621,27 @@ func (b *executorBuilder) buildDDL(v *plan.DDL) Executor { return e } +// buildTrace builds a TraceExec for future executing. This method will be called +// at build(). +func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { + e := &TraceExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + } + + pp, _ := v.StmtPlan.(plan.PhysicalPlan) + e.children = make([]Executor, 0, len(pp.Children())) + for _, child := range pp.Children() { + switch p := child.(type) { + case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: + e.children = append(e.children, b.build(p)) + default: + b.err = errors.Errorf("%v is not supported", child) + } + } + + return e +} + // buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`. 
func (b *executorBuilder) buildExplain(v *plan.Explain) Executor { e := &ExplainExec{ diff --git a/executor/trace.go b/executor/trace.go index 99b3776bfbea9..6711fedbb20db 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -41,27 +41,6 @@ type TraceExec struct { childrenResults []*chunk.Chunk } -// buildTrace builds a TraceExec for future executing. This method will be called -// at build(). -func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { - e := &TraceExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - } - - pp, _ := v.StmtPlan.(plan.PhysicalPlan) - e.children = make([]Executor, 0, len(pp.Children())) - for _, child := range pp.Children() { - switch p := child.(type) { - case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: - e.children = append(e.children, b.build(p)) - default: - b.err = errors.Errorf("%v is not supported", child) - } - } - - return e -} - // Open opens a trace executor and it will create a root trace span which will be // used for the following span in a relationship of `ChildOf` or `FollowFrom`. // for more details, you could refer to http://opentracing.io From 0ccae95726edbf3a3dccd059979bfe4dc64c0283 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 17:24:41 +0800 Subject: [PATCH 18/23] remove extra imports from trace.go --- plan/trace.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/plan/trace.go b/plan/trace.go index 50401714364b5..2b033d741cb7b 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -1,12 +1,5 @@ package plan -import ( - "github.com/juju/errors" - "github.com/pingcap/tidb/ast" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/mysql" -) - // Trace represents a trace plan. type Trace struct { baseSchemaProducer From e2cb6ff5a87b8da3450d0100e7805ab5b03263a3 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Thu, 30 Aug 2018 22:38:51 +0800 Subject: [PATCH 19/23] record optimize time in traceExec's Next --- executor/builder.go | 17 +++-------------- executor/trace.go | 25 +++++++++++++++++++++++-- plan/planbuilder.go | 6 +----- plan/trace.go | 6 +++++- util/tracing/util_test.go | 15 +++++++++++++++ 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 76dc61a3f0469..aed3694fdde3a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -624,22 +624,11 @@ func (b *executorBuilder) buildDDL(v *plan.DDL) Executor { // buildTrace builds a TraceExec for future executing. This method will be called // at build(). func (b *executorBuilder) buildTrace(v *plan.Trace) Executor { - e := &TraceExec{ + return &TraceExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + stmtNode: v.StmtNode, + builder: b, } - - pp, _ := v.StmtPlan.(plan.PhysicalPlan) - e.children = make([]Executor, 0, len(pp.Children())) - for _, child := range pp.Children() { - switch p := child.(type) { - case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: - e.children = append(e.children, b.build(p)) - default: - b.err = errors.Errorf("%v is not supported", child) - } - } - - return e } // buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`. 
diff --git a/executor/trace.go b/executor/trace.go index 6711fedbb20db..4d90b28863b0e 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -19,6 +19,7 @@ import ( "github.com/juju/errors" "github.com/opentracing/basictracer-go" opentracing "github.com/opentracing/opentracing-go" + "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/plan" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/tracing" @@ -33,12 +34,14 @@ type TraceExec struct { CollectedSpans []basictracer.RawSpan // exhausted being true means there is no more result. exhausted bool - // plan is the real query plan and it is used for building real query's executor. - plan plan.Plan + // stmtNode is the real query ast tree and it is used for building real query's plan. + stmtNode ast.StmtNode // rootTrace represents root span which is father of all other span. rootTrace opentracing.Span childrenResults []*chunk.Chunk + + builder *executorBuilder } // Open opens a trace executor and it will create a root trace span which will be @@ -71,8 +74,26 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { return nil } + // record how much time was spent for optimizeing plan + optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context())) + stmtPlan, err := plan.Optimize(e.builder.ctx, e.stmtNode, e.builder.is) + if err != nil { + return err + } + optimizeSp.Finish() + pp, _ := stmtPlan.(plan.PhysicalPlan) + for _, child := range pp.Children() { + switch p := child.(type) { + case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort: + e.children = append(e.children, e.builder.build(p)) + default: + return errors.Errorf("%v is not supported", child) + } + } + // store span into context ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) + if len(e.children) > 0 { for { if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil { diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 6b62ce1d5292b..3b64ffcb3f164 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -1369,11 +1369,7 @@ func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { return nil, errors.New("trace only supports select query") } - stmtPlan, err := Optimize(b.ctx, trace.Stmt, b.is) - if err != nil { - return nil, err - } - p := &Trace{StmtPlan: stmtPlan} + p := &Trace{StmtNode: trace.Stmt} retFields := []string{"operation", "duration", "spanID"} schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) diff --git a/plan/trace.go b/plan/trace.go index 2b033d741cb7b..8b08336d7a509 100644 --- a/plan/trace.go +++ b/plan/trace.go @@ -1,8 +1,12 @@ package plan +import ( + "github.com/pingcap/tidb/ast" +) + // Trace represents a trace plan. 
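The plan_optimize span above is started with opentracing.FollowsFrom(e.rootTrace.Context()) rather than ChildOf: the parent link, and therefore a non-zero ParentSpanID, is still recorded (the TestFollowFrom case added below checks exactly that), but no claim is made that the root span is blocked waiting on the phase. A minimal sketch of timing one phase this way; timePhase is an invented helper for illustration, not part of this patch:

    import opentracing "github.com/opentracing/opentracing-go"

    // timePhase runs fn inside a span that follows from root, so the phase
    // shows up in the collected trace with its own start time and duration.
    func timePhase(root opentracing.Span, name string, fn func() error) error {
        sp := root.Tracer().StartSpan(name, opentracing.FollowsFrom(root.Context()))
        defer sp.Finish()
        return fn()
    }

A caller would wrap the optimize step roughly as timePhase(e.rootTrace, "plan_optimize", ...), which is the same pattern the hunk above writes out inline.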
type Trace struct { baseSchemaProducer - StmtPlan Plan + StmtNode ast.StmtNode } diff --git a/util/tracing/util_test.go b/util/tracing/util_test.go index 229334d661ecb..4cbac84f1aaa6 100644 --- a/util/tracing/util_test.go +++ b/util/tracing/util_test.go @@ -77,6 +77,21 @@ func (s *testTraceSuite) TestChildSpanFromContext(c *C) { } +func (s *testTraceSuite) TestFollowFrom(c *C) { + var collectedSpans []basictracer.RawSpan + // first start a root span + sp1 := tracing.NewRecordedTrace("test", func(sp basictracer.RawSpan) { + collectedSpans = append(collectedSpans, sp) + }) + sp2 := sp1.Tracer().StartSpan("follow_from", opentracing.FollowsFrom(sp1.Context())) + sp1.Finish() + sp2.Finish() + c.Assert(collectedSpans[1].Operation, Equals, "follow_from") + c.Assert(collectedSpans[1].ParentSpanID, Not(Equals), uint64(0)) + // only root span has 0 parent id + c.Assert(collectedSpans[0].ParentSpanID, Equals, uint64(0)) +} + func (s *testTraceSuite) TestCreateSapnBeforeSetupGlobalTracer(c *C) { var collectedSpans []basictracer.RawSpan sp := opentracing.StartSpan("before") From 9f5b5bf67bc19ff1af97e114835b2e2d80f08cc7 Mon Sep 17 00:00:00 2001 From: zhexuany Date: Fri, 31 Aug 2018 01:03:42 +0800 Subject: [PATCH 20/23] open child in next rather in open --- executor/trace.go | 39 ++++++++++++++++----------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/executor/trace.go b/executor/trace.go index 4d90b28863b0e..9bc7fc33643f6 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -44,29 +44,6 @@ type TraceExec struct { builder *executorBuilder } -// Open opens a trace executor and it will create a root trace span which will be -// used for the following span in a relationship of `ChildOf` or `FollowFrom`. -// for more details, you could refer to http://opentracing.io -func (e *TraceExec) Open(ctx context.Context) error { - e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) { - e.CollectedSpans = append(e.CollectedSpans, sp) - }) - // we actually don't care when underlying executor started. We only care how - // much time was spent - for _, child := range e.children { - err := child.Open(ctx) - if err != nil { - return errors.Trace(err) - } - } - e.childrenResults = make([]*chunk.Chunk, 0, len(e.children)) - for _, child := range e.children { - e.childrenResults = append(e.childrenResults, child.newChunk()) - } - - return nil -} - // Next executes real query and collects span later. func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() @@ -91,6 +68,22 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { } } + e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) { + e.CollectedSpans = append(e.CollectedSpans, sp) + }) + // we actually don't care when underlying executor started. 
+	// much time was spent
+	for _, child := range e.children {
+		err := child.Open(ctx)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
+	for _, child := range e.children {
+		e.childrenResults = append(e.childrenResults, child.newChunk())
+	}
+
 	// store span into context
 	ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

From e49d58a38c5a4ccfee42d4263ccbbf2920e9fc24 Mon Sep 17 00:00:00 2001
From: zhexuany
Date: Fri, 31 Aug 2018 01:42:44 +0800
Subject: [PATCH 21/23] fix optimize logic mistake

---
 executor/trace.go | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/executor/trace.go b/executor/trace.go
index 9bc7fc33643f6..d86f6b0675ba5 100644
--- a/executor/trace.go
+++ b/executor/trace.go
@@ -58,16 +58,15 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		return err
 	}
 	optimizeSp.Finish()
-	pp, _ := stmtPlan.(plan.PhysicalPlan)
-	for _, child := range pp.Children() {
-		switch p := child.(type) {
-		case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort:
-			e.children = append(e.children, e.builder.build(p))
-		default:
-			return errors.Errorf("%v is not supported", child)
-		}
+
+	pp, ok := stmtPlan.(plan.PhysicalPlan)
+	if !ok {
+		return errors.New("cannot cast logical plan to physical plan")
 	}

+	// append select executor to trace executor
+	e.children = append(e.children, e.builder.build(pp))
+
 	e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
 		e.CollectedSpans = append(e.CollectedSpans, sp)
 	})

From da0e06b6e36c65240f68a13acfecf1cf446ea542 Mon Sep 17 00:00:00 2001
From: zhexuany
Date: Fri, 31 Aug 2018 02:16:34 +0800
Subject: [PATCH 22/23] address comments

---
 executor/trace.go | 33 +++++++++++----------------------
 1 file changed, 11 insertions(+), 22 deletions(-)

diff --git a/executor/trace.go b/executor/trace.go
index d86f6b0675ba5..7b1909aac3b1e 100644
--- a/executor/trace.go
+++ b/executor/trace.go
@@ -39,8 +39,6 @@ type TraceExec struct {
 	// rootTrace represents root span which is father of all other span.
 	rootTrace opentracing.Span

-	childrenResults []*chunk.Chunk
-
 	builder *executorBuilder
 }
@@ -65,35 +63,26 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	}

 	// append select executor to trace executor
-	e.children = append(e.children, e.builder.build(pp))
+	stmtExec := e.builder.build(pp)

 	e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
 		e.CollectedSpans = append(e.CollectedSpans, sp)
 	})
-	// we actually don't care when underlying executor started. We only care how
-	// much time was spent
-	for _, child := range e.children {
-		err := child.Open(ctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-	e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
-	for _, child := range e.children {
-		e.childrenResults = append(e.childrenResults, child.newChunk())
+	err = stmtExec.Open(ctx)
+	if err != nil {
+		return errors.Trace(err)
 	}
+	stmtExecChk := stmtExec.newChunk()

 	// store span into context
 	ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

-	if len(e.children) > 0 {
-		for {
-			if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil {
-				return errors.Trace(err)
-			}
-			if e.childrenResults[0].NumRows() != 0 {
-				break
-			}
+	for {
+		if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
+			return errors.Trace(err)
+		}
+		if stmtExecChk.NumRows() != 0 {
+			break
 		}
 	}

From 2c6aa09a99690106b578b1d9b198d549e7bfa85f Mon Sep 17 00:00:00 2001
From: zhexuany
Date: Fri, 31 Aug 2018 02:20:20 +0800
Subject: [PATCH 23/23] use == not !=

---
 executor/trace.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executor/trace.go b/executor/trace.go
index 7b1909aac3b1e..23d1cd570e778 100644
--- a/executor/trace.go
+++ b/executor/trace.go
@@ -81,7 +81,7 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
 			return errors.Trace(err)
 		}
-		if stmtExecChk.NumRows() != 0 {
+		if stmtExecChk.NumRows() == 0 {
 			break
 		}
 	}
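Taken together, PATCH 20-23 leave TraceExec.Next responsible for optimizing the traced statement, building and opening its executor, and draining it. The condensed Go sketch below shows that final flow in one place; it is illustrative only: it reuses the identifiers from the diffs above, elides the exhausted bookkeeping and the step that fills chk from e.CollectedSpans, and creates the root span before the plan_optimize span so the FollowsFrom reference has a non-nil parent, which is a reordering relative to the diffs.

// Condensed, untested sketch of TraceExec.Next after PATCH 20-23.
func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.exhausted {
		return nil
	}

	// Root span that every other span recorded for this statement hangs off.
	e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
		e.CollectedSpans = append(e.CollectedSpans, sp)
	})

	// Time spent optimizing the traced statement gets its own span.
	optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context()))
	stmtPlan, err := plan.Optimize(e.builder.ctx, e.stmtNode, e.builder.is)
	if err != nil {
		return errors.Trace(err)
	}
	optimizeSp.Finish()

	pp, ok := stmtPlan.(plan.PhysicalPlan)
	if !ok {
		return errors.New("cannot cast logical plan to physical plan")
	}

	// Build and open the real statement executor.
	stmtExec := e.builder.build(pp)
	if err = stmtExec.Open(ctx); err != nil {
		return errors.Trace(err)
	}
	stmtExecChk := stmtExec.newChunk()

	// Expose the root span to every child executor through the context.
	ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

	// Drain the statement executor; an empty chunk signals exhaustion.
	for {
		if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
			return errors.Trace(err)
		}
		if stmtExecChk.NumRows() == 0 {
			break
		}
	}

	// ... finish the root span, mark e.exhausted, and fill chk from e.CollectedSpans ...
	return nil
}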