Skip to content

Commit

Permalink
schema: register message table topics on engine
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <derek@derekperkins.com>
  • Loading branch information
derekperkins committed Jun 26, 2019
1 parent 4b31b8d commit 7fc5551
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Engine struct {
mu sync.Mutex
isOpen bool
tables map[string]*Table
topics map[string][]*Table
lastChange int64
reloadTime time.Duration
notifiers map[string]notifier
Expand Down Expand Up @@ -166,6 +167,9 @@ func (se *Engine) Open() error {
// Skip over the table that had an error and move on to the next one
return
}
// register the message topic on the engine if necessary
se.registerTopic(table)

table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8])
mu.Lock()
tables[tableName] = table
Expand Down Expand Up @@ -336,6 +340,10 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string
tabletenv.InternalErrors.Add("Schema", 1)
return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err)
}

// register the message topic on the engine if necessary
se.registerTopic(table)

// table_rows, data_length, index_length, max_data_length
table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8])

Expand All @@ -355,6 +363,32 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string
return nil
}

// registerTopic optionally connects the vt_topic metadata on a message table
// to a map of topic strings. A table can belong to only one topic.
func (se *Engine) registerTopic(ta *Table) {
if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" {
return
}

// lazily initialize the topic map if necessary
if se.topics == nil {
se.topics = make(map[string][]*Table)
}

msgTables, ok := se.topics[ta.MessageInfo.Topic]
if ok {
// check to see if this table is already registered to the topic
for _, t := range msgTables {
if t == ta {
return
}
}
}

// append this table to the list of subscribed tables to the topic
se.topics[ta.MessageInfo.Topic] = append(se.topics[ta.MessageInfo.Topic], ta)
}

// RegisterNotifier registers the function for schema change notification.
// It also causes an immediate notification to the caller. The notified
// function must not change the map or its contents. The only exception
Expand Down

0 comments on commit 7fc5551

Please sign in to comment.