Skip to content

Commit

Permalink
Merge branch 'master' into metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Jan 28, 2021
2 parents fa9f8b0 + 558d561 commit f31174d
Show file tree
Hide file tree
Showing 36 changed files with 277 additions and 54 deletions.
74 changes: 54 additions & 20 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,39 +408,73 @@ func (e *LoadDataInfo) indexOfTerminator(bs []byte) int {
fieldTermLen := len(fieldTerm)
lineTerm := []byte(e.LinesInfo.Terminated)
lineTermLen := len(lineTerm)
length := len(bs)
type termType int
const (
notTerm termType = iota
fieldTermType
lineTermType
)
// likely, fieldTermLen should equal to lineTermLen, compare fieldTerm first can avoid useless lineTerm comparison.
cmpTerm := func(restLen int, bs []byte) (typ termType) {
if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
typ = fieldTermType
return
}
if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
typ = lineTermType
return
}
return
}
if lineTermLen > fieldTermLen && bytes.HasPrefix(lineTerm, fieldTerm) {
// unlikely, fieldTerm is prefix of lineTerm, we should compare lineTerm first.
cmpTerm = func(restLen int, bs []byte) (typ termType) {
if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
typ = lineTermType
return
}
if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
typ = fieldTermType
return
}
return
}
}
atFieldStart := true
inQuoter := false
for i := 0; i < length; i++ {
loop:
for i := 0; i < len(bs); i++ {
if atFieldStart && bs[i] == e.FieldsInfo.Enclosed {
inQuoter = true
atFieldStart = false
continue
}
restLen := length - i - 1
restLen := len(bs) - i - 1
if inQuoter && bs[i] == e.FieldsInfo.Enclosed {
// look ahead to see if it is end of field. if the next is field terminator, then it is.
if restLen >= fieldTermLen && bytes.Equal(bs[i+1:i+fieldTermLen+1], fieldTerm) {
// look ahead to see if it is end of line or field.
switch cmpTerm(restLen, bs[i+1:]) {
case lineTermType:
return i + 1
case fieldTermType:
i += fieldTermLen
inQuoter = false
atFieldStart = true
continue
continue loop
default:
}
// look ahead to see if it is end of line. if the next is line terminator, then return.
if restLen >= lineTermLen && bytes.Equal(bs[i+1:i+lineTermLen+1], lineTerm) {
return i + 1
}
}
// look ahead to see if it is end of field. if the next is field terminator, then it is.
if !inQuoter && restLen >= fieldTermLen-1 && bytes.Equal(bs[i:i+fieldTermLen], fieldTerm) {
i += fieldTermLen - 1
inQuoter = false
atFieldStart = true
continue
}
// look ahead to see if it is end of line. if the next is line terminator, then return.
if !inQuoter && restLen >= lineTermLen-1 && bytes.Equal(bs[i:i+lineTermLen], lineTerm) {
return i
if !inQuoter {
// look ahead to see if it is end of line or field.
switch cmpTerm(restLen+1, bs[i:]) {
case lineTermType:
return i
case fieldTermType:
i += fieldTermLen - 1
inQuoter = false
atFieldStart = true
continue loop
default:
}
}
// if it is escaped char, skip next char.
if bs[i] == e.FieldsInfo.Escaped {
Expand Down
19 changes: 19 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,25 @@ func (s *testSuite4) TestLoadData(c *C) {
{[]byte("xxx1\\1\\\"2\n\"\\3\nxxx4\\4\\\"5\n5\"\\6"), nil, []string{"1|1|2\n|3", "4|4|5\n5|6"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)

ld.LinesInfo.Terminated = "#\n"
ld.FieldsInfo.Terminated = "#"
tests = []testCase{
{[]byte("xxx1#\nxxx2#\n"), nil, []string{"1|<nil>|<nil>|<nil>", "2|<nil>|<nil>|<nil>"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
{[]byte("xxx1#2#3#4#\nnxxx2#3#4#5#\n"), nil, []string{"1|2|3|4", "2|3|4|5"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
{[]byte("xxx1#2#\"3#\"#\"4\n\"#\nxxx2#3#\"#4#\n\"#5#\n"), nil, []string{"1|2|3#|4", "2|3|#4#\n|5"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)

ld.LinesInfo.Terminated = "#"
ld.FieldsInfo.Terminated = "##"
ld.LinesInfo.Starting = ""
tests = []testCase{
{[]byte("1#2#"), nil, []string{"1|<nil>|<nil>|<nil>", "2|<nil>|<nil>|<nil>"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
{[]byte("1##2##3##4#2##3##4##5#"), nil, []string{"1|2|3|4", "2|3|4|5"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
{[]byte("1##2##\"3##\"##\"4\n\"#2##3##\"##4#\"##5#"), nil, []string{"1|2|3##|4", "2|3|##4#|5"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite4) TestLoadDataEscape(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tipb/go-binlog"
"github.com/prometheus/client_golang/prometheus"
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package tikv
import (
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/pingcap/parser/terror"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -144,7 +144,7 @@ const (
gcScanLockModeKey = "tikv_gc_scan_lock_mode"
gcScanLockModeLegacy = "legacy"
gcScanLockModePhysical = "physical"
gcScanLockModeDefault = gcScanLockModePhysical
gcScanLockModeDefault = gcScanLockModeLegacy

gcAutoConcurrencyKey = "tikv_gc_auto_concurrency"
gcDefaultAutoConcurrency = true
Expand Down
9 changes: 5 additions & 4 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,23 +499,24 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) {
func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) {
usePhysical, err := s.gcWorker.checkUsePhysicalScanLock()
c.Assert(err, IsNil)
c.Assert(usePhysical, Equals, false)
c.Assert(usePhysical, Equals, gcScanLockModeDefault == gcScanLockModePhysical)
// Now the row must be set to the default value.
str, err := s.gcWorker.loadValueFromSysTable(gcScanLockModeKey)
c.Assert(err, IsNil)
c.Assert(str, Equals, gcScanLockModeDefault)

err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeLegacy)
err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModePhysical)
c.Assert(err, IsNil)
usePhysical, err = s.gcWorker.checkUsePhysicalScanLock()
c.Assert(err, IsNil)
c.Assert(usePhysical, Equals, false)
c.Assert(usePhysical, Equals, true)

err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModePhysical)
err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeLegacy)
c.Assert(err, IsNil)
usePhysical, err = s.gcWorker.checkUsePhysicalScanLock()
c.Assert(err, IsNil)
c.Assert(usePhysical, Equals, true)
c.Assert(usePhysical, Equals, false)

err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, "invalid_mode")
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/latch"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/twmb/murmur3"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)
Expand Down
78 changes: 78 additions & 0 deletions store/tikv/logutil/hex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package logutil

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"reflect"
"strings"

"github.com/golang/protobuf/proto"
)

// Hex defines a fmt.Stringer for proto.Message.
// We can't define the String() method on proto.Message, but we can wrap it.
func Hex(msg proto.Message) fmt.Stringer {
return hexStringer{msg}
}

type hexStringer struct {
proto.Message
}

func (h hexStringer) String() string {
val := reflect.ValueOf(h.Message)
var w bytes.Buffer
prettyPrint(&w, val)
return w.String()
}

func prettyPrint(w io.Writer, val reflect.Value) {
tp := val.Type()
switch val.Kind() {
case reflect.Slice:
elemType := tp.Elem()
if elemType.Kind() == reflect.Uint8 {
fmt.Fprintf(w, "%s", hex.EncodeToString(val.Bytes()))
} else {
fmt.Fprintf(w, "%s", val.Interface())
}
case reflect.Struct:
fmt.Fprintf(w, "{")
for i := 0; i < val.NumField(); i++ {
fv := val.Field(i)
ft := tp.Field(i)
if strings.HasPrefix(ft.Name, "XXX_") {
continue
}
if i != 0 {
fmt.Fprintf(w, " ")
}
fmt.Fprintf(w, "%s:", ft.Name)
prettyPrint(w, fv)
}
fmt.Fprintf(w, "}")
case reflect.Ptr:
if val.IsNil() {
fmt.Fprintf(w, "%v", val.Interface())
} else {
prettyPrint(w, reflect.Indirect(val))
}
default:
fmt.Fprintf(w, "%v", val.Interface())
}
}
Loading

0 comments on commit f31174d

Please sign in to comment.