Skip to content

Commit

Permalink
txn: make kv pairs converted from locks invisible (#42409)
Browse files Browse the repository at this point in the history
ref #28011
  • Loading branch information
zyguan authored Mar 21, 2023
1 parent 083440b commit 3ff66ac
Show file tree
Hide file tree
Showing 134 changed files with 1,266 additions and 958 deletions.
10 changes: 7 additions & 3 deletions br/pkg/glue/console_glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ func (t *Table) maxKeyLen() int {

// Print prints the table.
// The format would be like:
// Key1: <Value>
// Other: <Value>
//
// Key1: <Value>
// Other: <Value>
//
// LongKey: <Value>
// The format may change if the terminal size is small.
func (t *Table) Print() {
Expand Down Expand Up @@ -267,7 +269,9 @@ func (ps PrettyString) slicePointOf(s int) (realSlicePoint, endAt int) {
// It is the abstraction of some subarea of the terminal,
// you might imagine it as a panel in the tmux, but with infinity height.
// For example, printing a frame with the width of 10 chars, and 4 chars offset left, would be like:
// v~~~~~~~~~~v Here is the "width of a frame".
//
// v~~~~~~~~~~v Here is the "width of a frame".
//
// +--+----------+--+
// | Hello, wor |
// | ld. |
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type DupKVStream interface {

// LocalDupKVStream implements the interface of DupKVStream.
// It collects duplicate key-value pairs from a pebble.DB.
//
//goland:noinspection GoNameStartsWithPackageName
type LocalDupKVStream struct {
iter Iter
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ func (b noopBackend) CheckRequirements(context.Context, *backend.CheckCtx) error
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
// TablesFromMeta to succeed:
// - Name
// - State (must be model.StatePublic)
// - ID
// - Columns
// * Name
// * State (must be model.StatePublic)
// * Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
// - Name
// - State (must be model.StatePublic)
// - ID
// - Columns
// - Name
// - State (must be model.StatePublic)
// - Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
func (b noopBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return nil, nil
}
Expand Down
13 changes: 8 additions & 5 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ type stmtTask struct {
}

// WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this:
// insert into t1 values (111), (222), (333), (444);
//
// insert into t1 values (111), (222), (333), (444);
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
rows := r.(tidbRows)
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
Expand Down Expand Up @@ -500,10 +501,12 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
}

// WriteRowsToDB write rows in row-by-row mode, which will insert multiple rows like this:
// insert into t1 values (111);
// insert into t1 values (222);
// insert into t1 values (333);
// insert into t1 values (444);
//
// insert into t1 values (111);
// insert into t1 values (222);
// insert into t1 values (333);
// insert into t1 values (444);
//
// See more details in br#1366: https://github.com/pingcap/br/issues/1366
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
rows := r.(tidbRows)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
}

// TODO: temporarily disable this test before we fix strict mode
//
//nolint:unused,deadcode
func testStrictMode(t *testing.T) {
s := createMysqlSuite(t)
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,12 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
}

// RunOnce is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. and glue could be nil to let lightning
// use a default glue later.
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
// caller implemented glue.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. and glue could be nil to let lightning
// use a default glue later.
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
// caller implemented glue.
//
// deprecated: use RunOnceWithOptions instead.
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error {
if err := taskCfg.Adjust(taskCtx); err != nil {
Expand Down Expand Up @@ -270,10 +271,10 @@ func (l *Lightning) RunServer() error {
}

// RunOnceWithOptions is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
// be used:
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
// be used:
// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later.
// - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a
// storage by config
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStora
}

/*
Mydumper File Loader
Mydumper File Loader
*/
type MDLoader struct {
store storage.ExternalStorage
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/parser_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// }

/*
TODO : test with specified 'regionBlockSize' ...
TODO : test with specified 'regionBlockSize' ...
*/
func TestTableRegion(t *testing.T) {
cfg := newConfigWithSourceDir("./examples")
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,10 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
}

// checkCSVHeader try to check whether the csv header config is consistent with the source csv files by:
// 1. pick one table with two CSV files and a unique/primary key
// 2. read the first row of those two CSV files
// 3. checks if the content of those first rows are compatible with the table schema, and whether the
// two rows are identical, to determine if the first rows are a header rows.
// 1. pick one table with two CSV files and a unique/primary key
// 2. read the first row of those two CSV files
// 3. checks if the content of those first rows are compatible with the table schema, and whether the
// two rows are identical, to determine if the first rows are a header rows.
func (rc *Controller) checkCSVHeader(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta) error {
// if cfg set header = ture but source files actually contain not header, former SchemaCheck should
// return error in this situation, so we need do it again.
Expand Down
1 change: 0 additions & 1 deletion br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
return errors.Trace(err)
}

//
func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error {
var restoreMetaSQL string
var err error
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,9 @@ func TestScatterFinishInTime(t *testing.T) {
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
// expected regions after split:
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
//
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func TestSplitAndScatter(t *testing.T) {
t.Run("BatchScatter", func(t *testing.T) {
client := initTestClient()
Expand Down Expand Up @@ -448,8 +449,9 @@ func initRewriteRules() *restore.RewriteRules {
}

// expected regions after split:
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
//
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
if len(regions) != len(keys)-1 {
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ func (ms *StreamMetadataSet) iterateDataFiles(f func(d *backuppb.DataFileInfo) (
}

// IterateFilesFullyBefore runs the function over all files contain data before the timestamp only.
// 0 before
// |------------------------------------------|
// |-file1---------------| <- File contains records in this TS range would be found.
// |-file2--------------| <- File contains any record out of this won't be found.
//
// 0 before
// |------------------------------------------|
// |-file1---------------| <- File contains records in this TS range would be found.
// |-file2--------------| <- File contains any record out of this won't be found.
//
// This function would call the `f` over file1 only.
func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *backuppb.DataFileInfo) (shouldBreak bool)) {
ms.iterateDataFiles(func(d *backuppb.DataFileInfo) (shouldBreak bool) {
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ var logCountSumRe = regexp.MustCompile(`tikv_stream_handle_kv_batch_sum ([0-9]+)

// MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface.
// TODO: this is a temporary solution(aha, like in a Hackthon),
// we MUST find a better way for providing this information.
//
// we MUST find a better way for providing this information.
func MaybeQPS(ctx context.Context, mgr *conn.Mgr) (float64, error) {
c := mgr.GetPDClient()
prefix := "http://"
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
return nil
}

// RunStreamCommand run all kinds of `stream task``
// RunStreamCommand run all kinds of `stream task
func RunStreamCommand(
ctx context.Context,
g glue.Glue,
Expand Down
1 change: 1 addition & 0 deletions br/tests/br_key_locked/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type Locker struct {
}

// generateLocks sends Prewrite requests to TiKV to generate locks, without committing and rolling back.
//
//nolint:gosec
func (c *Locker) generateLocks(pctx context.Context) error {
log.Info("genLock started")
Expand Down
9 changes: 5 additions & 4 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,11 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
// The handle range is split from PD regions now. Each worker deal with a region table key range one time.
// Each handle range by estimation, concurrent processing needs to perform after the handle range has been acquired.
// The operation flow is as follows:
// 1. Open numbers of defaultWorkers goroutines.
// 2. Split table key range from PD regions.
// 3. Send tasks to running workers by workers's task channel. Each task deals with a region key ranges.
// 4. Wait all these running tasks finished, then continue to step 3, until all tasks is done.
// 1. Open numbers of defaultWorkers goroutines.
// 2. Split table key range from PD regions.
// 3. Send tasks to running workers by workers's task channel. Each task deals with a region key ranges.
// 4. Wait all these running tasks finished, then continue to step 3, until all tasks is done.
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
Expand Down
4 changes: 3 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,9 @@ func ResolveCharsetCollation(charsetOpts ...ast.CharsetOpt) (string, string, err
}

// OverwriteCollationWithBinaryFlag is used to handle the case like
// CREATE TABLE t (a VARCHAR(255) BINARY) CHARSET utf8 COLLATE utf8_general_ci;
//
// CREATE TABLE t (a VARCHAR(255) BINARY) CHARSET utf8 COLLATE utf8_general_ci;
//
// The 'BINARY' sets the column collation to *_bin according to the table charset.
func OverwriteCollationWithBinaryFlag(colDef *ast.ColumnDef, chs, coll string) (newChs string, newColl string) {
ignoreBinFlag := colDef.Tp.GetCharset() != "" && (colDef.Tp.GetCollate() != "" || containsColumnOption(colDef, ast.ColumnOptionCollate))
Expand Down
27 changes: 13 additions & 14 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
//// Copyright 2015 PingCAP, Inc.
////
//// Licensed under the Apache License, Version 2.0 (the "License");
//// you may not use this file except in compliance with the License.
//// You may obtain a copy of the License at
////
//// http://www.apache.org/licenses/LICENSE-2.0
////
//// Unless required by applicable law or agreed to in writing, software
//// distributed under the License is distributed on an "AS IS" BASIS,
//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//// See the License for the specific language governing permissions and
//// limitations under the License.
//
// // Copyright 2015 PingCAP, Inc.
// //
// // Licensed under the Apache License, Version 2.0 (the "License");
// // you may not use this file except in compliance with the License.
// // You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing, software
// // distributed under the License is distributed on an "AS IS" BASIS,
// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// // See the License for the specific language governing permissions and
// // limitations under the License.
package ddl_test

import (
Expand Down
3 changes: 1 addition & 2 deletions ddl/fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package ddl_test

import (
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
18 changes: 11 additions & 7 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,18 @@ func (rc *reorgCtx) clean() {
// 3: clean global index
//
// ddl goroutine >---------+
// ^ |
// | |
// | |
// | | <---(doneCh)--- f()
//
// ^ |
// | |
// | |
// | | <---(doneCh)--- f()
//
// HandleDDLQueue(...) | <---(regular timeout)
// | | <---(ctx done)
// | |
// | |
//
// | | <---(ctx done)
// | |
// | |
//
// A more ddl round <-----+
//
// How can we cancel reorg job?
Expand Down
11 changes: 6 additions & 5 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
}

// SetTiDBServerID sets "TiDBServerID" for "kv.Request"
// ServerID is a unique id of TiDB instance among the cluster.
// See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md
//
// ServerID is a unique id of TiDB instance among the cluster.
// See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md
func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder {
builder.Request.TiDBServerID = serverID
return builder
Expand Down Expand Up @@ -450,9 +451,9 @@ func encodeHandleKey(ran *ranger.Range) ([]byte, []byte) {
// interpreted as an int64 variable.
//
// This function does the following:
// 1. split ranges into two groups as described above.
// 2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
// one greater than MaxInt64.
// 1. split ranges into two groups as described above.
// 2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
// one greater than MaxInt64.
//
// if `KeepOrder` is false, we merge the two groups of ranges into one group, to save an rpc call later
// if `desc` is false, return signed ranges first, vice versa.
Expand Down
5 changes: 3 additions & 2 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,9 @@ func (f FileFormat) String() string {
}

// Extension returns the extension for specific format.
// text -> "sql"
// csv -> "csv"
//
// text -> "sql"
// csv -> "csv"
func (f FileFormat) Extension() string {
switch f {
case FileFormatSQLText:
Expand Down
3 changes: 2 additions & 1 deletion executor/aggfuncs/func_ntile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (

// ntile divides the partition into n ranked groups and returns the group number a row belongs to.
// e.g. We have 11 rows and n = 3. They will be divided into 3 groups.
// First 4 rows belongs to group 1. Following 4 rows belongs to group 2. The last 3 rows belongs to group 3.
//
// First 4 rows belongs to group 1. Following 4 rows belongs to group 2. The last 3 rows belongs to group 3.
type ntile struct {
n uint64
baseAggFunc
Expand Down
Loading

0 comments on commit 3ff66ac

Please sign in to comment.