diff --git a/api/v2/api.go b/api/v2/api.go index 37b61344..2c87dad9 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -37,9 +37,11 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { v2.Use(middleware.ErrorHandleMiddleware()) v2.GET("status", api.serverStatus) - // For compatibility with the old API, + // For compatibility with the old API. // TiDB Operator relies on this API to determine whether the TiCDC node is healthy. router.GET("/status", api.serverStatus) + // Intergration test relies on this API to determine whether the TiCDC node is healthy. + router.GET("/debug/info", gin.WrapF(api.handleDebugInfo)) coordinatorMiddleware := middleware.ForwardToCoordinatorMiddleware(api.server) diff --git a/api/v2/info.go b/api/v2/info.go new file mode 100644 index 00000000..d44b63ec --- /dev/null +++ b/api/v2/info.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "fmt" + "net/http" +) + +func (h *OpenAPIV2) handleDebugInfo(w http.ResponseWriter, req *http.Request) { + ctx, cli := req.Context(), h.server.GetEtcdClient() + fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n") + kvs, err := cli.GetAllCDCInfo(ctx) + if err != nil { + fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error()) + return + } + + for _, kv := range kvs { + fmt.Fprintf(w, "%s\n\t%s\n\n", string(kv.Key), string(kv.Value)) + } +} diff --git a/coordinator/controller.go b/coordinator/controller.go index 62a6a533..a3b54197 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -107,8 +107,8 @@ func NewController( nodes := c.nodeManager.GetAliveNodes() // detect the capture changes c.nodeManager.RegisterNodeChangeHandler("coordinator-controller", func(allNodes map[node.ID]*node.Info) { - //c.nodeChanged.Store(true) - c.onNodeChanged() + c.nodeChanged.Store(true) + // c.onNodeChanged() }) log.Info("changefeed bootstrap initial nodes", zap.Int("nodes", len(nodes))) diff --git a/logservice/schemastore/multi_version.go b/logservice/schemastore/multi_version.go index b1af0ec7..1a65b974 100644 --- a/logservice/schemastore/multi_version.go +++ b/logservice/schemastore/multi_version.go @@ -344,7 +344,6 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) { break } } - } case model.ActionReorganizePartition: physicalIDs := getAllPartitionIDs(event.TableInfo) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 61798b21..0b135537 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -413,7 +413,7 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter for { allTargetTs := make([]uint64, 0, limit) p.mu.RLock() - // log.Info("fetchTableTriggerDDLEvents", + // log.Debug("fetchTableTriggerDDLEvents in persistentStorage", // zap.Any("start", start), // zap.Int("limit", limit), // zap.Any("tableTriggerDDLHistory", p.tableTriggerDDLHistory)) @@ -1370,10 +1370,13 @@ func getDroppedIDs(oldIDs []int64, newIDs []int64) []int64 { } func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent { - wrapTableInfo := common.WrapTableInfo( - rawEvent.CurrentSchemaID, - rawEvent.CurrentSchemaName, - rawEvent.TableInfo) + var wrapTableInfo *common.TableInfo + if rawEvent.TableInfo != nil { + wrapTableInfo = common.WrapTableInfo( + rawEvent.CurrentSchemaID, + rawEvent.CurrentSchemaName, + rawEvent.TableInfo) + } ddlEvent := commonEvent.DDLEvent{ Type: rawEvent.Type, diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index ac09e429..513b8d04 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -375,6 +375,7 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { for _, e := range ddlEvents { c.sendDDL(ctx, remoteID, e, task.dispatcherStat) } + task.dispatcherStat.watermark.Store(dataRange.EndTs) // After all the events are sent, we send the watermark to the dispatcher. c.sendWatermark(remoteID, task.dispatcherStat, @@ -406,7 +407,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { dml.Seq = task.dispatcherStat.seq.Add(1) c.emitSyncPointEventIfNeeded(dml.CommitTs, task.dispatcherStat, remoteID) c.messageCh <- newWrapDMLEvent(remoteID, dml, task.dispatcherStat.getEventSenderState()) - task.dispatcherStat.watermark.Store(dml.CommitTs) task.dispatcherStat.metricEventServiceSendKvCount.Add(float64(dml.Len())) } diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index ffd1a706..c816e058 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -270,6 +270,9 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { case commonEvent.InfluenceTypeNormal: tableIds = append(tableIds, relatedTables.TableIDs...) case commonEvent.InfluenceTypeDB: + if w.tableSchemaStore == nil { + log.Panic("table schema store is nil") + } ids := w.tableSchemaStore.GetTableIdsByDB(relatedTables.SchemaID) tableIds = append(tableIds, ids...) case commonEvent.InfluenceTypeAll: diff --git a/tests/integration_tests/charset_gbk/data/test.sql b/tests/integration_tests/charset_gbk/data/test.sql index dc10d096..61dec071 100644 --- a/tests/integration_tests/charset_gbk/data/test.sql +++ b/tests/integration_tests/charset_gbk/data/test.sql @@ -127,14 +127,14 @@ CREATE TABLE t1 ( PRIMARY KEY (id) ) ENGINE = InnoDB; -ALTER TABLE t1 - ADD COLUMN 城市 char(32); +-- ALTER TABLE t1 +-- ADD COLUMN 城市 char(32); -ALTER TABLE t1 - MODIFY COLUMN 城市 varchar(32); +-- ALTER TABLE t1 +-- MODIFY COLUMN 城市 varchar(32); -ALTER TABLE t1 - DROP COLUMN 城市; +-- ALTER TABLE t1 +-- DROP COLUMN 城市; /* this is a DDL test for table */ CREATE TABLE 表2 ( diff --git a/utils/threadpool/task.go b/utils/threadpool/task.go index 5b05a960..e3db379e 100644 --- a/utils/threadpool/task.go +++ b/utils/threadpool/task.go @@ -15,6 +15,7 @@ package threadpool import ( "runtime" + "sync/atomic" "time" ) @@ -49,10 +50,9 @@ type FuncTask func() time.Time type TaskHandle struct { st *scheduledTask - ts *threadPoolImpl } -func (h *TaskHandle) Cancel() { h.ts.cancel(h.st) } +func (h *TaskHandle) Cancel() { h.st.cancel() } type ThreadPool interface { Submit(task Task, next time.Time) *TaskHandle @@ -75,12 +75,16 @@ type funcTaskImpl struct { func (t *funcTaskImpl) Execute() time.Time { return t.f() } type scheduledTask struct { - task Task - time time.Time // Next disired execution time. time.Time{} means the task is done. + task Task + time time.Time // Next disired execution time. time.Time{} means the task is done. + canceled atomic.Bool heapIndex int // The index of the task in the heap. } +func (m *scheduledTask) cancel() { m.canceled.Store(true) } +func (m *scheduledTask) isCanceled() bool { return m.canceled.Load() } + func (m *scheduledTask) SetHeapIndex(index int) { m.heapIndex = index } func (m *scheduledTask) GetHeapIndex() int { return m.heapIndex } func (m *scheduledTask) LessThan(other *scheduledTask) bool { return m.time.Before(other.time) } diff --git a/utils/threadpool/thread_pool.go b/utils/threadpool/thread_pool.go index a06aaeb2..56bd69c3 100644 --- a/utils/threadpool/thread_pool.go +++ b/utils/threadpool/thread_pool.go @@ -39,7 +39,7 @@ func (t *threadPoolImpl) Submit(task Task, next time.Time) *TaskHandle { task: task, } t.reactor.newTaskChan <- taskAndTime{st, next} - return &TaskHandle{st, t} + return &TaskHandle{st} } func (t *threadPoolImpl) SubmitFunc(task FuncTask, next time.Time) *TaskHandle { @@ -55,10 +55,6 @@ func (t *threadPoolImpl) Stop() { t.reactor.wg.Wait() } -func (t *threadPoolImpl) cancel(st *scheduledTask) { - t.reactor.newTaskChan <- taskAndTime{st, time.Time{}} -} - func (t *threadPoolImpl) executeTasks() { defer t.wg.Done() @@ -68,7 +64,7 @@ func (t *threadPoolImpl) executeTasks() { return case task := <-t.pendingTaskChan: // Canceled task will not be executed and dropped. - if !task.time.IsZero() { + if !task.isCanceled() { next := task.task.Execute() if !next.IsZero() { t.reactor.newTaskChan <- taskAndTime{task, next}