txn: make kv pairs converted from locks invisible #42409

Merged
merged 3 commits on Mar 21, 2023
10 changes: 7 additions & 3 deletions br/pkg/glue/console_glue.go
@@ -91,8 +91,10 @@ func (t *Table) maxKeyLen() int {

// Print prints the table.
// The format would be like:
// Key1: <Value>
// Other: <Value>
//
// Key1: <Value>
// Other: <Value>
//
// LongKey: <Value>
// The format may change if the terminal size is small.
func (t *Table) Print() {
@@ -267,7 +269,9 @@ func (ps PrettyString) slicePointOf(s int) (realSlicePoint, endAt int) {
// It is the abstraction of some subarea of the terminal,
// you might imagine it as a panel in the tmux, but with infinity height.
// For example, printing a frame with the width of 10 chars, and 4 chars offset left, would be like:
// v~~~~~~~~~~v Here is the "width of a frame".
//
// v~~~~~~~~~~v Here is the "width of a frame".
//
// +--+----------+--+
// | Hello, wor |
// | ld. |
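The alignment rule the Print comment describes is simple enough to sketch: pad every key to the width of the longest one. A minimal, self-contained illustration (helper names are ours, not the br/pkg/glue API):

package main

import (
	"fmt"
	"strings"
)

// printTable pads each key to the longest key's width so the values line up,
// matching the "Key1: <Value>" / "Other: <Value>" layout shown above.
func printTable(pairs [][2]string) {
	maxKeyLen := 0
	for _, p := range pairs {
		if len(p[0]) > maxKeyLen {
			maxKeyLen = len(p[0])
		}
	}
	for _, p := range pairs {
		padding := strings.Repeat(" ", maxKeyLen-len(p[0]))
		fmt.Printf("%s: %s%s\n", p[0], padding, p[1])
	}
}

func main() {
	printTable([][2]string{{"Key1", "<Value>"}, {"Other", "<Value>"}, {"LongKey", "<Value>"}})
}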
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -245,6 +245,7 @@ type DupKVStream interface {

// LocalDupKVStream implements the interface of DupKVStream.
// It collects duplicate key-value pairs from a pebble.DB.
//
//goland:noinspection GoNameStartsWithPackageName
type LocalDupKVStream struct {
iter Iter
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/noop/noop.go
@@ -97,14 +97,14 @@ func (b noopBackend) CheckRequirements(context.Context, *backend.CheckCtx) error
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
// TablesFromMeta to succeed:
//  - Name
//  - State (must be model.StatePublic)
//  - ID
//  - Columns
//     * Name
//     * State (must be model.StatePublic)
//     * Offset (must be 0, 1, 2, ...)
//  - PKIsHandle (true = do not generate _tidb_rowid)
//   - Name
//   - State (must be model.StatePublic)
//   - ID
//   - Columns
//   - Name
//   - State (must be model.StatePublic)
//   - Offset (must be 0, 1, 2, ...)
//   - PKIsHandle (true = do not generate _tidb_rowid)
func (b noopBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return nil, nil
}
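As a reference for the field list above, a minimal TableInfo could be assembled as below. This is a sketch assuming the parser/model package of that TiDB era, not code from this PR:

package noopexample

import "github.com/pingcap/tidb/parser/model"

// minimalTableInfo fills only the fields the comment above requires;
// everything else may stay zero-valued for TablesFromMeta to succeed.
func minimalTableInfo(id int64, name string, cols []string) *model.TableInfo {
	tbl := &model.TableInfo{
		ID:         id,
		Name:       model.NewCIStr(name),
		State:      model.StatePublic,
		PKIsHandle: false, // true would suppress _tidb_rowid generation
	}
	for i, c := range cols {
		tbl.Columns = append(tbl.Columns, &model.ColumnInfo{
			Name:   model.NewCIStr(c),
			State:  model.StatePublic,
			Offset: i, // must be 0, 1, 2, ...
		})
	}
	return tbl
}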
13 changes: 8 additions & 5 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -472,7 +472,8 @@ type stmtTask struct {
}

// WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this:
// insert into t1 values (111), (222), (333), (444);
//
//	insert into t1 values (111), (222), (333), (444);
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
rows := r.(tidbRows)
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
@@ -500,10 +501,12 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
}

// WriteRowsToDB write rows in row-by-row mode, which will insert multiple rows like this:
// insert into t1 values (111);
// insert into t1 values (222);
// insert into t1 values (333);
// insert into t1 values (444);
//
//	insert into t1 values (111);
//	insert into t1 values (222);
//	insert into t1 values (333);
//	insert into t1 values (444);
//
// See more details in br#1366: https://github.com/pingcap/br/issues/1366
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
rows := r.(tidbRows)
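The two write modes differ only in how rows are grouped into INSERT statements. A self-contained sketch of the statement shapes (table and helper names are illustrative, not the backend's API):

package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert renders one multi-row statement, the WriteBatchRowsToDB shape:
// insert into t1 values (111), (222), (333), (444);
func buildBatchInsert(table string, rows []string) string {
	return fmt.Sprintf("insert into %s values %s;", table, strings.Join(rows, ", "))
}

// buildRowByRowInserts renders one statement per row, the WriteRowsToDB shape,
// so a single bad row fails alone instead of aborting the whole batch.
func buildRowByRowInserts(table string, rows []string) []string {
	stmts := make([]string, 0, len(rows))
	for _, r := range rows {
		stmts = append(stmts, fmt.Sprintf("insert into %s values %s;", table, r))
	}
	return stmts
}

func main() {
	rows := []string{"(111)", "(222)", "(333)", "(444)"}
	fmt.Println(buildBatchInsert("t1", rows))
	for _, stmt := range buildRowByRowInserts("t1", rows) {
		fmt.Println(stmt)
	}
}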
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -214,6 +214,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
}

// TODO: temporarily disable this test before we fix strict mode
//
//nolint:unused,deadcode
func testStrictMode(t *testing.T) {
s := createMysqlSuite(t)
19 changes: 10 additions & 9 deletions br/pkg/lightning/lightning.go
@@ -227,11 +227,12 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
}

// RunOnce is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
//   cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel, and glue could be nil to let lightning
//   use a default glue later.
// - for lightning as a library, taskCtx could be a meaningful context that gets canceled outside, and glue could be a
//   caller-implemented glue.
//   - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
//     cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel, and glue could be nil to let lightning
//     use a default glue later.
//   - for lightning as a library, taskCtx could be a meaningful context that gets canceled outside, and glue could be a
//     caller-implemented glue.
//
// deprecated: use RunOnceWithOptions instead.
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error {
if err := taskCfg.Adjust(taskCtx); err != nil {
@@ -270,10 +271,10 @@ func (l *Lightning) RunServer() error {
}

// RunOnceWithOptions is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
//   cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options.
// - for lightning as a library, taskCtx could be a meaningful context that gets canceled outside, and these Options may
//   be used:
//   - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
//     cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options.
//   - for lightning as a library, taskCtx could be a meaningful context that gets canceled outside, and these Options may
//     be used:
//   - WithGlue: set a caller-implemented glue. Otherwise, lightning will use a default glue later.
//   - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a
//     storage by config.
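For the library case, usage would look roughly like the snippet below; lightning.New and the exact option signatures are assumptions for illustration (only WithGlue and WithDumpFileStorage are named in the comment), not code taken from this diff:

// Hypothetical sketch: lightning as a library with caller-supplied options.
ctx, cancel := context.WithCancel(context.Background()) // cancelable from outside
defer cancel()

l := lightning.New(globalCfg) // assumed constructor
err := l.RunOnceWithOptions(ctx, taskCfg,
	lightning.WithGlue(myGlue),               // caller-implemented glue
	lightning.WithDumpFileStorage(myStorage), // pre-opened external storage
)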
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/loader.go
@@ -88,7 +88,7 @@ func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStora
}

/*
Mydumper File Loader
Mydumper File Loader
*/
type MDLoader struct {
store storage.ExternalStorage
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/parser_generated.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -36,7 +36,7 @@ import (
// }

/*
TODO : test with specified 'regionBlockSize' ...
TODO : test with specified 'regionBlockSize' ...
*/
func TestTableRegion(t *testing.T) {
cfg := newConfigWithSourceDir("./examples")
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/check_info.go
@@ -754,10 +754,10 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
}

// checkCSVHeader try to check whether the csv header config is consistent with the source csv files by:
// 1. pick one table with two CSV files and a unique/primary key
// 2. read the first row of those two CSV files
// 3. check if the content of those first rows is compatible with the table schema, and whether the
//    two rows are identical, to determine if the first rows are header rows.
//  1. pick one table with two CSV files and a unique/primary key
//  2. read the first row of those two CSV files
//  3. check if the content of those first rows is compatible with the table schema, and whether the
//     two rows are identical, to determine if the first rows are header rows.
func (rc *Controller) checkCSVHeader(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta) error {
// if cfg sets header = true but the source files actually contain no header, the former SchemaCheck should
// return an error in this situation, so we need to do it again.
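The three steps above amount to one predicate: two identical first rows that do not type-check against the schema are almost certainly headers. A self-contained sketch with column types reduced to "numeric or not" (names are illustrative):

package main

import (
	"fmt"
	"strconv"
)

// looksLikeHeader reports whether the identical first rows of two CSV files
// from one table are header rows: any cell that fails to parse under its
// column's (numeric) type marks the row as a header rather than data.
func looksLikeHeader(rowA, rowB []string, numericCols []bool) bool {
	if len(rowA) != len(rowB) || len(rowA) != len(numericCols) {
		return false
	}
	for i := range rowA {
		if rowA[i] != rowB[i] { // the two first rows must be identical
			return false
		}
	}
	for i, numeric := range numericCols {
		if numeric {
			if _, err := strconv.ParseFloat(rowA[i], 64); err != nil {
				return true // non-numeric cell in a numeric column: a header
			}
		}
	}
	return false
}

func main() {
	header := []string{"id", "name"}
	data := []string{"1", "alice"}
	fmt.Println(looksLikeHeader(header, header, []bool{true, false})) // true
	fmt.Println(looksLikeHeader(data, data, []bool{true, false}))     // false
}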
1 change: 0 additions & 1 deletion br/pkg/restore/db.go
@@ -169,7 +169,6 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
return errors.Trace(err)
}

//
func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error {
var restoreMetaSQL string
var err error
10 changes: 6 additions & 4 deletions br/pkg/restore/split_test.go
@@ -319,8 +319,9 @@ func TestScatterFinishInTime(t *testing.T) {
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
// expected regions after split:
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
//
//	[, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
//	[bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func TestSplitAndScatter(t *testing.T) {
t.Run("BatchScatter", func(t *testing.T) {
client := initTestClient()
@@ -448,8 +449,9 @@ func initRewriteRules() *restore.RewriteRules {
}

// expected regions after split:
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
//
//	[, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
//	[bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
if len(regions) != len(keys)-1 {
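The expected regions listed above come from applying the rewrite rules (aa -> xx, cc -> bb) to the split keys; a sketch of that prefix mapping:

package main

import (
	"fmt"
	"strings"
)

// rewriteKey applies the first matching prefix rule, which is how split keys
// in aa-space end up in xx-space and cc-space keys end up in bb-space.
func rewriteKey(key string, rules [][2]string) string {
	for _, rule := range rules {
		if strings.HasPrefix(key, rule[0]) {
			return rule[1] + key[len(rule[0]):]
		}
	}
	return key
}

func main() {
	rules := [][2]string{{"aa", "xx"}, {"cc", "bb"}}
	for _, k := range []string{"aae", "aaz", "ccd", "ccf"} {
		fmt.Println(k, "->", rewriteKey(k, rules))
	}
}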
10 changes: 6 additions & 4 deletions br/pkg/restore/stream_metas.go
@@ -51,10 +51,12 @@ func (ms *StreamMetadataSet) iterateDataFiles(f func(d *backuppb.DataFileInfo) (
}

// IterateFilesFullyBefore runs the function over all files contain data before the timestamp only.
//  0                                        before
//  |----------------------------------------|
//   |-file1---------------|                    <- File contains records in this TS range would be found.
//                        |-file2--------------| <- File contains any record out of this won't be found.
//
//	0                                        before
//	|----------------------------------------|
//	 |-file1---------------|                    <- File contains records in this TS range would be found.
//	                      |-file2--------------| <- File contains any record out of this won't be found.
//
// This function would call the `f` over file1 only.
func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *backuppb.DataFileInfo) (shouldBreak bool)) {
ms.iterateDataFiles(func(d *backuppb.DataFileInfo) (shouldBreak bool) {
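In other words, a file qualifies only when its whole TS range sits to the left of `before`. A sketch of the predicate (the struct stands in for backuppb.DataFileInfo's MinTs/MaxTs pair):

package main

import "fmt"

// fileRange mimics the MinTs/MaxTs pair carried by a data file's metadata.
type fileRange struct {
	name  string
	minTS uint64
	maxTS uint64
}

// fullyBefore reports whether every record of the file falls in [0, before):
// file1 in the diagram above qualifies, file2 crosses the boundary and does not.
func fullyBefore(f fileRange, before uint64) bool {
	return f.maxTS < before
}

func main() {
	const before = uint64(100)
	for _, f := range []fileRange{{"file1", 10, 80}, {"file2", 60, 120}} {
		fmt.Println(f.name, "fully before:", fullyBefore(f, before))
	}
}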
3 changes: 2 additions & 1 deletion br/pkg/stream/stream_status.go
@@ -230,7 +230,8 @@ var logCountSumRe = regexp.MustCompile(`tikv_stream_handle_kv_batch_sum ([0-9]+)

// MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface.
// TODO: this is a temporary solution (aha, like in a Hackathon),
// we MUST find a better way for providing this information.
//
//	we MUST find a better way for providing this information.
func MaybeQPS(ctx context.Context, mgr *conn.Mgr) (float64, error) {
c := mgr.GetPDClient()
prefix := "http://"
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
@@ -445,7 +445,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
return nil
}

// RunStreamCommand run all kinds of `stream task``
// RunStreamCommand run all kinds of `stream task`
func RunStreamCommand(
ctx context.Context,
g glue.Glue,
1 change: 1 addition & 0 deletions br/tests/br_key_locked/locker.go
@@ -185,6 +185,7 @@ type Locker struct {
}

// generateLocks sends Prewrite requests to TiKV to generate locks, without committing and rolling back.
//
//nolint:gosec
func (c *Locker) generateLocks(pctx context.Context) error {
log.Info("genLock started")
9 changes: 5 additions & 4 deletions ddl/backfilling.go
@@ -566,10 +566,11 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
// The handle range is split from PD regions now. Each worker deal with a region table key range one time.
// Each handle range by estimation, concurrent processing needs to perform after the handle range has been acquired.
// The operation flow is as follows:
// 1. Open a number of defaultWorkers goroutines.
// 2. Split the table key range from PD regions.
// 3. Send tasks to the running workers through each worker's task channel. Each task deals with a region key range.
// 4. Wait until all these running tasks are finished, then continue to step 3, until all tasks are done.
//  1. Open a number of defaultWorkers goroutines.
//  2. Split the table key range from PD regions.
//  3. Send tasks to the running workers through each worker's task channel. Each task deals with a region key range.
//  4. Wait until all these running tasks are finished, then continue to step 3, until all tasks are done.
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
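A trimmed-down version of that four-step flow, with the reorg bookkeeping left out (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

// task is one region key range handed to a backfill worker.
type task struct{ start, end int }

func main() {
	const defaultWorkers = 4
	tasks := make(chan task)
	var wg sync.WaitGroup

	// step 1: open a number of worker goroutines.
	for w := 0; w < defaultWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for t := range tasks { // step 3: each task is one key range
				fmt.Printf("worker %d backfills [%d, %d)\n", id, t.start, t.end)
			}
		}(w)
	}

	// step 2: split the table key range (fixed-size chunks stand in for PD regions).
	for start := 0; start < 100; start += 25 {
		tasks <- task{start, start + 25}
	}
	close(tasks)
	wg.Wait() // step 4: wait until all range tasks are done
}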
4 changes: 3 additions & 1 deletion ddl/ddl_api.go
@@ -691,7 +691,9 @@ func ResolveCharsetCollation(charsetOpts ...ast.CharsetOpt) (string, string, err
}

// OverwriteCollationWithBinaryFlag is used to handle the case like
// CREATE TABLE t (a VARCHAR(255) BINARY) CHARSET utf8 COLLATE utf8_general_ci;
//
// CREATE TABLE t (a VARCHAR(255) BINARY) CHARSET utf8 COLLATE utf8_general_ci;
//
// The 'BINARY' sets the column collation to *_bin according to the table charset.
func OverwriteCollationWithBinaryFlag(colDef *ast.ColumnDef, chs, coll string) (newChs string, newColl string) {
ignoreBinFlag := colDef.Tp.GetCharset() != "" && (colDef.Tp.GetCollate() != "" || containsColumnOption(colDef, ast.ColumnOptionCollate))
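The rule is: the BINARY flag turns the effective charset into its *_bin collation. A sketch of that mapping for a few charsets (the real resolution goes through the collation tables):

package main

import "fmt"

// binCollation returns the *_bin collation for a charset, the effect the
// BINARY column flag has in the CREATE TABLE example above.
func binCollation(charset string) string {
	switch charset {
	case "utf8":
		return "utf8_bin"
	case "utf8mb4":
		return "utf8mb4_bin"
	case "latin1":
		return "latin1_bin"
	default:
		return charset + "_bin"
	}
}

func main() {
	// CREATE TABLE t (a VARCHAR(255) BINARY) CHARSET utf8 COLLATE utf8_general_ci;
	fmt.Println(binCollation("utf8")) // utf8_bin wins over utf8_general_ci
}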
27 changes: 13 additions & 14 deletions ddl/ddl_worker_test.go
@@ -1,17 +1,16 @@
//// Copyright 2015 PingCAP, Inc.
////
//// Licensed under the Apache License, Version 2.0 (the "License");
//// you may not use this file except in compliance with the License.
//// You may obtain a copy of the License at
////
//// http://www.apache.org/licenses/LICENSE-2.0
////
//// Unless required by applicable law or agreed to in writing, software
//// distributed under the License is distributed on an "AS IS" BASIS,
//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//// See the License for the specific language governing permissions and
//// limitations under the License.
//
// // Copyright 2015 PingCAP, Inc.
// //
// // Licensed under the Apache License, Version 2.0 (the "License");
// // you may not use this file except in compliance with the License.
// // You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing, software
// // distributed under the License is distributed on an "AS IS" BASIS,
// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// // See the License for the specific language governing permissions and
// // limitations under the License.
package ddl_test

import (
3 changes: 1 addition & 2 deletions ddl/fail_test.go
@@ -4,14 +4,13 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package ddl_test

import (
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
@@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
18 changes: 11 additions & 7 deletions ddl/reorg.go
@@ -167,14 +167,18 @@ func (rc *reorgCtx) clean() {
// 3: clean global index
//
// ddl goroutine >---------+
// ^ |
// | |
// | |
// | | <---(doneCh)--- f()
//
// ^ |
// | |
// | |
// | | <---(doneCh)--- f()
//
// HandleDDLQueue(...) | <---(regular timeout)
// | | <---(ctx done)
// | |
// | |
//
// | | <---(ctx done)
// | |
// | |
//
// A more ddl round <-----+
//
// How can we cancel reorg job?
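The control flow in the diagram is a select loop: block on f()'s done channel while waking up periodically and honoring context cancellation. A sketch of that loop (channel and function names are assumed):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitReorg blocks until f() reports through doneCh, waking on a regular
// timeout for bookkeeping and on ctx done so another ddl round can resume.
func waitReorg(ctx context.Context, doneCh <-chan error) error {
	ticker := time.NewTicker(50 * time.Millisecond) // the "regular timeout"
	defer ticker.Stop()
	for {
		select {
		case err := <-doneCh: // f() finished
			return err
		case <-ticker.C:
			fmt.Println("reorg still running; update progress")
		case <-ctx.Done(): // canceled; a later ddl round picks the job up
			return ctx.Err()
		}
	}
}

func main() {
	doneCh := make(chan error)
	go func() {
		time.Sleep(120 * time.Millisecond)
		doneCh <- nil
	}()
	fmt.Println("reorg result:", waitReorg(context.Background(), doneCh))
}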
11 changes: 6 additions & 5 deletions distsql/request_builder.go
@@ -286,8 +286,9 @@ func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
}

// SetTiDBServerID sets "TiDBServerID" for "kv.Request"
// ServerID is a unique id of TiDB instance among the cluster.
// See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md
//
//	ServerID is a unique id of TiDB instance among the cluster.
//	See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md
func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder {
builder.Request.TiDBServerID = serverID
return builder
@@ -450,9 +451,9 @@ func encodeHandleKey(ran *ranger.Range) ([]byte, []byte) {
// interpreted as an int64 variable.
//
// This function does the following:
// 1. split ranges into two groups as described above.
// 2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
//    one greater than MaxInt64.
//  1. split ranges into two groups as described above.
//  2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
//     one greater than MaxInt64.
//
// if `KeepOrder` is false, we merge the two groups of ranges into one group, to save an rpc call later
// if `desc` is false, return signed ranges first, vice versa.
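Step 2 is the interesting one: an unsigned range that crosses MaxInt64 must be cut at the boundary. A self-contained sketch (the range type is simplified to uint64 endpoints):

package main

import (
	"fmt"
	"math"
)

// urange is a half-open uint64 key range [low, high).
type urange struct{ low, high uint64 }

// splitAtInt64Boundary cuts a range that straddles MaxInt64 into a part that
// is still representable as int64 and an overflow part, as described above.
func splitAtInt64Boundary(r urange) (signed, unsigned []urange) {
	const boundary = uint64(math.MaxInt64) + 1 // first value that overflows int64
	switch {
	case r.high <= boundary:
		signed = append(signed, r)
	case r.low >= boundary:
		unsigned = append(unsigned, r)
	default: // straddles the boundary: one smaller, one greater than MaxInt64
		signed = append(signed, urange{r.low, boundary})
		unsigned = append(unsigned, urange{boundary, r.high})
	}
	return signed, unsigned
}

func main() {
	s, u := splitAtInt64Boundary(urange{math.MaxInt64 - 1, math.MaxInt64 + 3})
	fmt.Println(s, u) // one range ends at the boundary, the other starts there
}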
5 changes: 3 additions & 2 deletions dumpling/export/writer_util.go
@@ -607,8 +607,9 @@ func (f FileFormat) String() string {
}

// Extension returns the extension for specific format.
// text -> "sql"
// csv -> "csv"
//
//	text -> "sql"
//	csv -> "csv"
func (f FileFormat) Extension() string {
switch f {
case FileFormatSQLText:
3 changes: 2 additions & 1 deletion executor/aggfuncs/func_ntile.go
@@ -28,7 +28,8 @@ const (

// ntile divides the partition into n ranked groups and returns the group number a row belongs to.
// e.g. We have 11 rows and n = 3. They will be divided into 3 groups.
// First 4 rows belong to group 1. Following 4 rows belong to group 2. The last 3 rows belong to group 3.
//
//	First 4 rows belong to group 1. Following 4 rows belong to group 2. The last 3 rows belong to group 3.
type ntile struct {
n uint64
baseAggFunc
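The group sizes in that example follow from integer division: with rows % n leftovers, the first remainder groups take one extra row each. A quick check:

package main

import "fmt"

// ntileSizes returns the per-group row counts for NTILE(n) over `rows` rows:
// the first rows%n groups get rows/n + 1 rows, the rest get rows/n.
func ntileSizes(rows, n uint64) []uint64 {
	q, r := rows/n, rows%n
	sizes := make([]uint64, n)
	for i := range sizes {
		sizes[i] = q
		if uint64(i) < r {
			sizes[i]++ // the first r groups are one row larger
		}
	}
	return sizes
}

func main() {
	fmt.Println(ntileSizes(11, 3)) // [4 4 3], matching the comment above
}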