
Commit

Merge branch 'add-pd-metric' of github.com:jyz0309/tidb into add-pd-metric
jyz0309 committed Nov 30, 2021
2 parents fa928af + 4a35f10 commit 03b46e7
Showing 176 changed files with 5,240 additions and 1,751 deletions.
115 changes: 63 additions & 52 deletions br/pkg/lightning/backend/importer/importer_test.go
@@ -24,13 +24,13 @@ import (

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
kvpb "github.com/pingcap/kvproto/pkg/import_kvpb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/stretchr/testify/require"
)

type importerSuite struct {
@@ -43,20 +43,14 @@ type importerSuite struct {
kvPairs kv.Rows
}

var _ = Suite(&importerSuite{})

const testPDAddr = "pd-addr:2379"

// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock,
// otherwise the mock error will be ignored.

func (s *importerSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockClient = mock.NewMockImportKVClient(s.controller)
s.mockWriter = mock.NewMockImportKV_WriteEngineClient(s.controller)
importer := NewMockImporter(s.mockClient, testPDAddr)

s.ctx = context.Background()
func createImportSuite(t *testing.T) *importerSuite {
controller := gomock.NewController(t)
mockClient := mock.NewMockImportKVClient(controller)
mockWriter := mock.NewMockImportKV_WriteEngineClient(controller)
importer := NewMockImporter(mockClient, testPDAddr)
s := &importerSuite{controller: controller, mockClient: mockClient, mockWriter: mockWriter, ctx: context.Background()}
engineUUID := uuid.MustParse("7e3f3a3c-67ce-506d-af34-417ec138fbcb")
s.engineUUID = engineUUID[:]
s.kvPairs = kv.MakeRowsFromKvPairs([]common.KvPair{
@@ -76,15 +70,17 @@ func (s *importerSuite) setUpTest(c *C) {

var err error
s.engine, err = importer.OpenEngine(s.ctx, &backend.EngineConfig{}, "`db`.`table`", -1)
c.Assert(err, IsNil)
require.NoError(t, err)
return s
}

func (s *importerSuite) tearDownTest() {
s.controller.Finish()
}

func (s *importerSuite) TestWriteRows(c *C) {
s.setUpTest(c)
func TestWriteRows(t *testing.T) {
t.Parallel()
s := createImportSuite(t)
defer s.tearDownTest()

s.mockClient.EXPECT().WriteEngine(s.ctx).Return(s.mockWriter, nil)
@@ -99,10 +95,10 @@ func (s *importerSuite) TestWriteRows(c *C) {
batchSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetBatch().GetMutations(), DeepEquals, []*kvpb.Mutation{
require.Equal(t, []*kvpb.Mutation{
{Op: kvpb.Mutation_Put, Key: []byte("k1"), Value: []byte("v1")},
{Op: kvpb.Mutation_Put, Key: []byte("k2"), Value: []byte("v2")},
})
}, x.GetBatch().GetMutations())
return nil
}).
After(headSendCall)
@@ -112,24 +108,25 @@ func (s *importerSuite) TestWriteRows(c *C) {
After(batchSendCall)

writer, err := s.engine.LocalWriter(s.ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, IsNil)
require.NoError(t, err)
st, err := writer.Close(s.ctx)
c.Assert(err, IsNil)
c.Assert(st, IsNil)
require.NoError(t, err)
require.Nil(t, st)
}

func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
s.setUpTest(c)
func TestWriteHeadSendFailed(t *testing.T) {
t.Parallel()
s := createImportSuite(t)
defer s.tearDownTest()

s.mockClient.EXPECT().WriteEngine(s.ctx).Return(s.mockWriter, nil)

headSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetHead(), NotNil)
require.NotNil(t, x.GetHead())
return errors.Annotate(context.Canceled, "fake unrecoverable write head error")
})
s.mockWriter.EXPECT().
@@ -138,27 +135,29 @@ func (s *importerSuite) TestWriteHeadSendFailed(c *C) {
After(headSendCall)

writer, err := s.engine.LocalWriter(s.ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, ErrorMatches, "fake unrecoverable write head error.*")
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable write head error", err.Error())
}

func (s *importerSuite) TestWriteBatchSendFailed(c *C) {
s.setUpTest(c)
func TestWriteBatchSendFailed(t *testing.T) {
t.Parallel()
s := createImportSuite(t)
defer s.tearDownTest()

s.mockClient.EXPECT().WriteEngine(s.ctx).Return(s.mockWriter, nil)

headSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetHead(), NotNil)
require.NotNil(t, x.GetHead())
return nil
})
batchSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetBatch(), NotNil)
require.NotNil(t, x.GetBatch())
return errors.Annotate(context.Canceled, "fake unrecoverable write batch error")
}).
After(headSendCall)
@@ -168,27 +167,29 @@ func (s *importerSuite) TestWriteBatchSendFailed(c *C) {
After(batchSendCall)

writer, err := s.engine.LocalWriter(s.ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, ErrorMatches, "fake unrecoverable write batch error.*")
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable write batch error", err.Error())
}

func (s *importerSuite) TestWriteCloseFailed(c *C) {
s.setUpTest(c)
func TestWriteCloseFailed(t *testing.T) {
t.Parallel()
s := createImportSuite(t)
defer s.tearDownTest()

s.mockClient.EXPECT().WriteEngine(s.ctx).Return(s.mockWriter, nil)

headSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetHead(), NotNil)
require.NotNil(t, x.GetHead())
return nil
})
batchSendCall := s.mockWriter.EXPECT().
Send(gomock.Any()).
DoAndReturn(func(x *kvpb.WriteEngineRequest) error {
c.Assert(x.GetBatch(), NotNil)
require.NotNil(t, x.GetBatch())
return nil
}).
After(headSendCall)
@@ -198,13 +199,15 @@ func (s *importerSuite) TestWriteCloseFailed(c *C) {
After(batchSendCall)

writer, err := s.engine.LocalWriter(s.ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(s.ctx, nil, s.kvPairs)
c.Assert(err, ErrorMatches, "fake unrecoverable close stream error.*")
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable close stream error", err.Error())
}

func (s *importerSuite) TestCloseImportCleanupEngine(c *C) {
s.setUpTest(c)
func TestCloseImportCleanupEngine(t *testing.T) {
t.Parallel()
s := createImportSuite(t)
defer s.tearDownTest()

s.mockClient.EXPECT().
@@ -218,11 +221,11 @@ func (s *importerSuite) TestCloseImportCleanupEngine(c *C) {
Return(nil, nil)

engine, err := s.engine.Close(s.ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = engine.Import(s.ctx, 1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = engine.Cleanup(s.ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func BenchmarkMutationAlloc(b *testing.B) {
@@ -261,33 +264,41 @@ func BenchmarkMutationPool(b *testing.B) {
_ = g
}

func (s *importerSuite) TestCheckTiDBVersion(c *C) {
func TestCheckTiDBVersion(t *testing.T) {
var version string
ctx := context.Background()

mockServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
c.Assert(req.URL.Path, Equals, "/status")
require.Equal(t, "/status", req.URL.Path)
w.WriteHeader(http.StatusOK)
err := json.NewEncoder(w).Encode(map[string]interface{}{
"version": version,
})
c.Assert(err, IsNil)
require.NoError(t, err)
}))

tls := common.NewTLSFromMockServer(mockServer)

version = "5.7.25-TiDB-v4.0.0"
c.Assert(checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion), IsNil)
require.Nil(t, checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion))

version = "5.7.25-TiDB-v9999.0.0"
c.Assert(checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion), ErrorMatches, "TiDB version too new.*")
err := checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion)
require.Error(t, err)
require.Regexp(t, "^TiDB version too new", err.Error())

version = "5.7.25-TiDB-v6.0.0"
c.Assert(checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion), ErrorMatches, "TiDB version too new.*")
err = checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion)
require.Error(t, err)
require.Regexp(t, "^TiDB version too new", err.Error())

version = "5.7.25-TiDB-v6.0.0-beta"
c.Assert(checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion), ErrorMatches, "TiDB version too new.*")
err = checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion)
require.Error(t, err)
require.Regexp(t, "^TiDB version too new", err.Error())

version = "5.7.25-TiDB-v1.0.0"
c.Assert(checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion), ErrorMatches, "TiDB version too old.*")
err = checkTiDBVersionByTLS(ctx, tls, requiredMinTiDBVersion, requiredMaxTiDBVersion)
require.Error(t, err)
require.Regexp(t, "^TiDB version too old", err.Error())
}
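
The hunks above convert the importer backend tests from pingcap/check suites to plain `go test` functions: `createImportSuite(t)` replaces the gocheck `SetUpTest` hook, every test defers `tearDownTest`, and `c.Assert(..., IsNil)` / `ErrorMatches` become testify's `require.NoError` / `require.Regexp`. A minimal, self-contained sketch of that pattern follows; everything other than the gomock and testify APIs is illustrative and not taken from the commit.

```go
package importer_test

import (
	"testing"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"
)

// exampleSuite bundles the fixtures a test needs; it stands in for the
// gocheck suite struct plus its SetUpTest/TearDownTest hooks.
type exampleSuite struct {
	controller *gomock.Controller
}

// newExampleSuite plays the role createImportSuite plays in the diff:
// build the fixtures explicitly and hand them to the caller.
func newExampleSuite(t *testing.T) *exampleSuite {
	return &exampleSuite{controller: gomock.NewController(t)}
}

// tearDown verifies that every expected mock call was made.
func (s *exampleSuite) tearDown() {
	s.controller.Finish()
}

func TestExample(t *testing.T) {
	t.Parallel()
	s := newExampleSuite(t)
	defer s.tearDown()

	// Assertions use testify's require instead of gocheck's c.Assert.
	require.NotNil(t, s.controller)
}
```

Constructing the fixtures inside the test body, rather than through a framework hook, lets gomock report unmet expectations against the correct `*testing.T`, which is the failure mode the FIXME comment in the original suite warned about.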
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -84,6 +84,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk, due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only checking that
// there are enough columns.
columnCnt int
}

@@ -284,22 +287,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See tests/generated_columns/data/gencol.various_types.0.sql: that SQL file has no columns, so encodeLoop fills the
// column permutation with defaults, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
@@ -308,8 +316,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -321,6 +333,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow{
@@ -569,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
@@ -585,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
@@ -615,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
