diff --git a/downstreamadapter/sink/helper/eventrouter/event_router.go b/downstreamadapter/sink/helper/eventrouter/event_router.go index 97ed79a0e..0734d08a0 100644 --- a/downstreamadapter/sink/helper/eventrouter/event_router.go +++ b/downstreamadapter/sink/helper/eventrouter/event_router.go @@ -82,24 +82,20 @@ func (s *EventRouter) GetTopicForRowChange(tableInfo *common.TableInfo) string { // GetTopicForDDL returns the target topic for DDL. func (s *EventRouter) GetTopicForDDL(ddl *commonEvent.DDLEvent) string { - schema := ddl.SchemaName - table := ddl.TableName - - // TODO: fix this - //var schema, table string - // if ddl.PreTableInfo != nil { - // if ddl.PreTableInfo.TableName.Table == "" { - // return s.defaultTopic - // } - // schema = ddl.PreTableInfo.TableName.Schema - // table = ddl.PreTableInfo.TableName.Table - // } else { - // if ddl.TableInfo.TableName.Table == "" { - // return s.defaultTopic - // } - // schema = ddl.TableInfo.TableName.Schema - // table = ddl.TableInfo.TableName.Table - // } + var schema, table string + preTableInfo := ddl.GetPreTableInfo() + if preTableInfo != nil { + schema, table = preTableInfo.GetSchemaName(), preTableInfo.GetTableName() + if table == "" { + return s.defaultTopic + } + } else { + tableInfo := ddl.GetTableInfo() + schema, table = tableInfo.GetSchemaName(), tableInfo.GetTableName() + if table == "" { + return s.defaultTopic + } + } topicGenerator := s.matchTopicGenerator(schema, table) return topicGenerator.Substitute(schema, table) diff --git a/downstreamadapter/worker/kafka_ddl_worker.go b/downstreamadapter/worker/kafka_ddl_worker.go index b269d1158..712c8d1ed 100644 --- a/downstreamadapter/worker/kafka_ddl_worker.go +++ b/downstreamadapter/worker/kafka_ddl_worker.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/ticdc/pkg/sink/codec/encoder" "github.com/pingcap/ticdc/pkg/sink/util" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "go.uber.org/zap" ) @@ -126,38 +127,65 @@ func (w *KafkaDDLWorker) encodeAndSendDDLEvents() error { zap.String("changefeed", w.changeFeedID.Name())) return nil } - message, err := w.encoder.EncodeDDLEvent(event) - if err != nil { - log.Error("Failed to encode ddl event", - zap.String("namespace", w.changeFeedID.Namespace()), - zap.String("changefeed", w.changeFeedID.Name()), - zap.Error(err)) - continue - } - topic := w.eventRouter.GetTopicForDDL(event) - partitionNum, err := w.topicManager.GetPartitionNum(w.ctx, topic) - if err != nil { - log.Error("failed to get partition number for topic", zap.String("topic", topic), zap.Error(err)) - continue + messages := make([]*ticommon.Message, 0) + topics := make([]string, 0) + // Some ddl event may be multi-events, we need to split it into multiple messages. + // Such as rename table test.table1 to test.table10, test.table2 to test.table20 + if event.IsMultiEvents() { + subEvents := event.GetSubEvents() + for _, subEvent := range subEvents { + message, err := w.encoder.EncodeDDLEvent(subEvent) + if err != nil { + log.Error("Failed to encode ddl event", + zap.String("namespace", w.changeFeedID.Namespace()), + zap.String("changefeed", w.changeFeedID.Name()), + zap.Error(err)) + continue + } + messages = append(messages, message) + topics = append(topics, w.eventRouter.GetTopicForDDL(subEvent)) + } + } else { + message, err := w.encoder.EncodeDDLEvent(event) + if err != nil { + log.Error("Failed to encode ddl event", + zap.String("namespace", w.changeFeedID.Namespace()), + zap.String("changefeed", w.changeFeedID.Name()), + zap.Error(err)) + continue + } + messages = append(messages, message) + topic := w.eventRouter.GetTopicForDDL(event) + topics = append(topics, topic) } - if w.partitionRule == PartitionAll { + for i, message := range messages { + topic := topics[i] + + partitionNum, err := w.topicManager.GetPartitionNum(w.ctx, topic) + if err != nil { + log.Error("failed to get partition number for topic", zap.String("topic", topic), zap.Error(err)) + continue + } + + if w.partitionRule == PartitionAll { + err = w.statistics.RecordDDLExecution(func() error { + return w.producer.SyncBroadcastMessage(w.ctx, topic, partitionNum, message) + }) + return errors.Trace(err) + } err = w.statistics.RecordDDLExecution(func() error { - return w.producer.SyncBroadcastMessage(w.ctx, topic, partitionNum, message) + return w.producer.SyncSendMessage(w.ctx, topic, 0, message) }) - return errors.Trace(err) - } - err = w.statistics.RecordDDLExecution(func() error { - return w.producer.SyncSendMessage(w.ctx, topic, 0, message) - }) - if err != nil { - log.Error("Failed to RecordDDLExecution", - zap.String("namespace", w.changeFeedID.Namespace()), - zap.String("changefeed", w.changeFeedID.Name()), - zap.Error(err)) - continue + if err != nil { + log.Error("Failed to RecordDDLExecution", + zap.String("namespace", w.changeFeedID.Namespace()), + zap.String("changefeed", w.changeFeedID.Name()), + zap.Error(err)) + continue + } } }