executor: remove unused structure RecordBatch (pingcap#10891) (ping…
qw4990 authored and coocood committed Jun 27, 2019
1 parent 8efbe62 commit 46c38e1
Showing 74 changed files with 250 additions and 277 deletions.
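
The change itself is mechanical: chunk.RecordBatch was a struct that merely wrapped a *chunk.Chunk, so every result-set and executor method that traded in the wrapper now trades in the chunk directly. Below is a rough before/after sketch of the API shape, inferred from the diffs that follow — the interface name (sqlexec.RecordSet) and import paths are assumptions of this note, not something shown on this page:

```go
package sketch

import (
	"context"

	"github.com/pingcap/tidb/util/chunk"
)

// recordSetBefore shows the pre-#10891 shape for contrast only; after this
// commit the chunk.RecordBatch wrapper no longer exists, so this interface
// would not compile against the new util/chunk package.
type recordSetBefore interface {
	Next(ctx context.Context, req *chunk.RecordBatch) error
	NewRecordBatch() *chunk.RecordBatch
	Close() error
}

// recordSetAfter is the post-#10891 shape: callers allocate one *chunk.Chunk
// via NewChunk and hand it back to Next on every call.
type recordSetAfter interface {
	Next(ctx context.Context, req *chunk.Chunk) error
	NewChunk() *chunk.Chunk
	Close() error
}
```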
10 changes: 5 additions & 5 deletions bindinfo/bind_test.go
@@ -167,7 +167,7 @@ func (s *testSuite) TestGlobalBinding(c *C) {

rs, err := tk.Exec("show global bindings")
c.Assert(err, IsNil)
- chk := rs.NewRecordBatch()
+ chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 1)
@@ -212,7 +212,7 @@ func (s *testSuite) TestGlobalBinding(c *C) {

rs, err = tk.Exec("show global bindings")
c.Assert(err, IsNil)
- chk = rs.NewRecordBatch()
+ chk = rs.NewChunk()
err = rs.Next(context.TODO(), chk)
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 0)
@@ -255,14 +255,14 @@ func (s *testSuite) TestSessionBinding(c *C) {

rs, err := tk.Exec("show global bindings")
c.Assert(err, IsNil)
- chk := rs.NewRecordBatch()
+ chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 0)

rs, err = tk.Exec("show session bindings")
c.Assert(err, IsNil)
- chk = rs.NewRecordBatch()
+ chk = rs.NewChunk()
err = rs.Next(context.TODO(), chk)
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 1)
@@ -423,7 +423,7 @@ func (s *testSuite) TestErrorBind(c *C) {

rs, err := tk.Exec("show global bindings")
c.Assert(err, IsNil)
- chk := rs.NewRecordBatch()
+ chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 0)
2 changes: 1 addition & 1 deletion cmd/benchdb/main.go
@@ -117,7 +117,7 @@ func (ut *benchDB) mustExec(sql string) {
if len(rss) > 0 {
ctx := context.Background()
rs := rss[0]
- req := rs.NewRecordBatch()
+ req := rs.NewChunk()
for {
err := rs.Next(ctx, req)
if err != nil {
4 changes: 2 additions & 2 deletions ddl/failtest/fail_db_test.go
@@ -110,7 +110,7 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
// Make sure that the table's data has not been deleted.
rs, err := s.se.Execute(context.Background(), "select count(*) from t")
c.Assert(err, IsNil)
- req := rs[0].NewRecordBatch()
+ req := rs[0].NewChunk()
err = rs[0].Next(context.Background(), req)
c.Assert(err, IsNil)
c.Assert(req.NumRows() == 0, IsFalse)
@@ -146,7 +146,7 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
// Make sure that the table's data has not been deleted.
rs, err = s.se.Execute(context.Background(), "select count(*) from tx")
c.Assert(err, IsNil)
- req = rs[0].NewRecordBatch()
+ req = rs[0].NewChunk()
err = rs[0].Next(context.Background(), req)
c.Assert(err, IsNil)
c.Assert(req.NumRows() == 0, IsFalse)
4 changes: 2 additions & 2 deletions ddl/util/util.go
@@ -69,8 +69,8 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u
}

rs := rss[0]
- req := rs.NewRecordBatch()
- it := chunk.NewIterator4Chunk(req.Chunk)
+ req := rs.NewChunk()
+ it := chunk.NewIterator4Chunk(req)
for {
err = rs.Next(context.TODO(), req)
if err != nil {
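
The ddl/util hunk above is the typical caller-side rewrite: chunk.NewIterator4Chunk now receives the chunk itself instead of unwrapping req.Chunk. A minimal sketch of the post-change read loop, assuming the sqlexec.RecordSet interface and imports from the sketch at the top (the helper name drainRecordSet is illustrative):

```go
// drainRecordSet shows the post-change loop: one chunk from NewChunk is bound
// to the iterator once and refilled by every Next call until it comes back empty.
func drainRecordSet(ctx context.Context, rs sqlexec.RecordSet) error {
	req := rs.NewChunk()
	it := chunk.NewIterator4Chunk(req)
	for {
		if err := rs.Next(ctx, req); err != nil {
			return err
		}
		if req.NumRows() == 0 {
			return nil // result set exhausted
		}
		for row := it.Begin(); row != it.End(); row = it.Next() {
			_ = row // consume the row here
		}
	}
}
```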
23 changes: 11 additions & 12 deletions executor/adapter.go
@@ -100,7 +100,7 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
- func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("recordSet.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -124,9 +124,9 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
return nil
}

- // NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk().
- func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
- return chunk.NewRecordBatch(newFirstChunk(a.executor))
+ // NewChunk create a chunk base on top-level executor's newFirstChunk().
+ func (a *recordSet) NewChunk() *chunk.Chunk {
+ return newFirstChunk(a.executor)
}

func (a *recordSet) Close() error {
@@ -296,8 +296,7 @@ func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}

- func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
- chk := req.Chunk
+ func (c *chunkRowRecordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for !chk.IsFull() && c.idx < len(c.rows) {
chk.AppendRow(c.rows[c.idx])
@@ -306,8 +305,8 @@ func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) er
return nil
}

- func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
- return chunk.NewRecordBatch(newFirstChunk(c.e))
+ func (c *chunkRowRecordSet) NewChunk() *chunk.Chunk {
+ return newFirstChunk(c.e)
}

func (c *chunkRowRecordSet) Close() error {
@@ -339,7 +338,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
var rows []chunk.Row
var err error
fields := rs.Fields()
- req := rs.NewRecordBatch()
+ req := rs.NewChunk()
for {
err = rs.Next(ctx, req)
if err != nil {
@@ -349,11 +348,11 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil
}
- iter := chunk.NewIterator4Chunk(req.Chunk)
+ iter := chunk.NewIterator4Chunk(req)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
- req.Chunk = chunk.Renew(req.Chunk, a.Ctx.GetSessionVars().MaxChunkSize)
+ req = chunk.Renew(req, a.Ctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}
@@ -385,7 +384,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
a.logAudit()
}()

- err = Next(ctx, e, chunk.NewRecordBatch(newFirstChunk(e)))
+ err = Next(ctx, e, newFirstChunk(e))
if err != nil {
return nil, err
}
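
The runPessimisticSelectForUpdate hunk is the one spot where the removal changes more than a parameter type: with no RecordBatch field to mutate, chunk.Renew now reassigns the chunk variable itself. A hedged sketch of that drain-and-collect pattern (same assumed imports as above; maxChunkSize stands in for a.Ctx.GetSessionVars().MaxChunkSize):

```go
// collectRows drains a record set into []chunk.Row. Renewing the chunk after
// each full batch allocates a fresh one, so the rows already collected keep
// referencing the memory of the chunks they were read from.
func collectRows(ctx context.Context, rs sqlexec.RecordSet, maxChunkSize int) ([]chunk.Row, error) {
	var rows []chunk.Row
	req := rs.NewChunk()
	for {
		if err := rs.Next(ctx, req); err != nil {
			return nil, err
		}
		if req.NumRows() == 0 {
			return rows, nil
		}
		iter := chunk.NewIterator4Chunk(req)
		for r := iter.Begin(); r != iter.End(); r = iter.Next() {
			rows = append(rows, r)
		}
		req = chunk.Renew(req, maxChunkSize)
	}
}
```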
6 changes: 3 additions & 3 deletions executor/admin.go
@@ -60,7 +60,7 @@ type CheckIndexRangeExec struct {
}

// Next implements the Executor Next interface.
- func (e *CheckIndexRangeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *CheckIndexRangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
handleIdx := e.schema.Len() - 1
for {
@@ -446,7 +446,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

// Next implements the Executor Next interface.
- func (e *RecoverIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *RecoverIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
@@ -582,7 +582,7 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
}

// Next implements the Executor Next interface.
- func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
16 changes: 8 additions & 8 deletions executor/aggregate.go
@@ -514,7 +514,7 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro
}

// Next implements the Executor Next interface.
- func (e *HashAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *HashAggExec) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("hashagg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -525,9 +525,9 @@ func (e *HashAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
}
req.Reset()
if e.isUnparallelExec {
- return e.unparallelExec(ctx, req.Chunk)
+ return e.unparallelExec(ctx, req)
}
- return e.parallelExec(ctx, req.Chunk)
+ return e.parallelExec(ctx, req)
}

func (e *HashAggExec) fetchChildData(ctx context.Context) {
@@ -555,7 +555,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
- err = Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
+ err = Next(ctx, e.children[0], chk)
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: err}
return
@@ -681,7 +681,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
- err := Next(ctx, e.children[0], chunk.NewRecordBatch(e.childResult))
+ err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
@@ -794,7 +794,7 @@ func (e *StreamAggExec) Close() error {
}

// Next implements the Executor Next interface.
- func (e *StreamAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *StreamAggExec) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("streamAgg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -805,7 +805,7 @@ func (e *StreamAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error
}
req.Reset()
for !e.executed && !req.IsFull() {
- err := e.consumeOneGroup(ctx, req.Chunk)
+ err := e.consumeOneGroup(ctx, req)
if err != nil {
e.executed = true
return err
@@ -870,7 +870,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return err
}

- err = Next(ctx, e.children[0], chunk.NewRecordBatch(e.childResult))
+ err = Next(ctx, e.children[0], e.childResult)
if err != nil {
return err
}
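
On the executor side the edit is equally mechanical: Next now receives the caller's *chunk.Chunk and child output is pulled with the package-level Next helper, no wrapping involved. A skeleton of the resulting shape — someExec, its fields, and the pass-through copy are purely illustrative, not code from this commit:

```go
// Next sketches an executor Next method after this commit: reset the chunk the
// caller handed in, then refill it from the child until it is full or the
// child is exhausted.
func (e *someExec) Next(ctx context.Context, req *chunk.Chunk) error {
	req.Reset()
	for !req.IsFull() {
		if err := Next(ctx, e.children[0], e.childResult); err != nil {
			return err
		}
		if e.childResult.NumRows() == 0 {
			return nil
		}
		// A pass-through executor would simply copy the child rows across;
		// real executors transform them here instead.
		for i := 0; i < e.childResult.NumRows(); i++ {
			req.AppendRow(e.childResult.GetRow(i))
		}
	}
	return nil
}
```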
2 changes: 1 addition & 1 deletion executor/analyze.go
@@ -77,7 +77,7 @@ const (
)

// Next implements the Executor Next interface.
- func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return err
7 changes: 3 additions & 4 deletions executor/benchmark_test.go
@@ -115,13 +115,13 @@ func (mds *mockDataSource) prepareChunks() {
mds.chunkPtr = 0
}

- func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
if mds.chunkPtr >= len(mds.chunks) {
req.Reset()
return nil
}
dataChk := mds.chunks[mds.chunkPtr]
- dataChk.SwapColumns(req.Chunk)
+ dataChk.SwapColumns(req)
mds.chunkPtr++
return nil
}
@@ -269,9 +269,8 @@ func benchmarkAggExecWithCase(b *testing.B, casTest *aggTestCase) {
if err := aggExec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
- batch := chunk.NewRecordBatch(chk)
for {
- if err := aggExec.Next(tmpCtx, batch); err != nil {
+ if err := aggExec.Next(tmpCtx, chk); err != nil {
b.Fatal(b)
}
if chk.NumRows() == 0 {
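
The benchmark hunk shrinks the driving loop to the chunk itself. Roughly, exercising an executor now looks like the sketch below (aggExec, tmpCtx and newFirstChunk as in the diff above; the Close call is an assumption, not shown in this hunk):

```go
// Post-change benchmark body: one chunk from newFirstChunk is reused for
// every Next call until it comes back empty.
chk := newFirstChunk(aggExec)
if err := aggExec.Open(tmpCtx); err != nil {
	b.Fatal(err)
}
for {
	if err := aggExec.Next(tmpCtx, chk); err != nil {
		b.Fatal(err)
	}
	if chk.NumRows() == 0 {
		break
	}
}
if err := aggExec.Close(); err != nil {
	b.Fatal(err)
}
```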
2 changes: 1 addition & 1 deletion executor/bind.go
@@ -39,7 +39,7 @@ type SQLBindExec struct {
}

// Next implements the Executor Next interface.
- func (e *SQLBindExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *SQLBindExec) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("SQLBindExec.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
2 changes: 1 addition & 1 deletion executor/change.go
@@ -31,7 +31,7 @@ type ChangeExec struct {
}

// Next implements the Executor Next interface.
- func (e *ChangeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
kind := strings.ToLower(e.NodeType)
urls := config.GetGlobalConfig().Path
registry, err := createRegistry(urls)
2 changes: 1 addition & 1 deletion executor/checksum.go
@@ -83,7 +83,7 @@ func (e *ChecksumTableExec) Open(ctx context.Context) error {
}

// Next implements the Executor Next interface.
- func (e *ChecksumTableExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *ChecksumTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
2 changes: 1 addition & 1 deletion executor/ddl.go
@@ -72,7 +72,7 @@ func (e *DDLExec) toErr(err error) error {
}

// Next implements the Executor Next interface.
- func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) {
+ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if e.done {
return nil
}
10 changes: 5 additions & 5 deletions executor/ddl_test.go
@@ -93,8 +93,8 @@ func (s *testSuite3) TestCreateTable(c *C) {
rs, err := tk.Exec(`desc issue312_1`)
c.Assert(err, IsNil)
ctx := context.Background()
- req := rs.NewRecordBatch()
- it := chunk.NewIterator4Chunk(req.Chunk)
+ req := rs.NewChunk()
+ it := chunk.NewIterator4Chunk(req)
for {
err1 := rs.Next(ctx, req)
c.Assert(err1, IsNil)
@@ -107,8 +107,8 @@ func (s *testSuite3) TestCreateTable(c *C) {
}
rs, err = tk.Exec(`desc issue312_2`)
c.Assert(err, IsNil)
- req = rs.NewRecordBatch()
- it = chunk.NewIterator4Chunk(req.Chunk)
+ req = rs.NewChunk()
+ it = chunk.NewIterator4Chunk(req)
for {
err1 := rs.Next(ctx, req)
c.Assert(err1, IsNil)
@@ -260,7 +260,7 @@ func (s *testSuite3) TestAlterTableAddColumn(c *C) {
now := time.Now().Add(-time.Duration(1 * time.Millisecond)).Format(types.TimeFormat)
r, err := tk.Exec("select c2 from alter_test")
c.Assert(err, IsNil)
- req := r.NewRecordBatch()
+ req := r.NewChunk()
err = r.Next(context.Background(), req)
c.Assert(err, IsNil)
row := req.GetRow(0)
6 changes: 3 additions & 3 deletions executor/delete.go
@@ -43,7 +43,7 @@ type DeleteExec struct {
}

// Next implements the Executor Next interface.
- func (e *DeleteExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+ func (e *DeleteExec) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("delete.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -105,7 +105,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
for {
iter := chunk.NewIterator4Chunk(chk)

- err := Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
+ err := Next(ctx, e.children[0], chk)
if err != nil {
return err
}
@@ -187,7 +187,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
chk := newFirstChunk(e.children[0])
for {
iter := chunk.NewIterator4Chunk(chk)
- err := Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
+ err := Next(ctx, e.children[0], chk)
if err != nil {
return err
}
… (diffs for the remaining changed files are not shown on this page)
