Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongjiwei committed Dec 22, 2021
1 parent 38bd4d1 commit c57a8a2
Showing 1 changed file with 22 additions and 91 deletions.
113 changes: 22 additions & 91 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,43 +366,13 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
// getValidData returns prevData and curData that starts from starting symbol.
// If the data doesn't have starting symbol, prevData is nil and curData is curData[len(curData)-startingLen+1:].
// If curData size less than startingLen, curData is returned directly.
func (e *LoadDataInfo) getValidData(prevData, curData []byte) ([]byte, []byte) {
startingLen := len(e.LinesInfo.Starting)
if startingLen == 0 {
return prevData, curData
}

prevLen := len(prevData)
if prevLen > 0 {
// starting symbol in the prevData
idx := strings.Index(string(hack.String(prevData)), e.LinesInfo.Starting)
if idx != -1 {
return prevData[idx:], curData
}

// starting symbol in the middle of prevData and curData
restStart := curData
if len(curData) >= startingLen {
restStart = curData[:startingLen-1]
}
prevData = append(prevData, restStart...)
idx = strings.Index(string(hack.String(prevData)), e.LinesInfo.Starting)
if idx != -1 {
return prevData[idx:prevLen], curData
}
}

// starting symbol in the curData
func (e *LoadDataInfo) getValidData(curData []byte) ([]byte, bool) {
idx := strings.Index(string(hack.String(curData)), e.LinesInfo.Starting)
if idx != -1 {
return nil, curData[idx:]
if idx == -1 {
return curData[len(curData)-len(e.LinesInfo.Starting)+1:], false
}

// no starting symbol
if len(curData) >= startingLen {
curData = curData[len(curData)-startingLen+1:]
}
return nil, curData
return curData[idx:], true
}

func (e *LoadDataInfo) isInQuoter(bs []byte) bool {
Expand Down Expand Up @@ -464,7 +434,7 @@ func (e *LoadDataInfo) IndexOfTerminator(bs []byte, inQuoter bool) int {
atFieldStart := true
loop:
for i := 0; i < len(bs); i++ {
if atFieldStart && e.FieldsInfo.Enclosed != byte(0) &&bs[i] == e.FieldsInfo.Enclosed {
if atFieldStart && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed {
inQuoter = !inQuoter
atFieldStart = false
continue
Expand Down Expand Up @@ -508,66 +478,31 @@ loop:
// getLine returns a line, curData, the next data start index and a bool value.
// If it has starting symbol the bool is true, otherwise is false.
func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, []byte, bool) {
startingLen := len(e.LinesInfo.Starting)
prevData, curData = e.getValidData(prevData, curData)
if prevData == nil && len(curData) < startingLen {
return nil, curData, false
}
inquotor := e.isInQuoter(prevData)
prevLen := len(prevData)
terminatedLen := len(e.LinesInfo.Terminated)
curStartIdx := 0
if prevLen < startingLen {
curStartIdx = startingLen - prevLen
}
endIdx := -1
if len(curData) >= curStartIdx {
if ignore {
endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
} else {
endIdx = e.IndexOfTerminator(curData[curStartIdx:], inquotor)
}
}
if endIdx == -1 {
// no terminated symbol
if len(prevData) == 0 {
return nil, curData, true
}

// terminated symbol in the middle of prevData and curData
if prevData != nil {
curData = append(prevData, curData...)
if ignore {
endIdx = strings.Index(string(hack.String(curData[startingLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.IndexOfTerminator(curData[startingLen:], false)
}
startLen := len(e.LinesInfo.Starting)
if startLen != 0 {
if len(curData) < startLen {
return nil, curData, false
}
if endIdx != -1 {
nextDataIdx := startingLen + endIdx + terminatedLen
return curData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true
var ok bool
curData, ok = e.getValidData(curData)
if !ok {
return nil, curData, false
}
// no terminated symbol
return nil, curData, true
}

// terminated symbol in the curData
nextDataIdx := curStartIdx + endIdx + terminatedLen
if len(prevData) == 0 {
return curData[curStartIdx : curStartIdx+endIdx], curData[nextDataIdx:], true
}

// terminated symbol in the curData
prevData = append(prevData, curData[:nextDataIdx]...)
endIdx := -1
if ignore {
endIdx = strings.Index(string(hack.String(prevData[startingLen:])), e.LinesInfo.Terminated)
endIdx = strings.Index(string(hack.String(curData[startLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.IndexOfTerminator(prevData[startingLen:], false)
endIdx = e.IndexOfTerminator(curData[startLen:], false)
}
if endIdx >= prevLen {
return prevData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true

if endIdx == -1 {
return nil, curData, true
}

// terminated symbol in the middle of prevData and curData
lineLen := startingLen + endIdx + terminatedLen
defer func() {
r := recover()
if r != nil {
Expand All @@ -578,18 +513,14 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [
zap.String("Line Starting", e.LinesInfo.Starting),
zap.String("Line Terminated", e.LinesInfo.Terminated),
zap.Int("endIdx", endIdx),
zap.Int("prevLen", prevLen),
zap.Int("lineLen", lineLen),
zap.Int("nextDataIdx", nextDataIdx),
zap.Bool("inquotor", inquotor),
zap.Bool("ignore", ignore),
zap.Bool("prevData-contains-00bytes", bytes.Contains(prevData, []byte{e.FieldsInfo.Enclosed})),
zap.Bool("curData-contains-00bytes", bytes.Contains(curData, []byte{e.FieldsInfo.Enclosed})),
)
panic(r)
}
}()
return prevData[startingLen : startingLen+endIdx], curData[lineLen-prevLen:], true
return curData[startLen : startLen+endIdx], curData[startLen+endIdx+len(e.LinesInfo.Terminated):], true
}

// InsertData inserts data into specified table according to the specified format.
Expand Down

0 comments on commit c57a8a2

Please sign in to comment.