Skip to content

Commit

Permalink
Merge branch 'master' into placement_pd_interface
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Nov 15, 2021
2 parents a788555 + 64681dd commit f1dd592
Show file tree
Hide file tree
Showing 82 changed files with 1,723 additions and 1,037 deletions.
5 changes: 2 additions & 3 deletions bindinfo/capture_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,8 @@ func TestCapturedBindingCharset(t *testing.T) {
require.Len(t, rows, 1)
require.Equal(t, "update `test` . `t` set `name` = ? where `name` <= ?", rows[0][0])
require.Equal(t, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx`)*/ `test`.`t` SET `name`='hello' WHERE `name` <= 'abc'", rows[0][1])
// Charset and Collation are empty now, they are not used currently.
require.Equal(t, "", rows[0][6])
require.Equal(t, "", rows[0][7])
require.Equal(t, "utf8mb4", rows[0][6])
require.Equal(t, "utf8mb4_bin", rows[0][7])
}

func TestConcurrentCapture(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,10 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) {
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)

ctx := context.Background()
logger := log.L()
Expand Down
52 changes: 52 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,48 @@ func (d *Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration)), nil
}

// Charset defines character set
type Charset int

const (
Binary Charset = iota
UTF8MB4
GB18030
GBK
)

// String return the string value of charset
func (c Charset) String() string {
switch c {
case Binary:
return "binary"
case UTF8MB4:
return "utf8mb4"
case GB18030:
return "gb18030"
case GBK:
return "gbk"
default:
return "unknown_charset"
}
}

// ParseCharset parser character set for string
func ParseCharset(dataCharacterSet string) (Charset, error) {
switch strings.ToLower(dataCharacterSet) {
case "", "binary":
return Binary, nil
case "utf8mb4":
return UTF8MB4, nil
case "gb18030":
return GB18030, nil
case "gbk":
return GBK, nil
default:
return Binary, errors.Errorf("found unsupported data-character-set: %s", dataCharacterSet)
}
}

func NewConfig() *Config {
return &Config{
App: Lightning{
Expand Down Expand Up @@ -786,6 +828,16 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if len(cfg.Mydumper.DataCharacterSet) == 0 {
cfg.Mydumper.DataCharacterSet = defaultCSVDataCharacterSet
}
charset, err1 := ParseCharset(cfg.Mydumper.DataCharacterSet)
if err1 != nil {
return err1
}
if charset == GBK || charset == GB18030 {
log.L().Warn(
"incompatible strings may be encountered during the transcoding process and will be replaced, please be aware of the risk of not being able to retain the original information",
zap.String("source-character-set", charset.String()),
zap.ByteString("invalid-char-replacement", []byte(cfg.Mydumper.DataInvalidCharReplace)))
}

if cfg.TikvImporter.Backend == "" {
return errors.New("tikv-importer.backend must not be empty!")
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,23 @@ func (s *configTestSuite) TestCheckpointKeepStrategy(c *C) {
c.Assert(res, DeepEquals, []byte(value))
}
}

func (s configTestSuite) TestLoadCharsetFromConfig(c *C) {
cases := map[string]config.Charset{
"binary": config.Binary,
"BINARY": config.Binary,
"GBK": config.GBK,
"gbk": config.GBK,
"Gbk": config.GBK,
"gB18030": config.GB18030,
"GB18030": config.GB18030,
}
for k, v := range cases {
charset, err := config.ParseCharset(k)
c.Assert(err, IsNil)
c.Assert(charset, Equals, v)
}

_, err := config.ParseCharset("Unknown")
c.Assert(err, ErrorMatches, "found unsupported data-character-set: Unknown")
}
46 changes: 34 additions & 12 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errormanager

import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -94,6 +109,7 @@ type ErrorManager struct {
taskID int64
schemaEscaped string
remainingError config.MaxError
dupResolution config.DuplicateResolutionAlgorithm
}

// New creates a new error manager.
Expand All @@ -111,7 +127,7 @@ func New(db *sql.DB, cfg *config.Config) *ErrorManager {

// Init creates the schemas and tables to store the task information.
func (em *ErrorManager) Init(ctx context.Context) error {
if em.db == nil {
if em.db == nil || (em.remainingError.Type.Load() == 0 && em.dupResolution == config.DupeResAlgNone) {
return nil
}

Expand All @@ -120,15 +136,21 @@ func (em *ErrorManager) Init(ctx context.Context) error {
Logger: log.L(),
}

sqls := [][2]string{
{"create task info schema", createSchema},
{"create syntax error table", createSyntaxErrorTable},
{"create type error table", createTypeErrorTable},
{"create conflict error table", createConflictErrorTable},
sqls := make([][2]string, 0)
sqls = append(sqls, [2]string{"create task info schema", createSchema})
if em.remainingError.Syntax.Load() > 0 {
sqls = append(sqls, [2]string{"create syntax error table", createSyntaxErrorTable})
}
if em.remainingError.Type.Load() > 0 {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.dupResolution != config.DupeResAlgNone && em.remainingError.Conflict.Load() > 0 {
sqls = append(sqls, [2]string{"create conflict error table", createConflictErrorTable})
}

for _, sql := range sqls {
err := exec.Exec(ctx, sql[0], fmt.Sprintf(sql[1], em.schemaEscaped))
// trim spaces for unit test pattern matching
err := exec.Exec(ctx, sql[0], strings.TrimSpace(fmt.Sprintf(sql[1], em.schemaEscaped)))
if err != nil {
return err
}
Expand All @@ -148,6 +170,11 @@ func (em *ErrorManager) RecordTypeError(
rowText string,
encodeErr error,
) error {
// elide the encode error if needed.
if em.remainingError.Type.Dec() < 0 {
return encodeErr
}

if em.db != nil {
errMsg := encodeErr.Error()
logger = logger.With(
Expand All @@ -173,11 +200,6 @@ func (em *ErrorManager) RecordTypeError(
return multierr.Append(encodeErr, err)
}
}

// elide the encode error if needed.
if em.remainingError.Type.Dec() < 0 {
return encodeErr
}
return nil
}

Expand Down
83 changes: 83 additions & 0 deletions br/pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errormanager

import (
"context"
"math"
"testing"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"go.uber.org/atomic"

"github.com/pingcap/tidb/br/pkg/lightning/config"
)

var _ = Suite(errorManagerSuite{})

func TestErrorManager(t *testing.T) {
TestingT(t)
}

type errorManagerSuite struct{}

func (e errorManagerSuite) TestInit(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

em := &ErrorManager{
db: db,
schemaEscaped: "`lightning_errors`",
remainingError: config.MaxError{
Charset: *atomic.NewInt64(math.MaxInt64),
Conflict: *atomic.NewInt64(math.MaxInt64),
},
}

ctx := context.Background()
err = em.Init(ctx)
c.Assert(err, IsNil)

em.dupResolution = config.DupeResAlgRecord
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*").
WillReturnResult(sqlmock.NewResult(2, 1))
err = em.Init(ctx)
c.Assert(err, IsNil)

em.dupResolution = config.DupeResAlgNone
em.remainingError.Type.Store(1)
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;").
WillReturnResult(sqlmock.NewResult(3, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*").
WillReturnResult(sqlmock.NewResult(4, 1))
err = em.Init(ctx)
c.Assert(err, IsNil)

em.dupResolution = config.DupeResAlgRecord
em.remainingError.Type.Store(1)
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`.*").
WillReturnResult(sqlmock.NewResult(5, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*").
WillReturnResult(sqlmock.NewResult(6, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*").
WillReturnResult(sqlmock.NewResult(7, 1))
err = em.Init(ctx)
c.Assert(err, IsNil)

c.Assert(mock.ExpectationsWereMet(), IsNil)
}
Loading

0 comments on commit f1dd592

Please sign in to comment.