Skip to content

Commit

Permalink
*: support to replicate tables without explicit row id (#1005)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Oct 20, 2020
1 parent 0947943 commit fea8a70
Show file tree
Hide file tree
Showing 18 changed files with 466 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *changeFeed) addTable(tblInfo *model.TableInfo, targetTs model.Ts) {
return
}

if !tblInfo.IsEligible() {
if !tblInfo.IsEligible(c.info.Config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tblInfo.ID), zap.Stringer("table", tblInfo.TableName))
return
}
Expand Down
91 changes: 55 additions & 36 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type schemaSnapshot struct {
ineligibleTableID map[int64]struct{}

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -93,11 +96,11 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*SingleSchemaSnapshot, error) {
return newSchemaSnapshotFromMeta(meta, currentTs)
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
}

func newEmptySchemaSnapshot() *schemaSnapshot {
func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -108,11 +111,13 @@ func newEmptySchemaSnapshot() *schemaSnapshot {

truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot()
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -132,7 +137,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64) (*schemaSnap
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible()
isEligible := tableInfo.IsEligible(explicitTables)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -196,39 +201,51 @@ func (s *schemaSnapshot) PrintStatus(logger func(msg string, fields ...zap.Field

// Clone clones Storage
func (s *schemaSnapshot) Clone() *schemaSnapshot {
n := &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64, len(s.tableNameToID)),
schemaNameToID: make(map[string]int64, len(s.schemaNameToID)),

schemas: make(map[int64]*timodel.DBInfo, len(s.schemas)),
tables: make(map[int64]*model.TableInfo, len(s.tables)),
partitionTable: make(map[int64]*model.TableInfo, len(s.partitionTable)),
clone := *s

truncateTableID: make(map[int64]struct{}, len(s.truncateTableID)),
ineligibleTableID: make(map[int64]struct{}, len(s.ineligibleTableID)),
}
tableNameToID := make(map[model.TableName]int64, len(s.tableNameToID))
for k, v := range s.tableNameToID {
n.tableNameToID[k] = v
tableNameToID[k] = v
}
clone.tableNameToID = tableNameToID

schemaNameToID := make(map[string]int64, len(s.schemaNameToID))
for k, v := range s.schemaNameToID {
n.schemaNameToID[k] = v
schemaNameToID[k] = v
}
clone.schemaNameToID = schemaNameToID

schemas := make(map[int64]*timodel.DBInfo, len(s.schemas))
for k, v := range s.schemas {
n.schemas[k] = v.Clone()
schemas[k] = v.Clone()
}
clone.schemas = schemas

tables := make(map[int64]*model.TableInfo, len(s.tables))
for k, v := range s.tables {
n.tables[k] = v.Clone()
tables[k] = v.Clone()
}
clone.tables = tables

partitionTable := make(map[int64]*model.TableInfo, len(s.partitionTable))
for k, v := range s.partitionTable {
n.partitionTable[k] = v.Clone()
partitionTable[k] = v.Clone()
}
clone.partitionTable = partitionTable

truncateTableID := make(map[int64]struct{}, len(s.truncateTableID))
for k, v := range s.truncateTableID {
n.truncateTableID[k] = v
truncateTableID[k] = v
}
clone.truncateTableID = truncateTableID

ineligibleTableID := make(map[int64]struct{}, len(s.ineligibleTableID))
for k, v := range s.ineligibleTableID {
n.ineligibleTableID[k] = v
ineligibleTableID[k] = v
}
return n
clone.ineligibleTableID = ineligibleTableID

return &clone
}

// GetTableNameByID looks up a TableName with the given table id
Expand Down Expand Up @@ -464,14 +481,14 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
schema.Tables = append(schema.Tables, table.TableInfo)

s.tables[table.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -489,14 +506,14 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible() {
if !table.IsEligible(s.explicitTables) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -610,25 +627,27 @@ type SchemaStorage struct {
gcTs uint64
resolvedTs uint64

filter *filter.Filter
filter *filter.Filter
explicitTables bool
}

// NewSchemaStorage creates a new schema storage
func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter) (*SchemaStorage, error) {
func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter, forceReplicate bool) (*SchemaStorage, error) {
var snap *schemaSnapshot
var err error
if meta == nil {
snap = newEmptySchemaSnapshot()
snap = newEmptySchemaSnapshot(forceReplicate)
} else {
snap, err = newSchemaSnapshotFromMeta(meta, startTs)
snap, err = newSchemaSnapshotFromMeta(meta, startTs, forceReplicate)
}
if err != nil {
return nil, errors.Trace(err)
}
schema := &SchemaStorage{
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -702,7 +721,7 @@ func (s *SchemaStorage) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot()
snap = newEmptySchemaSnapshot(s.explicitTables)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
88 changes: 83 additions & 5 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (t *schemaSuite) TestSchema(c *C) {
Query: "create database test",
}
// reconstruct the local schema
snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
err := snap.handleDDL(job)
c.Assert(err, IsNil)
_, exist := snap.SchemaByID(job.SchemaID)
Expand Down Expand Up @@ -195,7 +195,7 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(jobs, job)

// reconstruct the local schema
snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
for _, job := range jobs {
err := snap.handleDDL(job)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -277,7 +277,7 @@ func (*schemaSuite) TestTable(c *C) {

func (t *schemaSuite) TestHandleDDL(c *C) {

snap := newEmptySchemaSnapshot()
snap := newEmptySchemaSnapshot(false)
dbName := timodel.NewCIStr("Test")
colName := timodel.NewCIStr("A")
tbName := timodel.NewCIStr("T")
Expand Down Expand Up @@ -527,7 +527,7 @@ func (t *schemaSuite) TestMultiVersionStorage(c *C) {
}

jobs = append(jobs, job)
storage, err := NewSchemaStorage(nil, 0, nil)
storage, err := NewSchemaStorage(nil, 0, nil, false)
c.Assert(err, IsNil)
for _, job := range jobs {
err := storage.HandleDDLJob(job)
Expand Down Expand Up @@ -665,7 +665,7 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *C) {
c.Assert(err, IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, IsNil)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false)
c.Assert(err, IsNil)
_, ok := snap.GetTableByName("test", "simple_test1")
c.Assert(ok, IsTrue)
Expand All @@ -677,3 +677,81 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *C) {
c.Assert(dbInfo.Name.O, Equals, "test2")
c.Assert(len(dbInfo.Tables), Equals, 3)
}

func (t *schemaSuite) TestSnapshotClone(c *C) {
store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)

session.SetSchemaLease(0)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
tk.MustExec("create table test.simple_test2 (id bigint primary key)")
tk.MustExec("create table test2.simple_test3 (id bigint primary key)")
tk.MustExec("create table test2.simple_test4 (id bigint primary key)")
tk.MustExec("create table test2.simple_test5 (a bigint)")
ver, err := store.CurrentVersion()
c.Assert(err, IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, IsNil)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* explicitTables */)
c.Assert(err, IsNil)

clone := snap.Clone()
c.Assert(clone.tableNameToID, DeepEquals, snap.tableNameToID)
c.Assert(clone.schemaNameToID, DeepEquals, snap.schemaNameToID)
c.Assert(clone.truncateTableID, DeepEquals, snap.truncateTableID)
c.Assert(clone.ineligibleTableID, DeepEquals, snap.ineligibleTableID)
c.Assert(clone.currentTs, Equals, snap.currentTs)
c.Assert(clone.explicitTables, Equals, snap.explicitTables)
c.Assert(len(clone.tables), Equals, len(snap.tables))
c.Assert(len(clone.schemas), Equals, len(snap.schemas))
c.Assert(len(clone.partitionTable), Equals, len(snap.partitionTable))

tableCount := len(snap.tables)
clone.tables = make(map[int64]*model.TableInfo)
c.Assert(len(snap.tables), Equals, tableCount)
}

func (t *schemaSuite) TestExplicitTables(c *C) {
store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)

session.SetSchemaLease(0)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
ver1, err := store.CurrentVersion()
c.Assert(err, IsNil)
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
tk.MustExec("create table test.simple_test2 (id bigint unique key)")
tk.MustExec("create table test2.simple_test3 (a bigint)")
tk.MustExec("create table test2.simple_test4 (a varchar(20) unique key)")
tk.MustExec("create table test2.simple_test5 (a varchar(20))")
ver2, err := store.CurrentVersion()
c.Assert(err, IsNil)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
c.Assert(err, IsNil)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* explicitTables */)
c.Assert(err, IsNil)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
c.Assert(err, IsNil)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* explicitTables */)
c.Assert(err, IsNil)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* explicitTables */)
c.Assert(err, IsNil)

c.Assert(len(snap2.tables)-len(snap1.tables), Equals, 5)
// some system tables are also ineligible
c.Assert(len(snap2.ineligibleTableID), GreaterEqual, 4)

c.Assert(len(snap3.tables)-len(snap1.tables), Equals, 5)
c.Assert(snap3.ineligibleTableID, HasLen, 0)
}
5 changes: 4 additions & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {
}

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible() bool {
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
if forceReplicate {
return true
}
if ti.IsView() {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (o *Owner) newChangeFeed(
if err != nil {
return nil, errors.Trace(err)
}
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, checkpointTs)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, checkpointTs, info.Config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (o *Owner) newChangeFeed(
log.Warn("table not found for table ID", zap.Int64("tid", tid))
continue
}
if !tblInfo.IsEligible() {
if !tblInfo.IsEligible(info.Config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tid), zap.Stringer("table", table))
continue
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
}()
t := meta.NewMeta(txn)

schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(t, 0)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(t, 0, false)
c.Assert(err, check.IsNil)

cf := &changeFeed{
Expand All @@ -755,6 +755,7 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
orphanTables: make(map[model.TableID]model.Ts),
toCleanTables: make(map[model.TableID]model.Ts),
filter: f,
info: &model.ChangeFeedInfo{Config: config.GetDefaultReplicaConfig()},
}
for i, job := range jobs {
err = cf.schema.HandleDDL(job)
Expand Down
Loading

0 comments on commit fea8a70

Please sign in to comment.