Skip to content

Commit

Permalink
executor: refactor union scan and dirty table (#11702)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and sre-bot committed Sep 12, 2019
1 parent c2901fe commit fb683be
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 164 deletions.
116 changes: 103 additions & 13 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/set"
)

type memIndexReader struct {
Expand All @@ -36,12 +37,9 @@ type memIndexReader struct {
desc bool
conditions []expression.Expression
addedRows [][]types.Datum
addedRowsLen int
retFieldTypes []*types.FieldType
outputOffset []int
// cache for decode handle.
handleBytes []byte
// memIdxHandles is used to store the handle IDs that have been read by memIndexReader.
memIdxHandles set.Int64Set
// belowHandleIndex is the handle's position of the below scan plan.
belowHandleIndex int
}
Expand All @@ -62,8 +60,6 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
handleBytes: make([]byte, 0, 16),
memIdxHandles: set.NewInt64Set(),
belowHandleIndex: us.belowHandleIndex,
}
}
Expand Down Expand Up @@ -92,8 +88,6 @@ func (m *memIndexReader) getMemRows() ([][]types.Datum, error) {
if err != nil {
return err
}
handle := data[m.belowHandleIndex].GetInt64()
m.memIdxHandles.Insert(handle)

mutableRow.SetDatums(data...)
matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
Expand Down Expand Up @@ -150,17 +144,16 @@ type memTableReader struct {
}

func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
kvRanges := tblReader.kvRanges
colIDs := make(map[int64]int)
for i, col := range tblReader.columns {
for i, col := range us.columns {
colIDs[col.ID] = i
}

return &memTableReader{
ctx: us.ctx,
table: tblReader.table.Meta(),
table: us.table.Meta(),
columns: us.columns,
kvRanges: kvRanges,
kvRanges: tblReader.kvRanges,
desc: us.desc,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
Expand Down Expand Up @@ -305,3 +298,100 @@ func reverseDatumSlice(rows [][]types.Datum) {
rows[i], rows[j] = rows[j], rows[i]
}
}

// getMemRowsHandle walks the index entries that iterTxnMemBuffer yields for
// m.kvRanges and decodes the row handle out of every entry.
//
// The handles are returned in the order the entries were visited, or in the
// reverse of that order when m.desc is set. Any error reported by the
// iteration or by the handle decoding is returned as-is with a nil slice.
func (m *memIndexReader) getMemRowsHandle() ([]int64, error) {
	// Start from a BIGINT handle type; when the primary key itself is the
	// handle, decode with that column's own field type instead.
	handleTp := types.NewFieldType(mysql.TypeLonglong)
	if m.table.PKIsHandle {
		for _, colInfo := range m.table.Columns {
			if !mysql.HasPriKeyFlag(colInfo.Flag) {
				continue
			}
			handleTp = &colInfo.FieldType
			break
		}
	}

	result := make([]int64, 0, m.addedRowsLen)
	collect := func(key, value []byte) error {
		h, decodeErr := tablecodec.DecodeIndexHandle(key, value, len(m.index.Columns), handleTp)
		if decodeErr != nil {
			return decodeErr
		}
		result = append(result, h)
		return nil
	}
	if err := iterTxnMemBuffer(m.ctx, m.kvRanges, collect); err != nil {
		return nil, err
	}

	if m.desc {
		// Flip the slice in place so callers see descending visit order.
		total := len(result)
		for front := 0; front < total/2; front++ {
			back := total - 1 - front
			result[front], result[back] = result[back], result[front]
		}
	}
	return result, nil
}

// memIndexLookUpReader produces the rows of an index look-up from data held
// in the transaction's memory buffer: idxReader supplies the handles and the
// rows for those handles are then read through a memTableReader.
type memIndexLookUpReader struct {
	// ctx is the session context forwarded to the table-side reader.
	ctx sessionctx.Context
	// index is the index descriptor of the look-up.
	index *model.IndexInfo
	// columns are the columns decoded for each returned row.
	columns []*model.ColumnInfo
	// table is the table the handles belong to.
	table table.Table
	// desc is copied from the look-up executor's desc flag.
	desc bool
	// conditions are the filter expressions handed to the table-side reader.
	conditions []expression.Expression
	// retFieldTypes are the field types handed to the table-side reader.
	retFieldTypes []*types.FieldType

	// idxReader yields the handles found on the index side.
	idxReader *memIndexReader
}

// buildMemIndexLookUpReader assembles a memIndexLookUpReader for the given
// union scan and index look-up executor. The embedded memIndexReader is set
// up over the look-up's index key ranges so that it can later report the
// handles present in the transaction's memory buffer.
func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader {
	idxInfo := idxLookUpReader.index

	// The index-side reader outputs a single offset: the position right
	// after the index columns (presumably where the handle sits — confirm
	// against memIndexReader's decoding).
	idxSide := &memIndexReader{
		ctx:              us.ctx,
		index:            idxInfo,
		table:            idxLookUpReader.table.Meta(),
		kvRanges:         idxLookUpReader.kvRanges,
		desc:             idxLookUpReader.desc,
		addedRowsLen:     len(us.dirty.addedRows),
		retFieldTypes:    retTypes(us),
		outputOffset:     []int{len(idxInfo.Columns)},
		belowHandleIndex: us.belowHandleIndex,
	}

	lookUp := &memIndexLookUpReader{
		ctx:           us.ctx,
		index:         idxInfo,
		columns:       idxLookUpReader.columns,
		table:         idxLookUpReader.table,
		desc:          idxLookUpReader.desc,
		conditions:    us.conditions,
		retFieldTypes: retTypes(us),
		idxReader:     idxSide,
	}
	return lookUp
}

// getMemRows returns the rows for this index look-up that are present in the
// transaction's memory buffer. It asks the index-side reader for the handles,
// converts them to table key ranges, and reads the rows through a
// memTableReader built over those ranges. When no handle is found it returns
// (nil, nil).
func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
	handles, err := m.idxReader.getMemRowsHandle()
	if err != nil {
		return nil, err
	}
	if len(handles) == 0 {
		// Nothing on the index side, so there is nothing to look up.
		return nil, nil
	}

	handleRanges := distsql.TableHandlesToKVRanges(getPhysicalTableID(m.table), handles)

	// Map every column ID to its position in the output row.
	colOffsets := make(map[int64]int, len(m.columns))
	for pos, colInfo := range m.columns {
		colOffsets[colInfo.ID] = pos
	}

	tblSide := &memTableReader{
		ctx:           m.ctx,
		table:         m.table.Meta(),
		columns:       m.columns,
		kvRanges:      handleRanges,
		conditions:    m.conditions,
		addedRows:     make([][]types.Datum, 0, len(handles)),
		retFieldTypes: m.retFieldTypes,
		colIDs:        colOffsets,
		handleBytes:   make([]byte, 0, 16),
	}
	return tblSide.getMemRows()
}
120 changes: 4 additions & 116 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package executor

import (
"context"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/set"
)

// DirtyDB stores uncommitted write operations for a transaction.
Expand Down Expand Up @@ -55,11 +53,10 @@ type DirtyTable struct {
// the key is handle.
addedRows map[int64]struct{}
deletedRows map[int64]struct{}
truncated bool
}

// AddRow adds a row to the DirtyDB.
func (dt *DirtyTable) AddRow(handle int64, row []types.Datum) {
func (dt *DirtyTable) AddRow(handle int64) {
dt.addedRows[handle] = struct{}{}
}

Expand All @@ -69,12 +66,6 @@ func (dt *DirtyTable) DeleteRow(handle int64) {
dt.deletedRows[handle] = struct{}{}
}

// TruncateTable marks the dirty table as truncated and discards every handle
// recorded in addedRows by replacing the set with a fresh, empty one.
func (dt *DirtyTable) TruncateTable() {
	dt.truncated = true
	dt.addedRows = map[int64]struct{}{}
}

// GetDirtyDB returns the DirtyDB bound to the context.
func GetDirtyDB(ctx sessionctx.Context) *DirtyDB {
var udb *DirtyDB
Expand Down Expand Up @@ -102,9 +93,7 @@ type UnionScanExec struct {
// belowHandleIndex is the handle's position of the below scan plan.
belowHandleIndex int

addedRows [][]types.Datum
// memIdxHandles is used to store the handle IDs that have been read by memIndexReader.
memIdxHandles set.Int64Set
addedRows [][]types.Datum
cursor4AddRows int
sortErr error
snapshotRows [][]types.Datum
Expand All @@ -130,10 +119,9 @@ func (us *UnionScanExec) open(ctx context.Context) error {
case *IndexReaderExecutor:
mIdxReader := buildMemIndexReader(us, x)
us.addedRows, err = mIdxReader.getMemRows()
us.memIdxHandles = mIdxReader.memIdxHandles
case *IndexLookUpExecutor:
us.memIdxHandles = set.NewInt64Set()
err = us.buildAndSortAddedRows(ctx, x.table)
idxLookup := buildMemIndexLookUpReader(us, x)
us.addedRows, err = idxLookup.getMemRows()
}
if err != nil {
return err
Expand Down Expand Up @@ -201,9 +189,6 @@ func (us *UnionScanExec) getOneRow(ctx context.Context) ([]types.Datum, error) {
}

func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, error) {
if us.dirty.truncated {
return nil, nil
}
if us.cursor4SnapshotRows < len(us.snapshotRows) {
return us.snapshotRows[us.cursor4SnapshotRows], nil
}
Expand All @@ -219,19 +204,11 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
snapshotHandle := row.GetInt64(us.belowHandleIndex)
if _, ok := us.dirty.deletedRows[snapshotHandle]; ok {
err = us.getMissIndexRowsByHandle(ctx, snapshotHandle)
if err != nil {
return nil, err
}
continue
}
if _, ok := us.dirty.addedRows[snapshotHandle]; ok {
// If src handle appears in added rows, it means there is conflict and the transaction will fail to
// commit, but for simplicity, we don't handle it here.
err = us.getMissIndexRowsByHandle(ctx, snapshotHandle)
if err != nil {
return nil, err
}
continue
}
us.snapshotRows = append(us.snapshotRows, row.GetDatumRow(retTypes(us.children[0])))
Expand All @@ -240,30 +217,6 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
return us.snapshotRows[0], nil
}

// getMissIndexRowsByHandle recovers a row that the in-memory index readers
// could not see.
//
// For the index reader and the index look-up reader, an update does not write
// the index entry to the txn memBuffer when the indexed column is unchanged,
// so `memIndexReader` and `memIndexLookUpReader` cannot find that entry
// there. When the handle is in dirtyTable.addedRows and was not already read
// from the memBuffer, the row is fetched by handle and, if it passes the
// conditions, appended to us.snapshotRows. Table readers never need this.
func (us *UnionScanExec) getMissIndexRowsByHandle(ctx context.Context, handle int64) error {
	if _, isTblReader := us.children[0].(*TableReaderExecutor); isTblReader {
		return nil
	}
	_, added := us.dirty.addedRows[handle]
	if !added {
		return nil
	}
	// Already produced by the memBuffer reader; nothing is missing.
	if us.memIdxHandles.Exist(handle) {
		return nil
	}

	row, err := us.getMemRow(ctx, handle)
	if err != nil {
		return err
	}
	if row == nil {
		return nil
	}
	us.snapshotRows = append(us.snapshotRows, row)
	return nil
}

func (us *UnionScanExec) getAddedRow() []types.Datum {
var addedRow []types.Datum
if us.cursor4AddRows < len(us.addedRows) {
Expand Down Expand Up @@ -319,71 +272,6 @@ func (us *UnionScanExec) compare(a, b []types.Datum) (int, error) {
return cmp, nil
}

// rowWithColsInTxn reads the record stored under handle h of table t from the
// transaction's memory buffer and decodes it into the union scan's columns.
// Errors from obtaining the transaction or from the buffer lookup are
// returned unchanged.
func (us *UnionScanExec) rowWithColsInTxn(ctx context.Context, t table.Table, h int64) ([]types.Datum, error) {
	recordKey := t.RecordKey(h)

	txn, err := us.ctx.Txn(true)
	if err != nil {
		return nil, err
	}
	rawRow, err := txn.GetMemBuffer().Get(ctx, recordKey)
	if err != nil {
		return nil, err
	}

	// Map every column ID to its position in the decoded row.
	colOffsets := make(map[int64]int)
	for pos := range us.columns {
		colOffsets[us.columns[pos].ID] = pos
	}
	return decodeRowData(us.ctx, us.table.Meta(), us.columns, colOffsets, h, []byte{}, rawRow)
}

// getMemRow loads the row for handle h from the transaction buffer and
// evaluates the union scan's conditions against it. It returns the row when
// the conditions match, and (nil, nil) when they do not.
func (us *UnionScanExec) getMemRow(ctx context.Context, h int64) ([]types.Datum, error) {
	row, err := us.rowWithColsInTxn(ctx, us.table, h)
	if err != nil {
		return nil, err
	}

	// Reuse the executor's mutable row as the evaluation buffer.
	us.mutableRow.SetDatums(row...)
	ok, _, err := expression.EvalBool(us.ctx, us.conditions, us.mutableRow.ToRow())
	switch {
	case err != nil:
		return nil, err
	case ok:
		return row, nil
	default:
		return nil, nil
	}
}

// buildAndSortAddedRows rebuilds us.addedRows from the handles recorded in
// us.dirty.addedRows: each handle is remembered in us.memIdxHandles, its row
// is read from the transaction buffer, and rows passing us.conditions are
// kept. The kept rows are then sorted through the executor's sort.Interface
// implementation, reversed when us.desc is set; an error captured in
// us.sortErr during sorting is returned traced.
//
// TODO: remove `buildAndSortAddedRows` functions and `DirtyTable`.
func (us *UnionScanExec) buildAndSortAddedRows(ctx context.Context, t table.Table) error {
	us.addedRows = make([][]types.Datum, 0, len(us.dirty.addedRows))
	rowBuf := chunk.MutRowFromTypes(retTypes(us))

	for handle := range us.dirty.addedRows {
		us.memIdxHandles.Insert(handle)

		row, err := us.rowWithColsInTxn(ctx, t, handle)
		if err != nil {
			return err
		}
		rowBuf.SetDatums(row...)
		ok, _, err := expression.EvalBool(us.ctx, us.conditions, rowBuf.ToRow())
		if err != nil {
			return err
		}
		if ok {
			us.addedRows = append(us.addedRows, row)
		}
	}

	// Pick the sort direction once, then sort.
	var sorter sort.Interface = us
	if us.desc {
		sorter = sort.Reverse(us)
	}
	sort.Sort(sorter)

	if us.sortErr == nil {
		return nil
	}
	return errors.Trace(us.sortErr)
}

// Len implements sort.Interface interface.
func (us *UnionScanExec) Len() int {
return len(us.addedRows)
Expand Down
Loading

0 comments on commit fb683be

Please sign in to comment.