Skip to content

Commit

Permalink
Merge branch 'master' into kafka-go-producer
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Feb 1, 2023
2 parents a355901 + 9c20bf4 commit 428ddce
Show file tree
Hide file tree
Showing 115 changed files with 502 additions and 9,046 deletions.
11 changes: 6 additions & 5 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,18 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", api.pauseChangefeed)

// processor apis
processorGroup := v2.Group("/processors")
processorGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
processorGroup.GET("/:changefeed_id/:capture_id", api.getProcessor)

// capture apis
captureGroup := v2.Group("/captures")
captureGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
captureGroup.POST("/:capture_id/drain", api.drainCapture)
captureGroup.GET("", api.listCaptures)

// processor apis
processorGroup := v2.Group("/processors")
processorGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
processorGroup.GET("/:changefeed_id/:capture_id", api.getProcessor)
processorGroup.GET("", api.listProcessors)

verifyTableGroup := v2.Group("/verify_table")
verifyTableGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
verifyTableGroup.POST("", api.verifyTable)
Expand Down
10 changes: 9 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ type ChangefeedConfig struct {
PDConfig
}

// ProcessorCommonInfo holds the common info of a processor.
// It is the element type of the list built by listProcessors and is
// serialized to JSON using the field tags below.
type ProcessorCommonInfo struct {
// Namespace is filled from the namespace part of the processor's changefeed ID.
Namespace string `json:"namespace"`
// ChangeFeedID is filled from the ID part of the processor's changefeed ID.
ChangeFeedID string `json:"changefeed_id"`
// CaptureID is filled from the processor's CaptureID.
CaptureID string `json:"capture_id"`
}

// ReplicaConfig is a duplicate of config.ReplicaConfig
type ReplicaConfig struct {
MemoryQuota uint64 `json:"memory_quota"`
Expand Down Expand Up @@ -553,7 +560,8 @@ type ChangeFeedInfo struct {
CreatorVersion string `json:"creator_version,omitempty"`
}

// RunningError represents some running error from cdc components, such as processor.
// RunningError represents some running error from cdc components,
// such as processor.
type RunningError struct {
Addr string `json:"addr"`
Code string `json:"code"`
Expand Down
40 changes: 38 additions & 2 deletions cdc/api/v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
)
return
}
info, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)

info, err := h.capture.StatusProvider().GetChangeFeedInfo(
ctx,
changefeedID,
)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -63,7 +67,7 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
cerror.WrapError(
cerror.ErrAPIInvalidParam,
fmt.Errorf("changefeed in abnormal state: %s, "+
"can't get processors of an abnormal changefeed",
"can't get processor of an abnormal changefeed",
string(info.State),
),
),
Expand Down Expand Up @@ -111,3 +115,35 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
}
c.JSON(http.StatusOK, &processorDetail)
}

// listProcessors lists all processors in the TiCDC cluster
// @Summary List processors
// @Description list all processors in the TiCDC cluster
// @Tags processor
// @Produce json
// @Success 200 {array} model.ProcessorCommonInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/processors [get]
//
// The handler asks the capture's status provider for every known processor,
// converts each one to a ProcessorCommonInfo and writes the result as a
// ListResponse with HTTP 200. If the status provider fails, the error is
// attached to the gin context and no body is written by this handler.
func (h *OpenAPIV2) listProcessors(c *gin.Context) {
	ctx := c.Request.Context()
	infos, err := h.capture.StatusProvider().GetProcessors(ctx)
	if err != nil {
		// The error is recorded on the context; the value returned by
		// c.Error is not needed here.
		_ = c.Error(err)
		return
	}

	// The slice has length 0 and capacity len(infos), so elements must be
	// appended. Assigning by index (prcInfos[i] = ...) would panic with
	// "index out of range" as soon as at least one processor exists.
	prcInfos := make([]ProcessorCommonInfo, 0, len(infos))
	for _, info := range infos {
		prcInfos = append(prcInfos, ProcessorCommonInfo{
			Namespace:    info.CfID.Namespace,
			ChangeFeedID: info.CfID.ID,
			CaptureID:    info.CaptureID,
		})
	}

	resp := &ListResponse[ProcessorCommonInfo]{
		Total: len(prcInfos),
		Items: prcInfos,
	}

	c.JSON(http.StatusOK, resp)
}
13 changes: 1 addition & 12 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -95,11 +94,6 @@ type captureImpl struct {
createEtcdClient createEtcdClientFunc
EtcdClient etcd.CDCEtcdClient

// useSortEngine indicates whether to use the new pull based sort engine or
// the old push based sorter system. the latter will be removed after all sorter
// have been transformed into pull based sort engine.
useSortEngine bool
sorterSystem *ssystem.System
sortEngineFactory *factory.SortEngineFactory

// MessageServer is the receiver of the messages from the other nodes.
Expand Down Expand Up @@ -134,7 +128,6 @@ func NewCapture(pdEndpoints []string,
createEtcdClient createEtcdClientFunc,
grpcService *p2p.ServerWrapper,
sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
) Capture {
return &captureImpl{
config: config.GetGlobalServerConfig(),
Expand All @@ -147,10 +140,7 @@ func NewCapture(pdEndpoints []string,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},
createEtcdClient: createEtcdClient,

useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,
sortEngineFactory: sortEngineMangerFactory,
}
}

Expand Down Expand Up @@ -328,7 +318,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
EtcdClient: c.EtcdClient,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
SorterSystem: c.sorterSystem,
SortEngineFactory: c.sortEngineFactory,
})

Expand Down
12 changes: 0 additions & 12 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,13 @@ var (
Help: "Bucketed histogram of processorManager close processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
})

tableMemoryHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "each table's memory consumption after sorter, in bytes",
Buckets: prometheus.ExponentialBuckets(256, 2.0, 20),
}, []string{"namespace", "changefeed"})

processorMemoryGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "memory_consumption",
Help: "processor's memory consumption estimated in bytes",
}, []string{"namespace", "changefeed"})

remainKVEventsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -90,7 +79,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(processorSchemaStorageGcTsGauge)
registry.MustRegister(processorTickDuration)
registry.MustRegister(processorCloseDuration)
registry.MustRegister(tableMemoryHistogram)
registry.MustRegister(processorMemoryGauge)
registry.MustRegister(remainKVEventsGauge)
sinkmanager.InitMetrics(registry)
Expand Down
7 changes: 0 additions & 7 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ type processor struct {
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
metricsTableSinkTotalRows prometheus.Counter
metricsTableMemoryHistogram prometheus.Observer
metricsProcessorMemoryGauge prometheus.Gauge
metricRemainKVEventGauge prometheus.Gauge
}
Expand Down Expand Up @@ -424,8 +423,6 @@ func newProcessor(
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricsTableSinkTotalRows: sinkmetric.TableSinkTotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricsTableMemoryHistogram: tableMemoryHistogram.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricsProcessorMemoryGauge: processorMemoryGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricRemainKVEventGauge: remainKVEventsGauge.
Expand Down Expand Up @@ -1016,12 +1013,8 @@ func (p *processor) cleanupMetrics() {
processorErrorCounter.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorTickDuration.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

tableMemoryHistogram.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorMemoryGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

remainKVEventsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

sinkmetric.TableSinkTotalRowsCountCounter.
DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
}
Expand Down
9 changes: 9 additions & 0 deletions cdc/processor/sinkmanager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,20 @@ var (
},
// type includes hit and miss.
[]string{"namespace", "changefeed", "type"})

// outputEventCount is the metric that counts events output by the sorter.
outputEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "output_event_count",
Help: "The number of events output by the sorter",
}, []string{"namespace", "changefeed", "type"})
)

// InitMetrics adds every collector declared in this file to the given
// registry. Collectors are registered in declaration order, and registration
// panics if any of them cannot be registered.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		MemoryQuota,
		RedoEventCache,
		RedoEventCacheAccess,
		outputEventCount,
	)
}
9 changes: 4 additions & 5 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
metrics "github.com/pingcap/tiflow/cdc/sorter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -155,7 +154,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// All transactions before currTxnCommitTs are resolved.
err = w.advanceTableSink(task, currTxnCommitTs, committedTxnSize+pendingTxnSize)
} else {
// This means all events of the currenet transaction have been fetched, but we can't
// This means all events of the current transaction have been fetched, but we can't
// ensure whether there are more transaction with the same CommitTs or not.
err = w.advanceTableSinkWithBatchID(task, currTxnCommitTs, committedTxnSize+pendingTxnSize, batchID)
batchID += 1
Expand All @@ -164,7 +163,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
pendingTxnSize = 0
} else if w.splitTxn && currTxnCommitTs > 0 {
// This branch will advance some complete transactions before currTxnCommitTs,
// and one partail transaction with `batchID`.
// and one partial transaction with `batchID`.
err = w.advanceTableSinkWithBatchID(task, currTxnCommitTs, committedTxnSize+pendingTxnSize, batchID)
batchID += 1
committedTxnSize = 0
Expand Down Expand Up @@ -251,7 +250,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
defer func() {
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
task.tableSink.receivedEventCount.Add(int64(allEventCount))
metrics.OutputEventCount.WithLabelValues(
outputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
task.tableSink.changefeed.ID,
"kv",
Expand Down Expand Up @@ -360,7 +359,7 @@ func (w *sinkWorker) fetchFromCache(
newLowerBound = popRes.boundary.Next()
if len(popRes.events) > 0 {
task.tableSink.receivedEventCount.Add(int64(popRes.pushCount))
metrics.OutputEventCount.WithLabelValues(
outputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
task.tableSink.changefeed.ID,
"kv",
Expand Down
19 changes: 10 additions & 9 deletions cdc/processor/sourcemanager/engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
epebble "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble"
metrics "github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/pkg/config"
dbMetrics "github.com/pingcap/tiflow/pkg/db"
"go.uber.org/atomic"
"go.uber.org/multierr"
)
Expand Down Expand Up @@ -171,17 +169,20 @@ func (f *SortEngineFactory) collectMetrics() {
for i, db := range f.dbs {
stats := db.Metrics()
id := strconv.Itoa(i + 1)
metrics.OnDiskDataSizeGauge.WithLabelValues(id).Set(float64(stats.DiskSpaceUsage()))
metrics.InMemoryDataSizeGauge.WithLabelValues(id).Set(float64(stats.BlockCache.Size))
dbMetrics.IteratorGauge().WithLabelValues(id).Set(float64(stats.TableIters))
dbMetrics.WriteDelayCount().WithLabelValues(id).Set(float64(stdatomic.LoadUint64(&f.writeStalls[i].counter)))
engine.OnDiskDataSize().WithLabelValues(id).Set(float64(stats.DiskSpaceUsage()))
engine.InMemoryDataSize().WithLabelValues(id).Set(float64(stats.BlockCache.Size))
engine.IteratorGauge().WithLabelValues(id).Set(float64(stats.TableIters))
engine.WriteDelayCount().WithLabelValues(id).
Set(float64(stdatomic.LoadUint64(&f.writeStalls[i].counter)))

metricLevelCount := dbMetrics.LevelCount().MustCurryWith(map[string]string{"id": id})
metricLevelCount := engine.LevelCount().MustCurryWith(map[string]string{"id": id})
for level, metric := range stats.Levels {
metricLevelCount.WithLabelValues(fmt.Sprint(level)).Set(float64(metric.NumFiles))
}
dbMetrics.BlockCacheAccess().WithLabelValues(id, "hit").Set(float64(stats.BlockCache.Hits))
dbMetrics.BlockCacheAccess().WithLabelValues(id, "miss").Set(float64(stats.BlockCache.Misses))
engine.BlockCacheAccess().WithLabelValues(id, "hit").
Set(float64(stats.BlockCache.Hits))
engine.BlockCacheAccess().WithLabelValues(id, "miss").
Set(float64(stats.BlockCache.Misses))
}
}
}
4 changes: 2 additions & 2 deletions cdc/processor/sourcemanager/engine/factory/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

"github.com/cockroachdb/pebble"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
epebble "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble"
metrics "github.com/pingcap/tiflow/cdc/sorter/db"
"github.com/pingcap/tiflow/pkg/config"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -52,7 +52,7 @@ func createPebbleDBs(
}
opts.EventListener.CompactionEnd = func(job pebble.CompactionInfo) {
idstr := strconv.Itoa(id + 1)
x := metrics.SorterCompactionDuration().WithLabelValues(idstr)
x := engine.SorterCompactionDuration().WithLabelValues(idstr)
x.Observe(job.TotalDuration.Seconds())
}
}
Expand Down
Loading

0 comments on commit 428ddce

Please sign in to comment.