Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: flush redo meta in owner instead of processor (#6313) #6321

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,26 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
if newCheckpointTs > barrierTs {
newCheckpointTs = barrierTs
}
prevResolvedTs := c.state.Status.ResolvedTs
var flushedCheckpointTs, flushedResolvedTs model.Ts
if c.redoManager.Enabled() {
// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
// we can only upload it to etcd after it has been flushed into redo meta.
c.redoManager.UpdateMeta(newCheckpointTs, newResolvedTs)
c.redoManager.GetFlushedMeta(&flushedCheckpointTs, &flushedResolvedTs)
log.Debug("owner gets flushed meta",
zap.Uint64("flushedResolvedTs", flushedResolvedTs),
zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs))
if flushedResolvedTs != 0 {
// It's not necessary to replace newCheckpointTs with flushedResolvedTs,
// as cdc can ensure newCheckpointTs can never exceed prevResolvedTs.
newResolvedTs = flushedResolvedTs
} else {
newResolvedTs = prevResolvedTs
}
}
c.updateStatus(newCheckpointTs, newResolvedTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
} else if c.state.Status != nil {
Expand Down Expand Up @@ -369,7 +389,7 @@ LOOP:
}()

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := &redo.ManagerOptions{EnableBgRunner: false}
redoManagerOpts := &redo.ManagerOptions{EnableBgRunner: true}
redoManager, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
if err != nil {
return err
Expand Down
39 changes: 10 additions & 29 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ type processor struct {
schemaStorage entry.SchemaStorage
lastSchemaTs model.Ts

filter *filter.Filter
mounter entry.Mounter
sink sink.Sink
redoManager redo.LogManager
lastRedoFlush time.Time
filter *filter.Filter
mounter entry.Mounter
sink sink.Sink
redoManager redo.LogManager

initialized bool
errCh chan error
Expand Down Expand Up @@ -245,13 +244,12 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
func newProcessor(ctx cdcContext.Context, upStream *upstream.Upstream) *processor {
changefeedID := ctx.ChangefeedVars().ID
p := &processor{
upStream: upStream,
tables: make(map[model.TableID]tablepipeline.TablePipeline),
errCh: make(chan error, 1),
changefeedID: changefeedID,
captureInfo: ctx.GlobalVars().CaptureInfo,
cancel: func() {},
lastRedoFlush: time.Now(),
upStream: upStream,
tables: make(map[model.TableID]tablepipeline.TablePipeline),
errCh: make(chan error, 1),
changefeedID: changefeedID,
captureInfo: ctx.GlobalVars().CaptureInfo,
cancel: func() {},

metricResolvedTsGauge: resolvedTsGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
Expand Down Expand Up @@ -376,9 +374,6 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
if err := p.lazyInit(ctx); err != nil {
return errors.Trace(err)
}
if err := p.flushRedoLogMeta(ctx); err != nil {
return err
}
// it is no need to check the error here, because we will use
// local time when an error return, which is acceptable
pdTime, _ := p.upStream.PDClock.CurrentTime()
Expand Down Expand Up @@ -881,20 +876,6 @@ func (p *processor) doGCSchemaStorage(ctx cdcContext.Context) {
p.metricSchemaStorageGcTsGauge.Set(float64(lastSchemaPhysicalTs))
}

// flushRedoLogMeta flushes redo log meta, including resolved-ts and checkpoint-ts
func (p *processor) flushRedoLogMeta(ctx context.Context) error {
if p.redoManager.Enabled() &&
time.Since(p.lastRedoFlush).Milliseconds() > p.changefeed.Info.Config.Consistent.FlushIntervalInMs {
st := p.changefeed.Status
err := p.redoManager.UpdateCheckpointTs(ctx, st.CheckpointTs)
if err != nil {
return err
}
p.lastRedoFlush = time.Now()
}
return nil
}

func (p *processor) Close() error {
log.Info("processor closing ...",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down
70 changes: 14 additions & 56 deletions cdc/redo/common/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package common

import (
"encoding/binary"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
Expand Down Expand Up @@ -59,59 +56,20 @@ const (

// LogMeta is used for store meta info.
type LogMeta struct {
CheckPointTs uint64
ResolvedTsList map[int64]uint64
CheckpointTs uint64 `msg:"checkpointTs"`
ResolvedTs uint64 `msg:"resolvedTs"`
}

// ResolvedTs returns the minimal resolved timestamp of all tables.
// If no resolved timestamp is record, return checkpoint instead.
func (z LogMeta) ResolvedTs() model.Ts {
var minResolvedTs uint64 = math.MaxUint64
for _, ts := range z.ResolvedTsList {
if ts < minResolvedTs {
minResolvedTs = ts
// ParseMeta parses meta.
func ParseMeta(metas []*LogMeta, checkpointTs, resolvedTs *model.Ts) {
*checkpointTs = 0
*resolvedTs = 0
for _, meta := range metas {
if *checkpointTs < meta.CheckpointTs {
*checkpointTs = meta.CheckpointTs
}
if *resolvedTs < meta.ResolvedTs {
*resolvedTs = meta.ResolvedTs
}
}
if minResolvedTs == math.MaxUint64 {
return z.CheckPointTs
}
return minResolvedTs
}

// MarshalMsg implements msgp.Marshaler
func (z LogMeta) MarshalMsg(b []byte) ([]byte, error) {
buff := make([]byte, 16+16*len(z.ResolvedTsList))
binary.LittleEndian.PutUint64(buff[0:8], z.CheckPointTs)
binary.LittleEndian.PutUint64(buff[8:16], uint64(len(z.ResolvedTsList)))

tStart := 16
for tID, ts := range z.ResolvedTsList {
binary.LittleEndian.PutUint64(buff[tStart:tStart+8], uint64(tID))
binary.LittleEndian.PutUint64(buff[tStart+8:tStart+16], ts)
tStart += 16
}
b = append(b, buff...)
return b, nil
}

// UnmarshalMsg implements msgp.Unmarshaler
func (z *LogMeta) UnmarshalMsg(b []byte) ([]byte, error) {
if len(b) < 16 {
return b, cerror.WrapError(cerror.ErrUnmarshalFailed, errors.New("unmarshal redo meta"))
}
z.CheckPointTs = binary.LittleEndian.Uint64(b[0:8])
tsListLen := binary.LittleEndian.Uint64(b[8:16])
z.ResolvedTsList = make(map[model.TableID]model.Ts, tsListLen)
if len(b) < 16+16*int(tsListLen) {
return b, cerror.WrapError(cerror.ErrUnmarshalFailed, errors.New("unmarshal redo meta"))
}

tStart := 16
for i := 0; i < int(tsListLen); i++ {
tID := int64(binary.LittleEndian.Uint64(b[tStart : tStart+8]))
ts := binary.LittleEndian.Uint64(b[tStart+8 : tStart+16])
z.ResolvedTsList[tID] = ts
tStart += 16
}
return b[16*tsListLen+16:], nil
}
135 changes: 135 additions & 0 deletions cdc/redo/common/redo_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading