Skip to content

Commit

Permalink
sinkv2(ticdc): add interfaces (#6017)
Browse files Browse the repository at this point in the history
ref #5928
  • Loading branch information
Rustin170506 authored Jun 27, 2022
1 parent 433fef2 commit e4bab46
Show file tree
Hide file tree
Showing 14 changed files with 468 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ type RowChangedEvent struct {
SplitTxn bool `json:"-" msg:"-"`
}

// GetCommitTs returns the commit timestamp of this event.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
}

// IsDelete returns true if the row is a delete event
func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
Expand Down Expand Up @@ -611,6 +616,11 @@ type SingleTableTxn struct {
FinishWg *sync.WaitGroup
}

// GetCommitTs returns the commit timestamp of the transaction.
func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
Expand Down
33 changes: 33 additions & 0 deletions cdc/sinkv2/ddlsink/ddl_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddlsink

import (
"context"

"github.com/pingcap/tiflow/cdc/model"
)

// DDLEventSink is the interface for sink of DDL events.
type DDLEventSink interface {
// WriteDDLEvent writes a DDL event to the sink.
// Note: This is a synchronous and thread-safe method.
WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
// WriteCheckpointTs writes a checkpoint timestamp to the sink.
// Note: This is a synchronous and thread-safe method.
// This only for MQSink for now.
WriteCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error
// Close closes the sink.
Close() error
}
Empty file added cdc/sinkv2/ddlsink/mq/.keep
Empty file.
Empty file added cdc/sinkv2/ddlsink/mysql/.keep
Empty file.
42 changes: 42 additions & 0 deletions cdc/sinkv2/eventsink/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package eventsink

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
)

// TableEvent is the interface for events which can be written to sink by TableSink.
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
}

// CallbackFunc is the callback function for callbackable event.
type CallbackFunc func()

// CallbackableEvent means the event can be callbacked.
// It also contains the table status.
type CallbackableEvent[E TableEvent] struct {
Event E
Callback CallbackFunc
TableStatus *pipeline.TableState
}

// RowChangeCallbackableEvent is the row change event which can be callbacked.
type RowChangeCallbackableEvent = CallbackableEvent[*model.RowChangedEvent]

// TxnCallbackableEvent is the txn event which can be callbacked.
type TxnCallbackableEvent = CallbackableEvent[*model.SingleTableTxn]
51 changes: 51 additions & 0 deletions cdc/sinkv2/eventsink/event_appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License

package eventsink

import "github.com/pingcap/tiflow/cdc/model"

// Appender is the interface for appending events to buffer.
type Appender[E TableEvent] interface {
// Append appends the event to buffer.
Append(buffer []E, rows ...*model.RowChangedEvent) []E
}

// Assert Appender[E TableEvent] implementation
var _ Appender[*model.RowChangedEvent] = (*RowChangeEventAppender)(nil)

// RowChangeEventAppender is the builder for RowChangedEvent.
type RowChangeEventAppender struct{}

// Append appends the given rows to the given buffer.
func (r *RowChangeEventAppender) Append(
buffer []*model.RowChangedEvent,
rows ...*model.RowChangedEvent,
) []*model.RowChangedEvent {
return append(buffer, rows...)
}

// Assert Appender[E TableEvent] implementation
var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil)

// TxnEventAppender is the appender for SingleTableTxn.
type TxnEventAppender struct{}

// Append appends the given rows to the given txn buffer.
func (t *TxnEventAppender) Append(
buffer []*model.SingleTableTxn,
rows ...*model.RowChangedEvent,
) []*model.SingleTableTxn {
// TODO implement me
panic("implement me")
}
23 changes: 23 additions & 0 deletions cdc/sinkv2/eventsink/event_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package eventsink

// EventSink is the interface for event sink.
type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(rows ...*CallbackableEvent[E])
// Close closes the sink.
Close() error
}
37 changes: 37 additions & 0 deletions cdc/sinkv2/eventsink/mq/mq_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mq

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
)

// Assert EventSink[E event.TableEvent] implementation
var _ eventsink.EventSink[*model.RowChangedEvent] = (*Sink)(nil)

// Sink is the mq sink.
type Sink struct{}

// WriteEvents writes events to the sink.
func (s *Sink) WriteEvents(rows ...*eventsink.RowChangeCallbackableEvent) {
// TODO implement me
panic("implement me")
}

// Close closes the sink.
func (s *Sink) Close() error {
// TODO implement me
panic("implement me")
}
37 changes: 37 additions & 0 deletions cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
)

// Assert EventSink[E event.TableEvent] implementation
var _ eventsink.EventSink[*model.SingleTableTxn] = (*Sink)(nil)

// Sink is the sink for SingleTableTxn.
type Sink struct{}

// WriteEvents writes events to the sink.
func (s *Sink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) {
// TODO implement me
panic("implement me")
}

// Close closes the sink.
func (s *Sink) Close() error {
// TODO implement me
panic("implement me")
}
87 changes: 87 additions & 0 deletions cdc/sinkv2/tablesink/event_table_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tablesink

import (
"sort"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
)

// Assert TableSink implementation
var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil)
var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil)

type eventTableSink[E eventsink.TableEvent] struct {
eventID uint64
maxResolvedTs model.ResolvedTs
backendSink eventsink.EventSink[E]
progressTracker *progressTracker
eventAppender eventsink.Appender[E]
// NOTICE: It is ordered by commitTs.
eventBuffer []E
state *pipeline.TableState
}

func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) {
e.eventBuffer = e.eventAppender.Append(e.eventBuffer, rows...)
}

func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) {
// If resolvedTs is not greater than maxResolvedTs,
// the flush is unnecessary.
if !e.maxResolvedTs.Less(resolvedTs) {
return
}
e.maxResolvedTs = resolvedTs

i := sort.Search(len(e.eventBuffer), func(i int) bool {
return e.eventBuffer[i].GetCommitTs() > resolvedTs.Ts
})
// Despite the lack of data, we have to move forward with progress.
if i == 0 {
e.progressTracker.addResolvedTs(e.eventID, resolvedTs)
e.eventID++
return
}
resolvedEvents := e.eventBuffer[:i]

resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents))
for _, ev := range resolvedEvents {
ce := &eventsink.CallbackableEvent[E]{
Event: ev,
Callback: func() {
e.progressTracker.remove(e.eventID)
},
TableStatus: e.state,
}
resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce)
e.progressTracker.addEvent(e.eventID)
e.eventID++
}
// Do not forget to add the resolvedTs to progressTracker.
e.progressTracker.addResolvedTs(e.eventID, resolvedTs)
e.eventID++
e.backendSink.WriteEvents(resolvedCallbackableEvents...)
}

func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
return e.progressTracker.minTs()
}

func (e *eventTableSink[E]) Close() {
e.state.Store(pipeline.TableStateStopped)
}
Loading

0 comments on commit e4bab46

Please sign in to comment.