Skip to content

Commit

Permalink
Merge branch 'master' into ox_coll_substitute
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Jan 31, 2023
2 parents 369c980 + 05edfd4 commit 7b757b6
Show file tree
Hide file tree
Showing 48 changed files with 1,019 additions and 99 deletions.
11 changes: 11 additions & 0 deletions bindinfo/bind_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ type BindRecord struct {
Bindings []Binding
}

// Copy get the copy of bindRecord
func (br *BindRecord) Copy() *BindRecord {
nbr := &BindRecord{
OriginalSQL: br.OriginalSQL,
Db: br.Db,
}
nbr.Bindings = make([]Binding, len(br.Bindings))
copy(nbr.Bindings, br.Bindings)
return nbr
}

// HasEnabledBinding checks if there are any enabled bindings in bind record.
func (br *BindRecord) HasEnabledBinding() bool {
for _, binding := range br.Bindings {
Expand Down
60 changes: 50 additions & 10 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,32 +343,64 @@ type MaxError struct {
// In TiDB backend, this also includes all possible SQL errors raised from INSERT,
// such as unique key conflict when `on-duplicate` is set to `error`.
// When tolerated, the row causing the error will be skipped, and adds 1 to the counter.
// The default value is zero, which means that such errors are not tolerated.
Type atomic.Int64 `toml:"type" json:"type"`

// Conflict is the maximum number of unique key conflicts in local backend accepted.
// When tolerated, every pair of conflict adds 1 to the counter.
// Those pairs will NOT be deleted from the target. Conflict resolution is performed separately.
// TODO Currently this is hard-coded to infinity.
Conflict atomic.Int64 `toml:"conflict" json:"-"`
// The default value is max int64, which means conflict errors will be recorded as much as possible.
// Sometime the actual number of conflict record logged will be greater than the value configured here,
// because conflict error data are recorded batch by batch.
// If the limit is reached in a single batch, the entire batch of records will be persisted before an error is reported.
Conflict atomic.Int64 `toml:"conflict" json:"conflict"`
}

func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
defaultValMap := map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": math.MaxInt64,
}
// set default value first
cfg.Syntax.Store(defaultValMap["syntax"])
cfg.Charset.Store(defaultValMap["charset"])
cfg.Type.Store(defaultValMap["type"])
cfg.Conflict.Store(defaultValMap["conflict"])
switch val := v.(type) {
case int64:
// ignore val that is smaller than 0
if val < 0 {
val = 0
if val >= 0 {
// only set type error
cfg.Type.Store(val)
}
cfg.Syntax.Store(0)
cfg.Charset.Store(math.MaxInt64)
cfg.Type.Store(val)
cfg.Conflict.Store(math.MaxInt64)
return nil
case map[string]interface{}:
// TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful.
// support stuff like `max-error = { charset = 1000, type = 1000 }`.
getVal := func(k string, v interface{}) int64 {
defaultVal, ok := defaultValMap[k]
if !ok {
return 0
}
iVal, ok := v.(int64)
if !ok || iVal < 0 {
return defaultVal
}
return iVal
}
for k, v := range val {
switch k {
case "type":
cfg.Type.Store(getVal(k, v))
case "conflict":
cfg.Conflict.Store(getVal(k, v))
}
}
return nil
default:
return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v)
}
return errors.Errorf("invalid max-error '%v', should be an integer", v)
}

// DuplicateResolutionAlgorithm is the config type of how to resolve duplicates.
Expand Down Expand Up @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error {
unusedGlobalKeyStrs[key.String()] = struct{}{}
}

iterateUnusedKeys:
for _, key := range unusedConfigKeys {
keyStr := key.String()
switch keyStr {
// these keys are not counted as decoded by toml decoder, but actually they are decoded,
// because the corresponding unmarshal logic handles these key's decoding in a custom way
case "lightning.max-error.type",
"lightning.max-error.conflict":
continue iterateUnusedKeys
}
if _, found := unusedGlobalKeyStrs[keyStr]; found {
bothUnused = append(bothUnused, keyStr)
} else {
Expand Down
121 changes: 121 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"flag"
"fmt"
"math"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) {
require.Regexp(t, "time: unknown unit .?x.? in duration .?13x20s.?", err.Error())
}

func TestMaxErrorUnmarshal(t *testing.T) {
type testCase struct {
TOMLStr string
ExpectedValues map[string]int64
ExpectErrStr string
CaseName string
}
for _, tc := range []*testCase{
{
TOMLStr: `max-error = 123`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
"conflict": math.MaxInt64,
},
CaseName: "Normal_Int",
},
{
TOMLStr: `max-error = -123`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": math.MaxInt64,
},
CaseName: "Abnormal_Negative_Int",
},
{
TOMLStr: `max-error = "abcde"`,
ExpectErrStr: "invalid max-error 'abcde', should be an integer or a map of string:int64",
CaseName: "Abnormal_String",
},
{
TOMLStr: `[max-error]
syntax = 1
charset = 2
type = 3
conflict = 4
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 3,
"conflict": 4,
},
CaseName: "Normal_Map_All_Set",
},
{
TOMLStr: `[max-error]
conflict = 1000
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
},
CaseName: "Normal_Map_Partial_Set",
},
{
TOMLStr: `max-error = { conflict = 1000, type = 123 }`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
"conflict": 1000,
},
CaseName: "Normal_OneLineMap_Partial_Set",
},
{
TOMLStr: `[max-error]
conflict = 1000
not_exist = 123
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
},
CaseName: "Normal_Map_Partial_Set_Invalid_Key",
},
{
TOMLStr: `[max-error]
conflict = 1000
type = -123
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
},
CaseName: "Normal_Map_Partial_Set_Invalid_Value",
},
{
TOMLStr: `[max-error]
conflict = 1000
type = abc
`,
ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`,
CaseName: "Normal_Map_Partial_Set_Invalid_ValueType",
},
} {
targetLightningCfg := new(config.Lightning)
err := toml.Unmarshal([]byte(tc.TOMLStr), targetLightningCfg)
if len(tc.ExpectErrStr) > 0 {
require.Errorf(t, err, "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectErrStr, err.Error(), "test case: %s", tc.CaseName)
} else {
require.NoErrorf(t, err, "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load(), "test case: %s", tc.CaseName)
}
}
}

func TestDurationMarshalJSON(t *testing.T) {
duration := config.Duration{}
err := duration.UnmarshalText([]byte("13m20s"))
Expand Down
31 changes: 22 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError(
if em.remainingError.Type.Dec() < 0 {
threshold := em.configError.Type.Load()
if threshold > 0 {
encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'",
encodeErr = errors.Annotatef(encodeErr,
"The number of type errors exceeds the threshold configured by `max-error.type`: '%d'",
em.configError.Type.Load())
}
return encodeErr
Expand Down Expand Up @@ -241,25 +242,28 @@ func (em *ErrorManager) RecordDataConflictError(
tableName string,
conflictInfos []DataConflictInfo,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold)
// Still need to record this batch of conflict records, and then return this error at last.
// Otherwise, if the max-error.conflict is set a very small value, non of the conflict errors will be recorded
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

func (em *ErrorManager) RecordIndexConflictError(
Expand All @@ -290,25 +297,28 @@ func (em *ErrorManager) RecordIndexConflictError(
conflictInfos []DataConflictInfo,
rawHandles, rawRows [][]byte,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold)
// Still need to record this batch of conflict records, and then return this error at last.
// Otherwise, if the max-error.conflict is set a very small value, non of the conflict errors will be recorded
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -331,7 +341,10 @@ func (em *ErrorManager) RecordIndexConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

// ResolveAllConflictKeys query all conflicting rows (handle and their
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/utils/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func makeJSONSchema(schema *backuppb.Schema) (*jsonSchema, error) {

func fromJSONSchema(jSchema *jsonSchema) (*backuppb.Schema, error) {
schema := jSchema.Schema
if schema == nil {
schema = &backuppb.Schema{}
}

var err error
schema.Db, err = json.Marshal(jSchema.DB)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/utils/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,44 @@ var testMetaJSONs = [][]byte{
"is_raw_kv": true,
"br_version": "BR\nRelease Version: v5.0.0-master\nGit Commit Hash: c0d60dae4998cf9ac40f02e5444731c15f0b2522\nGit Branch: HEAD\nGo Version: go1.13.4\nUTC Build Time: 2021-03-25 08:10:08\nRace Enabled: false"
}`),
[]byte(`{
"files": [
{
"sha256": "3ae857ef9b379d498ae913434f1d47c3e90a55f3a4cd9074950bfbd163d5e5fc",
"start_key": "7480000000000000115f720000000000000000",
"end_key": "7480000000000000115f72ffffffffffffffff00",
"name": "1_20_9_36adb8cedcd7af34708edff520499e712e2cfdcb202f5707dc9305a031d55a98_1675066275424_write.sst",
"end_version": 439108573623222300,
"crc64xor": 16261462091570213000,
"total_kvs": 15,
"total_bytes": 1679,
"cf": "write",
"size": 2514,
"cipher_iv": "56MTbxA4CaNILpirKnBxUw=="
}
],
"schemas": [
{
"db": {
"charset": "utf8mb4",
"collate": "utf8mb4_bin",
"db_name": {
"L": "test",
"O": "test"
},
"id": 1,
"policy_ref_info": null,
"state": 5
}
}
],
"ddls": [],
"cluster_id": 7194351714070942000,
"cluster_version": "\"6.1.0\"\n",
"br_version": "BR\nRelease Version: v6.1.0\nGit Commit Hash: 1a89decdb192cbdce6a7b0020d71128bc964d30f\nGit Branch: heads/refs/tags/v6.1.0\nGo Version: go1.18.2\nUTC Build Time: 2022-06-05 05:09:12\nRace Enabled: false",
"end_version": 439108573623222300,
"new_collations_enabled": "True"
}`),
}

func TestEncodeAndDecode(t *testing.T) {
Expand Down
Loading

0 comments on commit 7b757b6

Please sign in to comment.