Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add index acceleration implement #35984

Closed
Closed
15 changes: 15 additions & 0 deletions br/pkg/conn/util/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "util",
srcs = ["util.go"],
importpath = "github.com/pingcap/tidb/br/pkg/conn/util",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/version",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_tikv_pd_client//:client",
],
)
62 changes: 62 additions & 0 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package util

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
errors2 "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/version"
pd "github.com/tikv/pd/client"
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if version.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(errors2.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)
2 changes: 2 additions & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ go_library(
"//br/pkg/storage",
"//br/pkg/utils",
"//br/pkg/version/build",
"//expression",
"//planner/core",
"//util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ type AbstractBackend interface {
// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

// TotalMemoryConsume is only used for local backend to cacul memory consumption.
TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning
Expand Down Expand Up @@ -280,6 +283,10 @@ func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
Expand Down Expand Up @@ -405,6 +412,10 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
Expand Down Expand Up @@ -495,3 +506,7 @@ type EngineWriter interface {
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}

func (oe *OpenedEngine) GetEngineUUID() uuid.UUID {
return oe.uuid
}
33 changes: 1 addition & 32 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "kv",
Expand All @@ -25,7 +25,6 @@ go_library(
"//meta/autoid",
"//parser/model",
"//parser/mysql",
"//planner/core",
"//sessionctx",
"//sessionctx/variable",
"//table",
Expand All @@ -42,33 +41,3 @@ go_library(
"@org_uber_go_zap//zapcore",
],
)

go_test(
name = "kv_test",
srcs = [
"session_test.go",
"sql2kv_test.go",
],
embed = [":kv"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//ddl",
"//kv",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//table",
"//table/tables",
"//tablecodec",
"//types",
"//util/mock",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
31 changes: 31 additions & 0 deletions br/pkg/lightning/backend/kv/kvtest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "kvtest_test",
srcs = [
"session_test.go",
"sql2kv_test.go",
],
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//ddl",
"//kv",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//table",
"//table/tables",
"//tablecodec",
"//types",
"//util/mock",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package kvtest

import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}
Loading