Skip to content

Commit

Permalink
Merge branch 'master' into grafana-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Dec 1, 2021
2 parents 45a4a0e + 4e61d16 commit 9b51d28
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 61 deletions.
6 changes: 1 addition & 5 deletions cmd/ddltest/ddl_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
zaplog "github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand All @@ -48,7 +47,6 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
goctx "golang.org/x/net/context"
Expand Down Expand Up @@ -91,11 +89,9 @@ type ddlSuite struct {
}

func createDDLSuite(t *testing.T) (s *ddlSuite) {
var err error
s = new(ddlSuite)

err := logutil.InitLogger(&logutil.LogConfig{Config: zaplog.Config{Level: *logLevel}})
require.NoError(t, err)

s.quit = make(chan struct{})

s.store, err = store.New(fmt.Sprintf("tikv://%s%s", *etcd, *tikvPath))
Expand Down
36 changes: 36 additions & 0 deletions cmd/ddltest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddltest

import (
"fmt"
"os"
"testing"

zaplog "github.com/pingcap/log"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testbridge"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.WorkaroundGoCheckFlags()
err := logutil.InitLogger(&logutil.LogConfig{Config: zaplog.Config{Level: *logLevel}})
if err != nil {
fmt.Fprint(os.Stderr, err.Error())
os.Exit(1)
}
goleak.VerifyTestMain(m)
}
7 changes: 6 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,12 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
}
if mutation := req.Mutations[0]; mutation != nil {
label := resourcegrouptag.GetResourceGroupLabelByKey(mutation.Key)
return seVars.StmtCtx.GetResourceGroupTagByLabel(label)
normalized, digest := seVars.StmtCtx.SQLDigest()
if len(normalized) == 0 {
return nil
}
_, planDigest := seVars.StmtCtx.GetPlanDigest()
return resourcegrouptag.EncodeResourceGroupTag(digest, planDigest, label)
}
return nil
}
Expand Down
15 changes: 6 additions & 9 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ func (e *LoadDataInfo) isInQuoter(bs []byte) bool {
return inQuoter
}

// indexOfTerminator return index of terminator, if not, return -1.
// IndexOfTerminator return index of terminator, if not, return -1.
// normally, the field terminator and line terminator is short, so we just use brute force algorithm.
func (e *LoadDataInfo) indexOfTerminator(bs []byte, isInQuoter bool) int {
func (e *LoadDataInfo) IndexOfTerminator(bs []byte, inQuoter bool) int {
fieldTerm := []byte(e.FieldsInfo.Terminated)
fieldTermLen := len(fieldTerm)
lineTerm := []byte(e.LinesInfo.Terminated)
Expand Down Expand Up @@ -459,13 +459,10 @@ func (e *LoadDataInfo) indexOfTerminator(bs []byte, isInQuoter bool) int {
}
}
atFieldStart := true
inQuoter := false
loop:
for i := 0; i < len(bs); i++ {
if atFieldStart && bs[i] == e.FieldsInfo.Enclosed {
if !isInQuoter {
inQuoter = true
}
inQuoter = !inQuoter
atFieldStart = false
continue
}
Expand Down Expand Up @@ -525,7 +522,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [
if ignore {
endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(curData[curStartIdx:], inquotor)
endIdx = e.IndexOfTerminator(curData[curStartIdx:], inquotor)
}
}
if endIdx == -1 {
Expand All @@ -539,7 +536,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [
if ignore {
endIdx = strings.Index(string(hack.String(curData[startingLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(curData[startingLen:], inquotor)
endIdx = e.IndexOfTerminator(curData[startingLen:], inquotor)
}
if endIdx != -1 {
nextDataIdx := startingLen + endIdx + terminatedLen
Expand All @@ -560,7 +557,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [
if ignore {
endIdx = strings.Index(string(hack.String(prevData[startingLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(prevData[startingLen:], inquotor)
endIdx = e.IndexOfTerminator(prevData[startingLen:], inquotor)
}
if endIdx >= prevLen {
return prevData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true
Expand Down
22 changes: 22 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -2129,6 +2130,27 @@ func TestLoadDataEscape(t *testing.T) {
checkCases(tests, ld, t, tk, ctx, selectSQL, deleteSQL)
}

func TestLoadDataWithLongContent(t *testing.T) {
e := &executor.LoadDataInfo{
FieldsInfo: &ast.FieldsClause{Terminated: ",", Escaped: '\\', Enclosed: '"'},
LinesInfo: &ast.LinesClause{Terminated: "\n"},
}
tests := []struct {
content string
inQuoter bool
expectedIndex int
}{
{"123,123\n123,123", false, 7},
{"123123\\n123123", false, -1},
{"123123\n123123", true, -1},
{"123123\n123123\"\n", true, 14},
}

for _, tt := range tests {
require.Equal(t, tt.expectedIndex, e.IndexOfTerminator([]byte(tt.content), tt.inQuoter))
}
}

// TestLoadDataSpecifiedColumns reuse TestLoadDataEscape's test case :-)
func TestLoadDataSpecifiedColumns(t *testing.T) {
trivialMsg := "Records: 1 Deleted: 0 Skipped: 0 Warnings: 0"
Expand Down
10 changes: 8 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,7 @@ func getStmtCnt(content string) (stmtCnt map[string]int) {

const retryTime = 100

func (cli *testServerClient) waitUntilServerOnline() {
func (cli *testServerClient) waitUntilServerCanConnect() {
// connect server
retry := 0
for ; retry < retryTime; retry++ {
Expand All @@ -1911,8 +1911,14 @@ func (cli *testServerClient) waitUntilServerOnline() {
if retry == retryTime {
log.Fatal("failed to connect DB in every 10 ms", zap.Int("retryTime", retryTime))
}
}

for retry = 0; retry < retryTime; retry++ {
func (cli *testServerClient) waitUntilServerOnline() {
// connect server
cli.waitUntilServerCanConnect()

retry := 0
for ; retry < retryTime; retry++ {
// fetch http status
resp, err := cli.fetchStatus("/status")
if err == nil {
Expand Down
116 changes: 109 additions & 7 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func TestSocketAndIp(t *testing.T) {
err := server.Run()
require.NoError(t, err)
}()
time.Sleep(time.Millisecond * 100)
cli.waitUntilServerCanConnect()
defer server.Close()

// Test with Socket connection + Setup user1@% for all host access
Expand Down Expand Up @@ -689,17 +689,19 @@ func TestOnlySocket(t *testing.T) {
db, err := sql.Open("mysql", cli.getDSN(func(config *mysql.Config) {
config.User = "root"
config.DBName = "test"
config.Addr = "127.0.0.1"
}))
require.NoErrorf(t, err, "Connect succeeded when not configured!?!")
defer db.Close()
require.NoErrorf(t, err, "Open failed")
err = db.Ping()
require.Errorf(t, err, "Connect succeeded when not configured!?!")
db.Close()
db, err = sql.Open("mysql", cli.getDSN(func(config *mysql.Config) {
config.User = "user1"
config.DBName = "test"
config.Addr = "127.0.0.1"
}))
require.NoErrorf(t, err, "Connect succeeded when not configured!?!")
defer db.Close()
require.NoErrorf(t, err, "Open failed")
err = db.Ping()
require.Errorf(t, err, "Connect succeeded when not configured!?!")
db.Close()
// Test with unix domain socket file connection with all hosts
cli.runTests(t, func(config *mysql.Config) {
config.Net = "unix"
Expand Down Expand Up @@ -1677,3 +1679,103 @@ func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, t *testing.T, fn fu
fn(db)
}
}

func TestLocalhostClientMapping(t *testing.T) {
t.Parallel()
osTempDir := os.TempDir()
tempDir, err := os.MkdirTemp(osTempDir, "tidb-test.*.socket")
require.NoError(t, err)
socketFile := tempDir + "/tidbtest.sock" // Unix Socket does not work on Windows, so '/' should be OK
defer os.RemoveAll(tempDir)

cli := newTestServerClient()
cfg := newTestConfig()
cfg.Socket = socketFile
cfg.Port = cli.port
cfg.Status.ReportStatus = false

ts, cleanup := createTidbTestSuite(t)
defer cleanup()

server, err := NewServer(cfg, ts.tidbdrv)
require.NoError(t, err)
cli.port = getPortFromTCPAddr(server.listener.Addr())
go func() {
err := server.Run()
require.NoError(t, err)
}()
defer server.Close()
cli.waitUntilServerCanConnect()

cli.port = getPortFromTCPAddr(server.listener.Addr())
// Create a db connection for root
db, err := sql.Open("mysql", cli.getDSN(func(config *mysql.Config) {
config.User = "root"
config.Net = "unix"
config.DBName = "test"
config.Addr = socketFile
}))
require.NoErrorf(t, err, "Open failed")
err = db.Ping()
require.NoErrorf(t, err, "Ping failed")
defer db.Close()
dbt := testkit.NewDBTestKit(t, db)
rows := dbt.MustQuery("select user()")
cli.checkRows(t, rows, "root@localhost")
rows = dbt.MustQuery("show grants")
cli.checkRows(t, rows, "GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION")

dbt.MustExec("CREATE USER 'localhostuser'@'localhost'")
dbt.MustExec("CREATE USER 'localhostuser'@'%'")
defer func() {
dbt.MustExec("DROP USER IF EXISTS 'localhostuser'@'%'")
dbt.MustExec("DROP USER IF EXISTS 'localhostuser'@'localhost'")
dbt.MustExec("DROP USER IF EXISTS 'localhostuser'@'127.0.0.1'")
}()

dbt.MustExec("GRANT SELECT ON test.* TO 'localhostuser'@'%'")
dbt.MustExec("GRANT SELECT,UPDATE ON test.* TO 'localhostuser'@'localhost'")

// Test with loopback interface - Should get access to localhostuser@localhost!
cli.runTests(t, func(config *mysql.Config) {
config.User = "localhostuser"
config.DBName = "test"
},
func(dbt *testkit.DBTestKit) {
rows := dbt.MustQuery("select user()")
// NOTICE: this is not compatible with MySQL! (MySQL would report localhostuser@localhost also for 127.0.0.1)
cli.checkRows(t, rows, "localhostuser@127.0.0.1")
rows = dbt.MustQuery("show grants")
cli.checkRows(t, rows, "GRANT USAGE ON *.* TO 'localhostuser'@'localhost'\nGRANT SELECT,UPDATE ON test.* TO 'localhostuser'@'localhost'")
})

dbt.MustExec("DROP USER IF EXISTS 'localhostuser'@'localhost'")
dbt.MustExec("CREATE USER 'localhostuser'@'127.0.0.1'")
dbt.MustExec("GRANT SELECT,UPDATE ON test.* TO 'localhostuser'@'127.0.0.1'")
// Test with unix domain socket file connection - Should get access to '%'
cli.runTests(t, func(config *mysql.Config) {
config.Net = "unix"
config.Addr = socketFile
config.User = "localhostuser"
config.DBName = "test"
},
func(dbt *testkit.DBTestKit) {
rows := dbt.MustQuery("select user()")
cli.checkRows(t, rows, "localhostuser@localhost")
rows = dbt.MustQuery("show grants")
cli.checkRows(t, rows, "GRANT USAGE ON *.* TO 'localhostuser'@'%'\nGRANT SELECT ON test.* TO 'localhostuser'@'%'")
})

// Test if only localhost exists
dbt.MustQuery("DROP USER 'localhostuser'@'%'")
dbSocket, err := sql.Open("mysql", cli.getDSN(func(config *mysql.Config) {
config.User = "localhostuser"
config.Net = "unix"
config.DBName = "test"
config.Addr = socketFile
}))
require.NoErrorf(t, err, "Open failed")
defer dbSocket.Close()
err = dbSocket.Ping()
require.Errorf(t, err, "Connection successful without matching host for unix domain socket!")
}
20 changes: 6 additions & 14 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
Expand Down Expand Up @@ -289,27 +288,20 @@ func (sc *StatementContext) GetPlanDigest() (normalized string, planDigest *pars

// GetResourceGroupTagger returns the implementation of tikvrpc.ResourceGroupTagger related to self.
func (sc *StatementContext) GetResourceGroupTagger() tikvrpc.ResourceGroupTagger {
normalized, digest := sc.SQLDigest()
planDigest := sc.planDigest
return func(req *tikvrpc.Request) {
if req == nil {
return
}
req.ResourceGroupTag = sc.GetResourceGroupTagByLabel(
if len(normalized) == 0 {
return
}
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, planDigest,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
}
}

// GetResourceGroupTagByLabel gets the resource group of the statement based on the label.
func (sc *StatementContext) GetResourceGroupTagByLabel(label tipb.ResourceGroupTagLabel) []byte {
if sc == nil {
return nil
}
normalized, sqlDigest := sc.SQLDigest()
if len(normalized) == 0 {
return nil
}
return resourcegrouptag.EncodeResourceGroupTag(sqlDigest, sc.planDigest, label)
}

// SetPlanDigest sets the normalized plan and plan digest.
func (sc *StatementContext) SetPlanDigest(normalized string, planDigest *parser.Digest) {
if planDigest != nil {
Expand Down
Loading

0 comments on commit 9b51d28

Please sign in to comment.