
*: LOAD DATA support load one file from s3 and other OSS #40489

Merged (16 commits) on Jan 18, 2023
3 changes: 2 additions & 1 deletion executor/builder.go
@@ -936,6 +936,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
OnDuplicate: v.OnDuplicate,
Ctx: b.ctx,
}
columnNames := loadDataInfo.initFieldMappings()
@@ -946,7 +947,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
IsLocal: v.IsLocal,
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataInfo: loadDataInfo,
}
138 changes: 127 additions & 11 deletions executor/load_data.go
@@ -18,7 +18,9 @@ import (
"bytes"
"context"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"

@@ -28,7 +30,9 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
@@ -46,7 +50,7 @@ var (
type LoadDataExec struct {
baseExecutor

IsLocal bool
FileLocRef ast.FileLocRefTp
OnDuplicate ast.OnDuplicateKeyHandlingType
loadDataInfo *LoadDataInfo
}
@@ -55,26 +59,35 @@ type LoadDataExec struct {
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if !e.IsLocal {
if e.FileLocRef == ast.FileLocServer {
return errors.New("Load Data: don't support load data without local field")
}
e.loadDataInfo.OnDuplicate = e.OnDuplicate
// TODO: support LINES TERMINATED BY "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return errors.New("Load Data: don't support load data terminated is nil")
}

sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option isn't closed normal")
}
if e.loadDataInfo.Path == "" {
return errors.New("Load Data: infile path is empty")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}

switch e.FileLocRef {
case ast.FileLocServer:
panic("should never happen")
Review comment (Contributor): Can TiDB automatically recover from panic?

Reply (Contributor Author): Yes. FileLocServer is already checked a few lines above; here I just want to list every value of FileLocRef.

Review comment (@sleepymole, Contributor, Jan 18, 2023): It's better to panic with a more meaningful message.
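A minimal sketch of that suggestion; the exact message wording is illustrative, not part of this PR:

case ast.FileLocServer:
	// unreachable: FileLocServer is rejected at the top of Next
	panic(fmt.Sprintf("unexpected FileLocRef %v: FileLocServer should have been rejected earlier", e.FileLocRef))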

case ast.FileLocClient:
// let caller use handleQuerySpecial to read data in this connection
sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option isn't closed normal")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
case ast.FileLocRemote:
// TODO implement it
}
return nil
}

@@ -132,6 +145,109 @@ type FieldMapping struct {
UserVar *ast.VariableExpr
}

// Load reads data via readerFn, inserts it in batches, and commits it to the table.
func (e *LoadDataInfo) Load(ctx context.Context, readerFn func() ([]byte, error)) error {
e.InitQueues()
e.SetMaxRowsInBatch(uint64(e.Ctx.GetSessionVars().DMLBatchSize))
e.StartStopWatcher()
// let stop watcher goroutine quit
defer e.ForceQuit()
err := sessiontxn.NewTxn(ctx, e.Ctx)
if err != nil {
return err
}
// processStream processes the input data and enqueues commit tasks
wg := new(sync.WaitGroup)
wg.Add(1)
go processStream(ctx, readerFn, e, wg)
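// processStream above is the producer: it parses incoming chunks and enqueues
// commit tasks, while CommitWork below consumes and commits them; the two run
// as a pipeline until the stream drains or either side fails.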
err = e.CommitWork(ctx)
wg.Wait()
return err
}

// processStream processes the input stream from readerFn and enqueues commit tasks
func processStream(ctx context.Context, readerFn func() ([]byte, error), loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) {
var err error
var shouldBreak bool
var prevData, curData []byte
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
wg.Done()
}()
for {
curData, err = readerFn()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
logutil.Logger(ctx).Error("read packet failed", zap.Error(err))
break
}
}
if len(curData) == 0 {
loadDataInfo.Drained = true
shouldBreak = true
if len(prevData) == 0 {
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
}
if shouldBreak {
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
if err = loadDataInfo.EnqOneTask(ctx); err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
}

func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
prevData, reachLimit, err = loadDataInfo.InsertData(ctx, prevData, curData)
if err != nil {
return nil, err
}
if !reachLimit {
break
}
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return prevData, err
}
// re-feed the remainder InsertData returned as the next chunk of input
curData = prevData
prevData = nil
}
return prevData, nil
}

// reorderColumns reorders e.insertColumns according to the order of columnNames.
// Note: there must be a one-to-one mapping between e.insertColumns and columnNames in terms of column name.
func (e *LoadDataInfo) reorderColumns(columnNames []string) error {
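For context on driving the new LoadDataInfo.Load entry point: readerFn is a pull-style callback returning the next chunk of raw bytes, with an empty chunk once the source is drained. A hypothetical caller reading a local file could look like the sketch below (assuming imports of context, io, os, and the executor package); the helper name loadFromFile, the buffer size, and the empty-chunk-at-EOF convention are illustrative assumptions, not part of this PR.

// loadFromFile is a hypothetical driver for LoadDataInfo.Load.
func loadFromFile(ctx context.Context, info *executor.LoadDataInfo, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	readerFn := func() ([]byte, error) {
		buf := make([]byte, 64*1024) // fresh buffer per call; downstream may retain slices
		n, err := f.Read(buf)
		if n > 0 {
			return buf[:n], nil
		}
		if err == io.EOF {
			return nil, nil // an empty chunk tells processStream the stream is drained
		}
		return nil, err
	}
	return info.Load(ctx, readerFn)
}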
22 changes: 20 additions & 2 deletions parser/ast/dml.go
@@ -1801,12 +1801,26 @@ func (n *ColumnNameOrUserVar) Accept(v Visitor) (node Node, ok bool) {
return v.Leave(n)
}

// FileLocRefTp indicates the location of the data file in a LOAD DATA statement.
type FileLocRefTp int

const (
// FileLocServer is used when no keyword is present in the SQL, meaning the data file is located on the tidb-server.
FileLocServer FileLocRefTp = iota
// FileLocClient is used when the LOCAL keyword is present, meaning the data file is located on the MySQL client.
FileLocClient
// FileLocRemote is used when the REMOTE keyword is present, meaning the data file is located on a remote server,
// such as cloud storage.
FileLocRemote
)

// LoadDataStmt is a statement to load data from a specified file and insert these rows into an existing table.
// See https://dev.mysql.com/doc/refman/5.7/en/load-data.html
// In TiDB, the syntax is extended to use LOAD DATA as a more general way to import data.
type LoadDataStmt struct {
dmlNode

IsLocal bool
FileLocRef FileLocRefTp
Path string
OnDuplicate OnDuplicateKeyHandlingType
Table *TableName
@@ -1822,8 +1836,12 @@ type LoadDataStmt struct {
// Restore implements Node interface.
func (n *LoadDataStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("LOAD DATA ")
if n.IsLocal {
switch n.FileLocRef {
case FileLocServer:
case FileLocClient:
ctx.WriteKeyWord("LOCAL ")
case FileLocRemote:
ctx.WriteKeyWord("REMOTE ")
}
ctx.WriteKeyWord("INFILE ")
ctx.WriteString(n.Path)
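For reference, with the tokenMap addition below and the grammar change elsewhere in this PR, the new form should round-trip through the parser. A minimal sketch, assuming imports of strings, github.com/pingcap/tidb/parser, and github.com/pingcap/tidb/parser/format; the s3:// path is illustrative:

func restoreExample() (string, error) {
	p := parser.New()
	stmt, err := p.ParseOneStmt("LOAD DATA REMOTE INFILE 's3://bucket/data.csv' INTO TABLE t", "", "")
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := stmt.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)); err != nil {
		return "", err
	}
	// expected output, roughly: LOAD DATA REMOTE INFILE 's3://bucket/data.csv' INTO TABLE `t`
	return sb.String(), nil
}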
1 change: 1 addition & 0 deletions parser/misc.go
@@ -588,6 +588,7 @@ var tokenMap = map[string]int{
"RELEASE": release,
"RELOAD": reload,
"REMOVE": remove,
"REMOTE": remote,
"RENAME": rename,
"REORGANIZE": reorganize,
"REPAIR": repair,