Skip to content

Commit

Permalink
minor fix for controller, schemastore, eventbroker and thread pool (#459
Browse files Browse the repository at this point in the history
)

* fix ut

* fix thread pool

* fix event broker doScan
  • Loading branch information
CharlesCheung96 authored Nov 5, 2024
1 parent dbe92d6 commit a17e884
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 26 deletions.
4 changes: 3 additions & 1 deletion api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
v2.Use(middleware.ErrorHandleMiddleware())

v2.GET("status", api.serverStatus)
// For compatibility with the old API,
// For compatibility with the old API.
// TiDB Operator relies on this API to determine whether the TiCDC node is healthy.
router.GET("/status", api.serverStatus)
// Intergration test relies on this API to determine whether the TiCDC node is healthy.
router.GET("/debug/info", gin.WrapF(api.handleDebugInfo))

coordinatorMiddleware := middleware.ForwardToCoordinatorMiddleware(api.server)

Expand Down
33 changes: 33 additions & 0 deletions api/v2/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"fmt"
"net/http"
)

func (h *OpenAPIV2) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
ctx, cli := req.Context(), h.server.GetEtcdClient()
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
kvs, err := cli.GetAllCDCInfo(ctx)
if err != nil {
fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error())
return
}

for _, kv := range kvs {
fmt.Fprintf(w, "%s\n\t%s\n\n", string(kv.Key), string(kv.Value))
}
}
4 changes: 2 additions & 2 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func NewController(
nodes := c.nodeManager.GetAliveNodes()
// detect the capture changes
c.nodeManager.RegisterNodeChangeHandler("coordinator-controller", func(allNodes map[node.ID]*node.Info) {
//c.nodeChanged.Store(true)
c.onNodeChanged()
c.nodeChanged.Store(true)
// c.onNodeChanged()
})
log.Info("changefeed bootstrap initial nodes",
zap.Int("nodes", len(nodes)))
Expand Down
1 change: 0 additions & 1 deletion logservice/schemastore/multi_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
break
}
}

}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
Expand Down
13 changes: 8 additions & 5 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter
for {
allTargetTs := make([]uint64, 0, limit)
p.mu.RLock()
// log.Info("fetchTableTriggerDDLEvents",
// log.Debug("fetchTableTriggerDDLEvents in persistentStorage",
// zap.Any("start", start),
// zap.Int("limit", limit),
// zap.Any("tableTriggerDDLHistory", p.tableTriggerDDLHistory))
Expand Down Expand Up @@ -1370,10 +1370,13 @@ func getDroppedIDs(oldIDs []int64, newIDs []int64) []int64 {
}

func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent {
wrapTableInfo := common.WrapTableInfo(
rawEvent.CurrentSchemaID,
rawEvent.CurrentSchemaName,
rawEvent.TableInfo)
var wrapTableInfo *common.TableInfo
if rawEvent.TableInfo != nil {
wrapTableInfo = common.WrapTableInfo(
rawEvent.CurrentSchemaID,
rawEvent.CurrentSchemaName,
rawEvent.TableInfo)
}

ddlEvent := commonEvent.DDLEvent{
Type: rawEvent.Type,
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
for _, e := range ddlEvents {
c.sendDDL(ctx, remoteID, e, task.dispatcherStat)
}
task.dispatcherStat.watermark.Store(dataRange.EndTs)
// After all the events are sent, we send the watermark to the dispatcher.
c.sendWatermark(remoteID,
task.dispatcherStat,
Expand Down Expand Up @@ -406,7 +407,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
dml.Seq = task.dispatcherStat.seq.Add(1)
c.emitSyncPointEventIfNeeded(dml.CommitTs, task.dispatcherStat, remoteID)
c.messageCh <- newWrapDMLEvent(remoteID, dml, task.dispatcherStat.getEventSenderState())
task.dispatcherStat.watermark.Store(dml.CommitTs)
task.dispatcherStat.metricEventServiceSendKvCount.Add(float64(dml.Len()))
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
case commonEvent.InfluenceTypeNormal:
tableIds = append(tableIds, relatedTables.TableIDs...)
case commonEvent.InfluenceTypeDB:
if w.tableSchemaStore == nil {
log.Panic("table schema store is nil")
}
ids := w.tableSchemaStore.GetTableIdsByDB(relatedTables.SchemaID)
tableIds = append(tableIds, ids...)
case commonEvent.InfluenceTypeAll:
Expand Down
12 changes: 6 additions & 6 deletions tests/integration_tests/charset_gbk/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ CREATE TABLE t1 (
PRIMARY KEY (id)
) ENGINE = InnoDB;

ALTER TABLE t1
ADD COLUMN 城市 char(32);
-- ALTER TABLE t1
-- ADD COLUMN 城市 char(32);

ALTER TABLE t1
MODIFY COLUMN 城市 varchar(32);
-- ALTER TABLE t1
-- MODIFY COLUMN 城市 varchar(32);

ALTER TABLE t1
DROP COLUMN 城市;
-- ALTER TABLE t1
-- DROP COLUMN 城市;

/* this is a DDL test for table */
CREATE TABLE 表2 (
Expand Down
12 changes: 8 additions & 4 deletions utils/threadpool/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package threadpool

import (
"runtime"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -49,10 +50,9 @@ type FuncTask func() time.Time

type TaskHandle struct {
st *scheduledTask
ts *threadPoolImpl
}

func (h *TaskHandle) Cancel() { h.ts.cancel(h.st) }
func (h *TaskHandle) Cancel() { h.st.cancel() }

type ThreadPool interface {
Submit(task Task, next time.Time) *TaskHandle
Expand All @@ -75,12 +75,16 @@ type funcTaskImpl struct {
func (t *funcTaskImpl) Execute() time.Time { return t.f() }

type scheduledTask struct {
task Task
time time.Time // Next disired execution time. time.Time{} means the task is done.
task Task
time time.Time // Next disired execution time. time.Time{} means the task is done.
canceled atomic.Bool

heapIndex int // The index of the task in the heap.
}

func (m *scheduledTask) cancel() { m.canceled.Store(true) }
func (m *scheduledTask) isCanceled() bool { return m.canceled.Load() }

func (m *scheduledTask) SetHeapIndex(index int) { m.heapIndex = index }
func (m *scheduledTask) GetHeapIndex() int { return m.heapIndex }
func (m *scheduledTask) LessThan(other *scheduledTask) bool { return m.time.Before(other.time) }
8 changes: 2 additions & 6 deletions utils/threadpool/thread_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *threadPoolImpl) Submit(task Task, next time.Time) *TaskHandle {
task: task,
}
t.reactor.newTaskChan <- taskAndTime{st, next}
return &TaskHandle{st, t}
return &TaskHandle{st}
}

func (t *threadPoolImpl) SubmitFunc(task FuncTask, next time.Time) *TaskHandle {
Expand All @@ -55,10 +55,6 @@ func (t *threadPoolImpl) Stop() {
t.reactor.wg.Wait()
}

func (t *threadPoolImpl) cancel(st *scheduledTask) {
t.reactor.newTaskChan <- taskAndTime{st, time.Time{}}
}

func (t *threadPoolImpl) executeTasks() {
defer t.wg.Done()

Expand All @@ -68,7 +64,7 @@ func (t *threadPoolImpl) executeTasks() {
return
case task := <-t.pendingTaskChan:
// Canceled task will not be executed and dropped.
if !task.time.IsZero() {
if !task.isCanceled() {
next := task.task.Execute()
if !next.IsZero() {
t.reactor.newTaskChan <- taskAndTime{task, next}
Expand Down

0 comments on commit a17e884

Please sign in to comment.