Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into add-sequence-operator
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Apr 4, 2023
2 parents e4010a8 + a9d7577 commit 699e39d
Show file tree
Hide file tree
Showing 279 changed files with 7,793 additions and 3,917 deletions.
3 changes: 3 additions & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ header:
- "tidb-binlog/driver/example"
- "tidb-binlog/proto/go-binlog/secondary_binlog.pb.go"
- "**/*.sql"
- "**/*.csv"
- "**/*.parquet"
- "**/*.zst"
- ".bazelversion"
- "build/image/.ci_bazel"
comment: on-failure
16 changes: 8 additions & 8 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,8 @@ def go_deps():
name = "com_github_golangci_golangci_lint",
build_file_proto_mode = "disable",
importpath = "github.com/golangci/golangci-lint",
sum = "h1:TwQtQi5dGE/uFOxYGKwddJo7T9sHsRfTUN00HZMl5Jo=",
version = "v1.52.1",
sum = "h1:FrPElUUI5rrHXg1mQ7KxI1MXPAw5lBVskiz7U7a8a1A=",
version = "v1.52.2",
)
go_repository(
name = "com_github_golangci_gosec",
Expand Down Expand Up @@ -3015,8 +3015,8 @@ def go_deps():
name = "com_github_moricho_tparallel",
build_file_proto_mode = "disable",
importpath = "github.com/moricho/tparallel",
sum = "h1:8dDx3S3e+jA+xiQXC7O3dvfRTe/J+FYlTDDW01Y7z/Q=",
version = "v0.3.0",
sum = "h1:fQKD4U1wRMAYNngDonW5XupoB/ZGJHdpzrWqgyg9krA=",
version = "v0.3.1",
)

go_repository(
Expand Down Expand Up @@ -4108,8 +4108,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=",
version = "v2.0.7-0.20230317032622-884a634378d4",
sum = "h1:XpdZrei86oIrRjXbqvlQh23TdHXVtSxWmsxxwy/Zgc0=",
version = "v2.0.7-0.20230328084104-ea13e9700259",
)
go_repository(
name = "com_github_tikv_pd",
Expand All @@ -4123,8 +4123,8 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:hZ9gL1wq4dnIzEHjoTB4MHiqi3cA6pqANL4yxUtyCP0=",
version = "v0.0.0-20230324033443-79ec29cee8db",
sum = "h1:bzlSSzw+6qTwPs8pMcPI1bt27TAOhSdAEwdPCz6eBlg=",
version = "v0.0.0-20230329114254-1948c247c2b1",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ errdoc:tools/bin/errdoc-gen
lint:tools/bin/revive
@echo "linting"
@tools/bin/revive -formatter friendly -config tools/check/revive.toml $(FILES_TIDB_TESTS)
@tools/bin/revive -formatter friendly -config tools/check/revive.toml ./br/pkg/lightning/...

license:
bazel $(BAZEL_GLOBAL_CONFIG) run $(BAZEL_CMD_CONFIG) \
Expand Down Expand Up @@ -333,8 +334,8 @@ mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down Expand Up @@ -479,6 +480,10 @@ bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/addindextest/...

bazel_loaddatatest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/loaddatatest/...

bazel_lint: bazel_prepare
bazel build //... --//build:with_nogo_flag=true

Expand Down
16 changes: 8 additions & 8 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "dd926a88a564a9246713a9c00b35315f54cbd46b31a26d5d8fb264c07045f05d",
sha256 = "6b65cb7917b4d1709f9410ffe00ecf3e160edf674b78c54a894471320862184f",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.38.1/rules_go-v0.38.1.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.38.1/rules_go-v0.38.1.zip",
"https://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.38.1/rules_go-v0.38.1.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.39.0/rules_go-v0.39.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.39.0/rules_go-v0.39.0.zip",
"https://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.39.0/rules_go-v0.39.0.zip",
],
)

http_archive(
name = "bazel_gazelle",
sha256 = "ecba0f04f96b4960a5b250c8e8eeec42281035970aa8852dda73098274d14a1d",
sha256 = "727f3e4edd96ea20c29e8c2ca9e8d2af724d8c7778e7923a854b2c80952bc405",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/bazel-gazelle/releases/download/v0.29.0/bazel-gazelle-v0.29.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.29.0/bazel-gazelle-v0.29.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.29.0/bazel-gazelle-v0.29.0.tar.gz",
"https://mirror.bazel.build/github.com/bazelbuild/bazel-gazelle/releases/download/v0.30.0/bazel-gazelle-v0.30.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.30.0/bazel-gazelle-v0.30.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.30.0/bazel-gazelle-v0.30.0.tar.gz",
],
)

Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ go_library(
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//parser/model",
"//table",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
61 changes: 19 additions & 42 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -76,6 +74,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger
)
}

// MakeUUID generates a UUID for the engine and a tag for the engine.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
Expand All @@ -84,6 +83,7 @@ func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {

var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")

// EngineFileSize represents the size of an engine on disk and in memory.
type EngineFileSize struct {
// UUID is the engine's UUID.
UUID uuid.UUID
Expand Down Expand Up @@ -149,8 +149,6 @@ type TargetInfoGetter interface {
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
encode.EncodingBuilder
TargetInfoGetter
// Close the connection to the backend.
Close()

Expand Down Expand Up @@ -196,18 +194,6 @@ type AbstractBackend interface {
// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
// may be repeated with other keys in local data source.
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// CollectRemoteDuplicateRows collect duplicate keys from remote TiKV storage. This keys may be duplicate with
// the data import by other lightning.
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

// TotalMemoryConsume counts total memory usage. This is only used for local backend.
TotalMemoryConsume() int64
}
Expand Down Expand Up @@ -248,43 +234,33 @@ type ClosedEngine struct {
engine
}

// LocalEngineWriter is a thread-local writer for writing rows into a single engine.
type LocalEngineWriter struct {
writer EngineWriter
tableName string
}

// MakeBackend creates a new Backend from an AbstractBackend.
func MakeBackend(ab AbstractBackend) Backend {
return Backend{abstract: ab}
}

// Close the connection to the backend.
func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() encode.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
return be.abstract.NewEncoder(ctx, config)
}

// ShouldPostProcess returns whether KV-specific post-processing should be
func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error {
return be.abstract.CheckRequirements(ctx, checkCtx)
}

func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

// FlushAll flushes all opened engines.
func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// TotalMemoryConsume returns the total memory consumed by the backend.
func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}
Expand Down Expand Up @@ -380,16 +356,9 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
}

func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
}

func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error {
return be.abstract.ResolveDuplicateRows(ctx, tbl, tableName, algorithm)
// Inner returns the underlying abstract backend.
func (be Backend) Inner() AbstractBackend {
return be.abstract
}

// Close the opened engine to prepare it for importing.
Expand All @@ -408,6 +377,7 @@ func (engine *OpenedEngine) Flush(ctx context.Context) error {
return engine.backend.FlushEngine(ctx, engine.uuid)
}

// LocalWriter returns a writer that writes to the local backend.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
if err != nil {
Expand All @@ -416,6 +386,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

// TotalMemoryConsume returns the total memory consumed by the engine.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}
Expand All @@ -425,10 +396,12 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

// Close closes the engine and returns the status of the engine.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

// IsSynced returns whether the engine is synced.
func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
}
Expand Down Expand Up @@ -498,14 +471,17 @@ func (engine *ClosedEngine) Cleanup(ctx context.Context) error {
return err
}

// Logger returns the logger for the engine.
func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

// ChunkFlushStatus is the status of a chunk flush.
type ChunkFlushStatus interface {
Flushed() bool
}

// EngineWriter is the interface for writing data to an engine.
type EngineWriter interface {
AppendRows(
ctx context.Context,
Expand All @@ -517,6 +493,7 @@ type EngineWriter interface {
Close(ctx context.Context) (ChunkFlushStatus, error)
}

// GetEngineUUID returns the engine UUID.
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
return engine.uuid
}
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
encBuilder *mock.MockEncodingBuilder
backend backend.Backend
ts uint64
}
Expand All @@ -32,6 +33,7 @@ func createBackendSuite(c gomock.TestReporter) *backendSuite {
controller: controller,
mockBackend: mockBackend,
backend: backend.MakeBackend(mockBackend),
encBuilder: mock.NewMockEncodingBuilder(controller),
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
}
}
Expand Down Expand Up @@ -316,8 +318,8 @@ func TestMakeEmptyRows(t *testing.T) {
defer s.tearDownTest()

rows := mock.NewMockRows(s.controller)
s.mockBackend.EXPECT().MakeEmptyRows().Return(rows)
require.Equal(t, rows, s.mockBackend.MakeEmptyRows())
s.encBuilder.EXPECT().MakeEmptyRows().Return(rows)
require.Equal(t, rows, s.encBuilder.MakeEmptyRows())
}

func TestNewEncoder(t *testing.T) {
Expand All @@ -328,9 +330,9 @@ func TestNewEncoder(t *testing.T) {
options := &encode.EncodingConfig{
SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890},
}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
s.encBuilder.EXPECT().NewEncoder(nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, options)
realEncoder, err := s.encBuilder.NewEncoder(nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/types"
)

// EncodingConfig is the configuration for the encoding backend.
type EncodingConfig struct {
SessionOptions
Path string // path of data file
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "kv",
srcs = [
"allocator.go",
"base.go",
"kv2sql.go",
"session.go",
"sql2kv.go",
Expand Down
Loading

0 comments on commit 699e39d

Please sign in to comment.