Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb into fix45673
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Aug 22, 2023
2 parents 8b90135 + dd593b2 commit f947940
Show file tree
Hide file tree
Showing 46 changed files with 669 additions and 123 deletions.
21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,27 @@ bazel_importintotest: failpoint-enable bazel_ci_simple_prepare
-- //tests/realtikvtest/importintotest/...
./build/jenkins_collect_coverage.sh

# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
bazel_importintotest2: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest2/...
./build/jenkins_collect_coverage.sh

# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
bazel_importintotest3: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest3/...
./build/jenkins_collect_coverage.sh

# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
bazel_importintotest4: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest4/...
./build/jenkins_collect_coverage.sh

bazel_lint: bazel_prepare
bazel build //... --//build:with_nogo_flag=true

Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
],
)
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
"slices"
"strings"
"testing"
"time"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"golang.org/x/exp/slices"
)

func TestWriter(t *testing.T) {
Expand Down Expand Up @@ -62,8 +62,8 @@ func TestWriter(t *testing.T) {
_, err = writer.Close(ctx)
require.NoError(t, err)

slices.SortFunc(kvs, func(i, j common.KvPair) bool {
return bytes.Compare(i.Key, j.Key) < 0
slices.SortFunc(kvs, func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})

bufSize := rand.Intn(100) + 1
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ go_library(
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
Expand Down
13 changes: 9 additions & 4 deletions br/pkg/lightning/backend/local/disk_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package local

import (
"cmp"
"slices"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"golang.org/x/exp/slices"
)

// DiskUsage is an interface to obtain the size occupied locally of all engines
Expand All @@ -38,11 +40,14 @@ func CheckDiskQuota(mgr DiskUsage, quota int64) (
totalMemSize int64,
) {
sizes := mgr.EngineFileSizes()
slices.SortFunc(sizes, func(i, j backend.EngineFileSize) bool {
slices.SortFunc(sizes, func(i, j backend.EngineFileSize) int {
if i.IsImporting != j.IsImporting {
return i.IsImporting
if i.IsImporting {
return -1
}
return 1
}
return i.DiskSize+i.MemSize < j.DiskSize+j.MemSize
return cmp.Compare(i.DiskSize+i.MemSize, j.DiskSize+j.MemSize)
})
for _, size := range sizes {
totalDiskSize += size.DiskSize
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ go_test(
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/dup_detect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package importer

import (
"context"
"slices"
"testing"

"github.com/pingcap/errors"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/pingcap/tidb/util/dbutil"
"github.com/pingcap/tidb/util/extsort"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

var (
Expand Down
2 changes: 2 additions & 0 deletions cmd/explaintest/r/collation_misc_enabled.result
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ utf8 83 1
utf8 33 1
utf8 192 1
utf8mb4 255 1
utf8mb4 309 1
utf8mb4 46 1
utf8mb4 45 1
utf8mb4 224 1
Expand All @@ -130,6 +131,7 @@ utf8_bin utf8 83 Yes Yes 1
utf8_general_ci utf8 33 Yes 1
utf8_unicode_ci utf8 192 Yes 1
utf8mb4_0900_ai_ci utf8mb4 255 Yes 1
utf8mb4_0900_bin utf8mb4 309 Yes 1
utf8mb4_bin utf8mb4 46 Yes Yes 1
utf8mb4_general_ci utf8mb4 45 Yes 1
utf8mb4_unicode_ci utf8mb4 224 Yes 1
Expand Down
3 changes: 1 addition & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/syncutil"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -747,7 +746,7 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
}
}
// reorg worker count at least 1 at most 10.
reorgCnt := mathutil.Min(mathutil.Max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt)
reorgCnt := min(max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt)
d.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), reorg)
d.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), general)
failpoint.Inject("NoDDLDispatchLoop", func(val failpoint.Value) {
Expand Down
11 changes: 6 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package executor

import (
"cmp"
"context"
"fmt"
"math"
"runtime/pprof"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -82,7 +84,6 @@ import (
tikvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -2475,8 +2476,8 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask) (_ workerpool.None) {
trySaveErr(err)
return
}
slices.SortFunc(tableChecksum, func(i, j groupByChecksum) bool {
return i.bucket < j.bucket
slices.SortFunc(tableChecksum, func(i, j groupByChecksum) int {
return cmp.Compare(i.bucket, j.bucket)
})

// compute index side checksum.
Expand All @@ -2485,8 +2486,8 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask) (_ workerpool.None) {
trySaveErr(err)
return
}
slices.SortFunc(indexChecksum, func(i, j groupByChecksum) bool {
return i.bucket < j.bucket
slices.SortFunc(indexChecksum, func(i, j groupByChecksum) int {
return cmp.Compare(i.bucket, j.bucket)
})

currentOffset := 0
Expand Down
24 changes: 15 additions & 9 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime/trace"
"slices"
"sync"
"sync/atomic"

Expand All @@ -38,7 +39,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

// IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join
Expand Down Expand Up @@ -450,23 +450,29 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
// Because the necessary condition of merge join is both outer and inner keep order of join keys.
// In this case, we need sort the outer side.
if imw.outerMergeCtx.needOuterSort {
slices.SortFunc(task.outerOrderIdx, func(idxI, idxJ chunk.RowPtr) bool {
slices.SortFunc(task.outerOrderIdx, func(idxI, idxJ chunk.RowPtr) int {
rowI, rowJ := task.outerResult.GetRow(idxI), task.outerResult.GetRow(idxJ)
var cmp int64
var c int64
var err error
for _, keyOff := range imw.keyOff2KeyOffOrderByIdx {
joinKey := imw.outerMergeCtx.joinKeys[keyOff]
cmp, _, err = imw.outerMergeCtx.compareFuncs[keyOff](imw.ctx, joinKey, joinKey, rowI, rowJ)
c, _, err = imw.outerMergeCtx.compareFuncs[keyOff](imw.ctx, joinKey, joinKey, rowI, rowJ)
terror.Log(err)
if cmp != 0 {
if c != 0 {
break
}
}
if cmp != 0 || imw.nextColCompareFilters == nil {
return (cmp < 0 && !imw.desc) || (cmp > 0 && imw.desc)
if c != 0 || imw.nextColCompareFilters == nil {
if imw.desc {
return int(-c)
}
return int(c)
}
c = int64(imw.nextColCompareFilters.CompareRow(rowI, rowJ))
if imw.desc {
return int(-c)
}
cmp = int64(imw.nextColCompareFilters.CompareRow(rowI, rowJ))
return (cmp < 0 && !imw.desc) || (cmp > 0 && imw.desc)
return int(c)
})
}
dLookUpKeys, err := imw.constructDatumLookupKeys(task)
Expand Down
31 changes: 26 additions & 5 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"container/heap"
"context"
"errors"
"slices"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor/internal/exec"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"golang.org/x/exp/slices"
)

// SortExec represents sorting executor.
Expand Down Expand Up @@ -143,7 +143,7 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *SortExec) externalSorting(req *chunk.Chunk) (err error) {
if e.multiWayMerge == nil {
e.multiWayMerge = &multiWayMerge{e.lessRow, make([]partitionPointer, 0, len(e.partitionList))}
e.multiWayMerge = &multiWayMerge{e.lessRow, e.compressRow, make([]partitionPointer, 0, len(e.partitionList))}
for i := 0; i < len(e.partitionList); i++ {
row, err := e.partitionList[i].GetSortedRow(0)
if err != nil {
Expand Down Expand Up @@ -273,15 +273,30 @@ func (e *SortExec) lessRow(rowI, rowJ chunk.Row) bool {
return false
}

func (e *SortExec) compressRow(rowI, rowJ chunk.Row) int {
for i, colIdx := range e.keyColumns {
cmpFunc := e.keyCmpFuncs[i]
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if e.ByItems[i].Desc {
cmp = -cmp
}
if cmp != 0 {
return cmp
}
}
return 0
}

type partitionPointer struct {
row chunk.Row
partitionID int
consumed int
}

type multiWayMerge struct {
lessRowFunction func(rowI chunk.Row, rowJ chunk.Row) bool
elements []partitionPointer
lessRowFunction func(rowI chunk.Row, rowJ chunk.Row) bool
compressRowFunction func(rowI chunk.Row, rowJ chunk.Row) int
elements []partitionPointer
}

func (h *multiWayMerge) Less(i, j int) bool {
Expand Down Expand Up @@ -376,6 +391,12 @@ func (e *TopNExec) keyColumnsLess(i, j chunk.RowPtr) bool {
return e.lessRow(rowI, rowJ)
}

func (e *TopNExec) keyColumnsCompare(i, j chunk.RowPtr) int {
rowI := e.rowChunks.GetRow(i)
rowJ := e.rowChunks.GetRow(j)
return e.compressRow(rowI, rowJ)
}

func (e *TopNExec) initPointers() {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
Expand Down Expand Up @@ -481,7 +502,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
}
}
}
slices.SortFunc(e.rowPtrs, e.keyColumnsLess)
slices.SortFunc(e.rowPtrs, e.keyColumnsCompare)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions executor/test/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,7 @@ func TestShowForNewCollations(t *testing.T) {
"utf8_general_ci utf8 33 Yes 1",
"utf8_unicode_ci utf8 192 Yes 1",
"utf8mb4_0900_ai_ci utf8mb4 255 Yes 1",
"utf8mb4_0900_bin utf8mb4 309 Yes 1",
"utf8mb4_bin utf8mb4 46 Yes Yes 1",
"utf8mb4_general_ci utf8mb4 45 Yes 1",
"utf8mb4_unicode_ci utf8mb4 224 Yes 1",
Expand Down
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ go_library(
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
Loading

0 comments on commit f947940

Please sign in to comment.