From aa3925e6a425c203fa5c7b36530baf10adcac74c Mon Sep 17 00:00:00 2001
From: Xuecheng Zhang
Date: Fri, 2 Nov 2018 13:19:39 +0800
Subject: [PATCH] DM: fix checkpoints update and flush, refine relay log write and read (#401)

---
 pkg/streamer/file.go   |   1 -
 pkg/streamer/reader.go | 437 ++++++++++++++++++++++++++---------------
 relay/relay.go         |  21 +-
 syncer/syncer.go       |  38 ++--
 4 files changed, 327 insertions(+), 170 deletions(-)

diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go
index 90b52fde3f..9d25eeeb95 100644
--- a/pkg/streamer/file.go
+++ b/pkg/streamer/file.go
@@ -93,7 +93,6 @@ func parseBinlogFile(filename string) (*binlogFile, error) {
 	// chendahui: I found there will always be only one dot in the mysql binlog name.
 	parts := strings.Split(filename, baseSeqSeparator)
 	if len(parts) != 2 {
-		log.Warnf("[streamer] filename %s not valid", filename)
 		return nil, ErrInvalidBinlogFilename
 	}
 
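The reader.go rewrite below replaces the single watcher-driven `onStream` loop with three nested layers: `parseRelay` iterates over relay sub directories (master switches), `parseDirAsPossible` iterates over the binlog files of one sub directory, and `parseFileAsPossible` re-parses a single file until it stops growing. The layers hand state upward through `needSwitch` / `needReParse` flags instead of returning and restarting from scratch. A toy model of that contract, in Go — the names mirror the patch, but every body here is a stand-in, not the real implementation:

    package main

    import "fmt"

    // parseFile scans one file once; it may find that the file grew while being
    // read (needReParse) or that the next sub directory must be used (needSwitch).
    func parseFile(file string, pass int) (needSwitch, needReParse bool) {
    	if pass == 0 {
    		return false, true // pretend the file grew during the first pass
    	}
    	return file == "bin.000002", false // pretend a switch is needed after this file
    }

    // parseFileAsPossible keeps re-parsing a single file until it stops growing.
    func parseFileAsPossible(file string) (needSwitch bool) {
    	for pass := 0; ; pass++ {
    		needSwitch, needReParse := parseFile(file, pass)
    		if !needReParse {
    			return needSwitch
    		}
    		// the file was updated while we were parsing it: parse it again
    	}
    }

    // parseDirAsPossible walks the files of one sub directory in order.
    func parseDirAsPossible(files []string) (needSwitch bool) {
    	for _, f := range files {
    		if parseFileAsPossible(f) {
    			return true
    		}
    	}
    	return false
    }

    func main() {
    	if parseDirAsPossible([]string{"bin.000001", "bin.000002"}) {
    		fmt.Println("switch to the next relay sub directory")
    	}
    }
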
diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go
index 00d1faaab4..7c7d7cd389 100644
--- a/pkg/streamer/reader.go
+++ b/pkg/streamer/reader.go
@@ -5,7 +5,6 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"runtime/debug"
 	"strings"
 	"sync"
 
@@ -47,6 +46,8 @@ type BinlogReader struct {
 	indexPath string   // relay server-uuid index file path
 	uuids     []string // master UUIDs (relay sub dir)
 
+	latestServerID uint32 // latest server ID, got from relay log
+
 	running bool
 	wg      sync.WaitGroup
 	ctx     context.Context
@@ -93,90 +94,172 @@ func (r *BinlogReader) StartSync(pos mysql.Position) (Streamer, error) {
 		return nil, errors.Trace(err)
 	}
 
+	r.latestServerID = 0
 	r.running = true
 
 	s := newLocalStreamer()
 
-	updatePosition := func(event *replication.BinlogEvent) {
-		log.Debugf("[streamer] read event %+v", event.Header)
-		switch event.Header.EventType {
-		case replication.ROTATE_EVENT:
-			rotateEvent := event.Event.(*replication.RotateEvent)
-			currentPos := mysql.Position{
-				Name: string(rotateEvent.NextLogName),
-				Pos:  uint32(rotateEvent.Position),
-			}
-			if currentPos.Name > pos.Name {
-				pos = currentPos // need update Name and Pos
-			}
-			log.Infof("[streamer] rotate binlog to %v", pos)
-		default:
-			log.Debugf("[streamer] original pos %v, current pos %v", pos.Pos, event.Header.LogPos)
-			if pos.Pos < event.Header.LogPos {
-				pos.Pos = event.Header.LogPos
-			}
-		}
-
-	}
-
 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
-		for {
-			select {
-			case <-r.ctx.Done():
-				return
-			default:
-				log.Debugf("[streamer] start read from pos %v", pos)
-				if err := r.onStream(s, pos, updatePosition); err != nil {
-					s.closeWithError(err)
-					log.Errorf("[streamer] streaming error %v", errors.ErrorStack(err))
-					return
-				}
-			}
-		}
+		log.Infof("[streamer] start read from pos %v", pos)
+		err = r.parseRelay(r.ctx, s, pos)
+		if errors.Cause(err) == r.ctx.Err() {
+			log.Infof("[streamer] parse relay finished because %v", r.ctx.Err())
+		} else if err != nil {
+			s.closeWithError(err)
+			log.Errorf("[streamer] parse relay stopped because %v", errors.ErrorStack(err))
+		}
 	}()
 
 	return s, nil
 }
 
-func (r *BinlogReader) onStream(s *LocalStreamer, pos mysql.Position, updatePos func(event *replication.BinlogEvent)) error {
-	defer func() {
-		if e := recover(); e != nil {
-			s.closeWithError(fmt.Errorf("Err: %v\n Stack: %s", e, string(debug.Stack())))
+// parseRelay parses the relay root directory; it supports master-slave switching (switching to the next sub directory)
+func (r *BinlogReader) parseRelay(ctx context.Context, s *LocalStreamer, pos mysql.Position) error {
+	var (
+		needSwitch     bool
+		nextUUID       string
+		nextBinlogName string
+		err            error
+	)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		needSwitch, nextUUID, nextBinlogName, err = r.parseDirAsPossible(ctx, s, pos)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if !needSwitch {
+			return errors.NotSupportedf("parsing for the previous relay sub directory finished, but there is no next sub directory to switch to")
+		}
+
+		_, suffixInt, err2 := utils.ParseSuffixForUUID(nextUUID)
+		if err2 != nil {
+			return errors.Annotatef(err2, "parse suffix for UUID %s", nextUUID)
+		}
+		uuidSuffix := utils.SuffixIntToStr(suffixInt)
+
+		parsed, err2 := parseBinlogFile(nextBinlogName)
+		if err2 != nil {
+			return errors.Annotatef(err2, "parse binlog file name %s", nextBinlogName)
 		}
-	}()
 
-	uuidWithSuffix, uuidSuffix, realPos, err := r.extractPos(pos)
+		// update pos, so we can switch to the next sub directory
+		pos.Name = r.constructBinlogName(parsed, uuidSuffix)
+		pos.Pos = 4 // start from pos 4 for the next sub directory / file
+	}
+}
+
+// parseDirAsPossible parses the relay sub directory as far as possible
+func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, pos mysql.Position) (needSwitch bool, nextUUID string, nextBinlogName string, err error) {
+	currentUUID, _, realPos, err := r.extractPos(pos)
 	if err != nil {
-		return errors.Trace(err)
+		return false, "", "", errors.Annotatef(err, "parse relay dir with pos %v", pos)
 	}
+	pos = realPos         // use realPos to do syncing
+	var firstParse = true // the first parse time for the relay log file
+	var dir = path.Join(r.cfg.RelayDir, currentUUID)
 
-	pos = realPos // use realPos to do syncing
-	binlogDir := path.Join(r.cfg.RelayDir, uuidWithSuffix)
+	for {
+		select {
+		case <-ctx.Done():
+			return false, "", "", ctx.Err()
+		default:
+		}
+		files, err := collectBinlogFiles(dir, pos.Name)
+		if err != nil {
+			return false, "", "", errors.Annotatef(err, "parse relay dir %s", dir)
+		} else if len(files) == 0 {
+			return false, "", "", errors.Errorf("no relay log files match pos %v", pos)
+		}
 
-	files, err := collectBinlogFiles(binlogDir, pos.Name)
-	if err != nil {
-		return errors.Trace(err)
+		log.Debugf("[streamer] start read from directory %s", dir)
+
+		var (
+			latestPos  int64
+			latestName string
+			offset     = int64(pos.Pos)
+		)
+		for i, relayLogFile := range files {
+			select {
+			case <-ctx.Done():
+				return false, "", "", ctx.Err()
+			default:
+			}
+			if i == 0 {
+				if !strings.HasSuffix(relayLogFile, pos.Name) {
+					return false, "", "", errors.Errorf("the first relay log %s does not match the start pos %v", relayLogFile, pos)
+				}
+			} else {
+				offset = 4        // for other relay log files, start parsing from offset 4
+				firstParse = true // a new relay log file needs a fresh parse
+			}
+			needSwitch, latestPos, nextUUID, nextBinlogName, err = r.parseFileAsPossible(ctx, s, relayLogFile, offset, dir, firstParse, currentUUID, i == len(files)-1)
+			firstParse = false // already parsed
+			if err != nil {
+				return false, "", "", errors.Annotatef(err, "parse relay dir %s", dir)
+			}
+			if needSwitch {
+				// need to switch to the next relay sub directory
+				return true, nextUUID, nextBinlogName, nil
+			}
+			latestName = relayLogFile // record the latest file name
+		}
+
+		// update pos, so we can re-collect files from the latest file and restart parsing from the latest pos
+		pos.Pos = uint32(latestPos)
+		pos.Name = latestName
 	}
+}
+
+// parseFileAsPossible parses a single relay log file as far as possible
+func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (needSwitch bool, latestPos int64, nextUUID string, nextBinlogName string, err error) {
 	var (
-		offset       int64
-		serverID     uint32
-		lastFilePath string // last relay log file path
-		lastFilePos  = pos.Pos // last parsed pos for relay log file
+		needReParse bool
 	)
+	latestPos = offset
+	for {
+		select {
+		case <-ctx.Done():
+			return false, 0, "", "", ctx.Err()
+		default:
+		}
+		needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast)
+		firstParse = false // set to false to handle the `continue` below
+		if err != nil {
+			return false, 0, "", "", errors.Annotatef(err, "parse relay file %s", relayLogFile)
+		}
+		if needReParse {
+			log.Debugf("[streamer] continue to re-parse relay log file %s", relayLogFile)
+			continue // should continue to parse this file
+		}
+		return needSwitch, latestPos, nextUUID, nextBinlogName, nil
+	}
+}
 
-	onEventFunc := func(e *replication.BinlogEvent) error {
-		//TODO: put the implementaion of updatepos here?
+// parseFile parses a single relay log file from the specified offset
+func (r *BinlogReader) parseFile(ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (needSwitch, needReParse bool, latestPos int64, nextUUID string, nextBinlogName string, err error) {
+	_, suffixInt, err := utils.ParseSuffixForUUID(currentUUID)
+	if err != nil {
+		return false, false, 0, "", "", errors.Trace(err)
+	}
+	uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name
+	latestPos = offset                            // set to the argument passed in
+
+	onEventFunc := func(e *replication.BinlogEvent) error {
+		log.Debugf("[streamer] read event %+v", e.Header)
 		if e.Header.Flags&0x0040 != 0 {
 			// now LOG_EVENT_RELAY_LOG_F is only used for events which used to fill the gap in relay log file when switching the master server
 			log.Debugf("skip event %+v created by relay writer", e.Header)
 			return nil
 		}
 
-		serverID = e.Header.ServerID // record server_id
+		r.latestServerID = e.Header.ServerID // record server_id
 
 		switch e.Header.EventType {
 		case replication.FORMAT_DESCRIPTION_EVENT:
@@ -185,140 +268,183 @@ func (r *BinlogReader) onStream(s *LocalStreamer, pos mysql.Position, updatePos
 			// add master UUID suffix to pos.Name
 			env := e.Event.(*replication.RotateEvent)
 			parsed, _ := parseBinlogFile(string(env.NextLogName))
-			nameWithSuffix := fmt.Sprintf("%s%s%s%s%s", parsed.baseName, posUUIDSuffixSeparator, uuidSuffix, baseSeqSeparator, parsed.seq)
+			nameWithSuffix := r.constructBinlogName(parsed, uuidSuffix)
 			env.NextLogName = []byte(nameWithSuffix)
 
-			if e.Header.Timestamp != 0 && offset != 0 {
+			if e.Header.Timestamp != 0 && e.Header.LogPos != 0 {
 				// not fake rotate event, update file pos
-				lastFilePos = e.Header.LogPos
+				latestPos = int64(e.Header.LogPos)
+			}
+
+			currentPos := mysql.Position{
+				Name: string(env.NextLogName),
+				Pos:  uint32(env.Position),
 			}
+			log.Infof("[streamer] rotate binlog to %v", currentPos)
 		default:
 			// update file pos
-			lastFilePos = e.Header.LogPos
+			latestPos = int64(e.Header.LogPos)
 		}
 
-		updatePos(e)
-
 		select {
 		case s.ch <- e:
-		case <-r.ctx.Done():
-			return nil
+		case <-ctx.Done():
 		}
 
 		return nil
 	}
 
-	for i, file := range files {
-		select {
-		case <-r.ctx.Done():
-			return nil
-		default:
-		}
-
-		if i == 0 {
-			offset = int64(pos.Pos)
-		} else {
-			offset = 4 // start read from pos 4
-		}
+	fullPath := filepath.Join(relayLogDir, relayLogFile)
+	log.Debugf("[streamer] start read from relay log file %s", fullPath)
-		// always send a fake ROTATE_EVENT before parse binlog file
+	if firstParse {
+		// if this is the first time to parse the file, send a fake ROTATE_EVENT before parsing the binlog file
 		// ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248
-		e, err2 := utils.GenFakeRotateEvent(file, uint64(offset), serverID)
+		e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID)
 		if err2 != nil {
-			return errors.Trace(err2)
+			return false, false, 0, "", "", errors.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
 		}
 		err2 = onEventFunc(e)
 		if err2 != nil {
-			return errors.Trace(err2)
-		}
-		fullpath := filepath.Join(binlogDir, file)
-		log.Debugf("[streamer] parse file %s from offset %d", fullpath, offset)
-		if i == len(files)-1 {
-			lastFilePath = fullpath
+			return false, false, 0, "", "", errors.Annotatef(err2, "send event %+v", e.Header)
 		}
+	}
 
-		err = r.parser.ParseFile(fullpath, offset, onEventFunc)
-		if i == len(files)-1 && err != nil && strings.Contains(err.Error(), "err EOF") {
-			// NOTE: go-mysql returned err not includes caused err, but as message, ref: parser.go `parseSingleEvent`
-			log.Warnf("[streamer] parse binlog file %s from offset %d got EOF %s", fullpath, offset, errors.ErrorStack(err))
-			break // wait for re-parse
-		} else if err != nil {
-			log.Errorf("[streamer] parse binlog file %s from offset %d error %s", fullpath, offset, errors.ErrorStack(err))
-			return errors.Trace(err)
-		}
+	err = r.parser.ParseFile(fullPath, offset, onEventFunc)
+	if possibleLast && err != nil && strings.Contains(err.Error(), "err EOF") {
+		// NOTE: the err returned by go-mysql does not wrap the causing err, it is only included in the message, ref: parser.go `parseSingleEvent`
+		log.Warnf("[streamer] parse binlog file %s from offset %d got EOF %s", fullPath, offset, errors.ErrorStack(err))
+	} else if err != nil {
+		log.Errorf("[streamer] parse binlog file %s from offset %d error %s", fullPath, offset, errors.ErrorStack(err))
+		return false, false, 0, "", "", errors.Trace(err)
 	}
 
-	// parse current sub dir finished, try switch to next
-	nextUUID, nextUUIDSuffix := r.getNextUUID(uuidWithSuffix)
-	if nextUUID != "" {
-		// next UUID (master relay sub dir) exist
-		nextBinlogName, err2 := r.getFirstBinlogName(nextUUID)
-		if err2 != nil {
-			return errors.Trace(err2)
-		}
+	if !possibleLast {
+		// there are more relay log files in the current sub directory, continue to re-collect them
+		log.Infof("[streamer] more relay log files need to be parsed in %s", relayLogDir)
+		return false, false, latestPos, "", "", nil
+	}
 
-		// check relay log whether updated since the last ParseFile returned
-		fi, err := os.Stat(lastFilePath)
-		if err != nil {
-			return errors.Annotatef(err, "get stat for relay log %s", lastFilePath)
-		}
-		if uint32(fi.Size()) > lastFilePos {
-			log.Infof("[streamer] relay log file size has changed from %d to %d", lastFilePos, fi.Size())
-			return nil // already updated, we need to parse it again
-		}
+	needSwitch, needReParse, nextUUID, nextBinlogName, err = r.needSwitchSubDir(currentUUID, fullPath, int64(latestPos))
+	if err != nil {
+		return false, false, 0, "", "", errors.Trace(err)
+	} else if needReParse {
+		// need to re-parse the current relay log file
+		return false, true, latestPos, "", "", nil
+	} else if needSwitch {
+		// need to switch to the next relay sub directory
+		return true, false, 0, nextUUID, nextBinlogName, nil
+	}
 
-		// switch to next sub dir by sending fake ROTATE event
-		log.Infof("[streamer] try switch relay log to %s/%s", nextUUID, nextBinlogName)
-		uuidSuffix = nextUUIDSuffix // update uuidSuffix, which will be added to binlog name
-		e, err2 := utils.GenFakeRotateEvent(nextBinlogName, 4, 0)
-		if err2 != nil {
-			return errors.Trace(err2)
-		}
-		err2 = onEventFunc(e)
-		if err2 != nil {
-			return errors.Trace(err2)
-		}
-		return nil // no need to watch anymore
+	updatedPath, err := r.relaySubDirUpdated(ctx, relayLogDir, fullPath, int64(latestPos))
+	if err != nil {
+		return false, false, 0, "", "", errors.Trace(err)
 	}
 
-	// watch dir for whether file count changed (new file generated)
-	err = r.watcher.Add(binlogDir)
+	if strings.HasSuffix(updatedPath, relayLogFile) {
+		// the current relay log file was updated, need to re-parse it
+		return false, true, latestPos, "", "", nil
+	}
+
+	// need to parse the next relay log file or re-collect files
+	return false, false, latestPos, "", "", nil
+}
+
+// needSwitchSubDir checks whether the reader needs to switch to the next relay sub directory
+func (r *BinlogReader) needSwitchSubDir(currentUUID string, latestFilePath string, latestFileSize int64) (needSwitch, needReParse bool, nextUUID string, nextBinlogName string, err error) {
+	nextUUID, _ = r.getNextUUID(currentUUID)
+	if len(nextUUID) == 0 {
+		// no next sub dir exists, no need to switch
+		return false, false, "", "", nil
+	}
+
+	// try to get the first binlog file in the next sub directory
+	nextBinlogName, err = r.getFirstBinlogName(nextUUID)
 	if err != nil {
-		return errors.Annotatef(err, "add watch for relay log dir %s", binlogDir)
+		// NOTE: currently we can not handle `errors.IsNotFound(err)` easily
+		// because creating the sub directory and writing the relay log file are not atomic
+		// so we require the user to pause syncing before switching the relay's master server
+		return false, false, "", "", errors.Trace(err)
 	}
-	defer r.watcher.Remove(binlogDir)
 
-	if len(lastFilePath) > 0 {
-		// check relay log whether updated since the last ParseFile returned
-		fi, err := os.Stat(lastFilePath)
-		if err != nil {
-			return errors.Annotatef(err, "get stat for relay log %s", lastFilePath)
-		}
-		if uint32(fi.Size()) > lastFilePos {
-			log.Infof("[streamer] relay log file size has changed from %d to %d", lastFilePos, fi.Size())
-			return nil // already updated, we need to parse it again
-		}
+	// check whether the latest relay log file was updated while checking the next sub directory
+	cmp, err := r.fileSizeUpdated(latestFilePath, latestFileSize)
+	if err != nil {
+		return false, false, "", "", errors.Trace(err)
+	} else if cmp < 0 {
+		return false, false, "", "", errors.Errorf("file size of relay log %s became smaller", latestFilePath)
+	} else if cmp > 0 {
+		// the latest relay log file was already updated, need to parse it again (no need to switch sub directory)
+		return false, true, "", "", nil
 	}
 
-	select {
-	case <-r.ctx.Done():
-		return nil
-	case err, ok := <-r.watcher.Errors:
-		if !ok {
-			return errors.Errorf("watcher's errors chan for relay log dir %s closed", binlogDir)
-		}
-		return errors.Annotatef(err, "relay log dir %s", binlogDir)
-	case event, ok := <-r.watcher.Events:
-		if !ok {
-			return errors.Errorf("watcher's events chan for relay log dir %s closed", binlogDir)
-		}
-		log.Debugf("[streamer] watcher receive event %+v", event)
+	// need to switch to the next sub directory
+	return true, false, nextUUID, nextBinlogName, nil
+}
 
-		// TODO zxc: refine to choose different re-parse strategy according to event.Name
-		log.Debugf("[streamer] watcher receive event %+v", event)
-	}
-
-	log.Debugf("[streamer] onStream exits")
-	return nil
+// relaySubDirUpdated checks whether the relay sub directory has been updated
+// including files changed, created, removed, etc.
+func (r *BinlogReader) relaySubDirUpdated(ctx context.Context, dir string, latestFilePath string, latestFileSize int64) (updatedPath string, err error) {
+	err = r.watcher.Add(dir)
+	if err != nil {
+		return "", errors.Annotatef(err, "add watch for relay log dir %s", dir)
 	}
+	defer r.watcher.Remove(dir)
+
+	// check whether the latest relay log file was updated while adding the watch
+	cmp, err := r.fileSizeUpdated(latestFilePath, latestFileSize)
+	if err != nil {
+		return "", errors.Trace(err)
+	} else if cmp < 0 {
+		return "", errors.Errorf("file size of relay log %s became smaller", latestFilePath)
+	} else if cmp > 0 {
+		// the latest relay log file was already updated, need to parse it again (no need to re-collect relay log files)
+		return latestFilePath, nil
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case err, ok := <-r.watcher.Errors:
+			if !ok {
+				return "", errors.Errorf("watcher's errors chan for relay log dir %s closed", dir)
+			}
+			return "", errors.Annotatef(err, "relay log dir %s", dir)
+		case event, ok := <-r.watcher.Events:
+			if !ok {
+				return "", errors.Errorf("watcher's events chan for relay log dir %s closed", dir)
+			}
+			log.Debugf("[streamer] watcher receive event %+v", event)
+			baseName := path.Base(event.Name)
+			_, err = parseBinlogFile(baseName)
+			if err != nil {
+				log.Debugf("skip watcher event %+v", event)
+				continue // not a valid binlog file created/updated
+			}
+			return event.Name, nil
+		}
+	}
+}
+
+// fileSizeUpdated checks whether the file's size has changed
+// return
+// 0: not changed
+// 1: changed to be larger
+// -1: changed to be smaller, should not happen
+func (r *BinlogReader) fileSizeUpdated(path string, latestSize int64) (int, error) {
+	fi, err := os.Stat(path)
+	if err != nil {
+		return 0, errors.Annotatef(err, "get stat for relay log %s", path)
+	}
+	currSize := fi.Size()
+	if currSize == latestSize {
+		return 0, nil
+	} else if currSize > latestSize {
+		log.Debugf("[streamer] relay log file size has changed from %d to %d", latestSize, currSize)
+		return 1, nil
+	} else {
+		panic(fmt.Sprintf("relay log file size has changed from %d to %d", latestSize, currSize))
+	}
+}
 
 // updateUUIDs re-parses UUID index file and updates UUID list
@@ -372,6 +498,11 @@ func (r *BinlogReader) extractPos(pos mysql.Position) (uuidWithSuffix string, uu
 	return
 }
 
+// constructBinlogName constructs a binlog file name with the UUID suffix
+func (r *BinlogReader) constructBinlogName(originalName *binlogFile, uuidSuffix string) string {
+	return fmt.Sprintf("%s%s%s%s%s", originalName.baseName, posUUIDSuffixSeparator, uuidSuffix, baseSeqSeparator, originalName.seq)
+}
+
 func (r *BinlogReader) getNextUUID(uuid string) (string, string) {
 	for i := len(r.uuids) - 2; i >= 0; i-- {
 		if r.uuids[i] == uuid {
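The `relaySubDirUpdated` / `fileSizeUpdated` pair above encodes a check-then-watch pattern: stat the newest relay log file, install a directory watch, then stat again, because the file may grow in the window before the watch exists. A minimal standalone version of the same pattern, assuming the fsnotify package (the `Add` / `Events` / `Errors` watcher API in the patch matches it); the watched path and size in `main` are illustrative only:

    package main

    import (
    	"context"
    	"fmt"
    	"os"
    	"time"

    	"github.com/fsnotify/fsnotify"
    )

    // grewBeyond reports whether path is now larger than lastSize.
    func grewBeyond(path string, lastSize int64) (bool, error) {
    	fi, err := os.Stat(path)
    	if err != nil {
    		return false, err
    	}
    	return fi.Size() > lastSize, nil
    }

    // waitDirUpdated blocks until something under dir changes, unless latestFile
    // already grew while no watch was installed.
    func waitDirUpdated(ctx context.Context, dir, latestFile string, lastSize int64) (string, error) {
    	w, err := fsnotify.NewWatcher()
    	if err != nil {
    		return "", err
    	}
    	defer w.Close()
    	if err = w.Add(dir); err != nil {
    		return "", err
    	}
    	// re-check after the watch exists to close the race window
    	grew, err := grewBeyond(latestFile, lastSize)
    	if err != nil {
    		return "", err
    	}
    	if grew {
    		return latestFile, nil
    	}
    	select {
    	case <-ctx.Done():
    		return "", ctx.Err()
    	case ev, ok := <-w.Events:
    		if !ok {
    			return "", fmt.Errorf("events chan for %s closed", dir)
    		}
    		return ev.Name, nil
    	case err, ok := <-w.Errors:
    		if !ok {
    			return "", fmt.Errorf("errors chan for %s closed", dir)
    		}
    		return "", err
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()
    	fmt.Println(waitDirUpdated(ctx, ".", "go.mod", 0))
    }
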
diff --git a/relay/relay.go b/relay/relay.go
index 02aaba5dae..7c8c8429ce 100644
--- a/relay/relay.go
+++ b/relay/relay.go
@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"encoding/binary"
 	"fmt"
+	"hash/crc32"
 	"io"
 	"math"
 	"os"
@@ -193,9 +194,10 @@ func (r *Relay) process(parentCtx context.Context) error {
 		// 2. create a special streamer to fill the gap
 		// 3. catchup pos after the gap
 		// 4. close the special streamer
-		gapSyncer     *replication.BinlogSyncer   // syncer used to fill the gap in relay log file
-		gapStreamer   *replication.BinlogStreamer // streamer used to fill the gap in relay log file
-		gapSyncEndPos *mysql.Position             // the pos of the event after the gap
+		gapSyncer     *replication.BinlogSyncer           // syncer used to fill the gap in relay log file
+		gapStreamer   *replication.BinlogStreamer         // streamer used to fill the gap in relay log file
+		gapSyncEndPos *mysql.Position                     // the pos of the event after the gap
+		eventFormat   *replication.FormatDescriptionEvent // latest FormatDescriptionEvent, used when re-calculating the checksum
 	)
 
 	closeGapSyncer := func() {
@@ -272,6 +274,7 @@
 		switch ev := e.Event.(type) {
 		case *replication.FormatDescriptionEvent:
 			// FormatDescriptionEvent is the first event in binlog, we will close old one and create a new
+			eventFormat = ev // record FormatDescriptionEvent
 			exist, err := r.handleFormatDescriptionEvent(lastPos.Name)
 			if err != nil {
 				return errors.Trace(err)
@@ -344,7 +347,7 @@
 			} else {
 				// add LOG_EVENT_RELAY_LOG_F flag to events which used to fill the gap
 				// ref: https://dev.mysql.com/doc/internals/en/binlog-event-flag.html
-				r.addFlagToEvent(e, 0x0040)
+				r.addFlagToEvent(e, 0x0040, eventFormat)
 			}
 		}
 
@@ -394,7 +397,7 @@
 }
 
 // addFlagToEvent adds flag to binlog event
-func (r *Relay) addFlagToEvent(e *replication.BinlogEvent, f uint16) {
+func (r *Relay) addFlagToEvent(e *replication.BinlogEvent, f uint16, eventFormat *replication.FormatDescriptionEvent) {
 	newF := e.Header.Flags | f
 	// header structure:
 	// 4 byte timestamp
@@ -406,6 +409,14 @@
 	startIdx := 4 + 1 + 4 + 4 + 4
 	binary.LittleEndian.PutUint16(e.RawData[startIdx:startIdx+2], newF)
 	e.Header.Flags = newF
+
+	// re-calculate the checksum if needed
+	if eventFormat == nil || eventFormat.ChecksumAlgorithm != replication.BINLOG_CHECKSUM_ALG_CRC32 {
+		return
+	}
+	calculatedPart := e.RawData[0 : len(e.RawData)-replication.BinlogChecksumLength]
+	checksum := crc32.ChecksumIEEE(calculatedPart)
+	binary.LittleEndian.PutUint32(e.RawData[len(e.RawData)-replication.BinlogChecksumLength:], checksum)
 }
 
 // detectGap detects whether gap exists in relay log file
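The relay.go hunk above is why `hash/crc32` is now imported: once `addFlagToEvent` rewrites the two flag bytes inside `RawData`, a CRC32 trailer computed by the master no longer matches, and a checksum-verifying reader would reject the event. A self-contained illustration of the same fix-up — the header offsets follow the comment in `addFlagToEvent`, but the event body below is fabricated for the demo:

    package main

    import (
    	"encoding/binary"
    	"fmt"
    	"hash/crc32"
    )

    const checksumLen = 4 // same value as replication.BinlogChecksumLength

    func addFlag(raw []byte, f uint16) {
    	// header: 4B timestamp, 1B type, 4B server-id, 4B event-size, 4B log-pos, 2B flags
    	const flagsIdx = 4 + 1 + 4 + 4 + 4
    	flags := binary.LittleEndian.Uint16(raw[flagsIdx:]) | f
    	binary.LittleEndian.PutUint16(raw[flagsIdx:], flags)
    	// the trailer is a CRC32 (IEEE) over everything before itself
    	sum := crc32.ChecksumIEEE(raw[:len(raw)-checksumLen])
    	binary.LittleEndian.PutUint32(raw[len(raw)-checksumLen:], sum)
    }

    func main() {
    	raw := make([]byte, 19+8+checksumLen) // header + fake body + checksum
    	binary.LittleEndian.PutUint32(raw[len(raw)-checksumLen:], crc32.ChecksumIEEE(raw[:len(raw)-checksumLen]))
    	addFlag(raw, 0x0040) // LOG_EVENT_RELAY_LOG_F
    	ok := binary.LittleEndian.Uint32(raw[len(raw)-checksumLen:]) == crc32.ChecksumIEEE(raw[:len(raw)-checksumLen])
    	fmt.Println("checksum still valid:", ok)
    }
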
diff --git a/syncer/syncer.go b/syncer/syncer.go
index cb1e21ade4..e941c719d3 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -375,8 +375,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) {
 	s.runFatalChan = make(chan *pb.ProcessError, s.cfg.WorkerCount+1)
 	s.execErrorDetected.Set(false)
 
-	// rollback un-flushed checkpoints
-	s.checkpoint.Rollback()
 	errs := make([]*pb.ProcessError, 0, 2)
 
 	if s.cfg.IsSharding {
@@ -440,6 +438,15 @@
 		// pause because of error occurred
 		s.Pause()
 	}
+
+	// try to rollback checkpoints; if they were already flushed, this has no effect
+	prePos := s.checkpoint.GlobalPoint()
+	s.checkpoint.Rollback()
+	currPos := s.checkpoint.GlobalPoint()
+	if prePos.Compare(currPos) != 0 {
+		log.Warnf("[syncer] rollback global checkpoint from %v to %v because error occurred", prePos, currPos)
+	}
+
 	pr <- pb.ProcessResult{
 		IsCanceled: isCanceled,
 		Errors:     errs,
@@ -586,17 +593,9 @@ func (s *Syncer) addJob(job *job) error {
 	if wait {
 		s.jobWg.Wait()
 		s.c.reset()
-
-		if s.execErrorDetected.Get() {
-			// detected errors for executing SQls, skip save checkpoints and return
-			// can not test len(runFatalChan), it's read by another goroutine
-			// when recovering the sync from error, checkpoints should be rollback and safe-mode should be enabled
-			return nil
-		}
-	}
-
-	if wait {
 		return errors.Trace(s.flushCheckPoints())
 	}
+
+	return nil
 }
 
@@ -607,7 +606,22 @@ func (s *Syncer) saveGlobalPoint(globalPoint mysql.Position) {
 	s.checkpoint.SaveGlobalPoint(globalPoint)
 }
 
+// flushCheckPoints flushes the checkpoints previously saved in memory to the persistent storage, like TiDB
+// we flush checkpoints in three cases:
+// 1. DDL executed
+// 2. at intervals (and job executed)
+// 3. pausing / stopping the sync (driven by `s.flushJobs`)
+// but when an error has occurred, we can not flush the checkpoint, otherwise data may be lost
+// and besides skipping the flush, we also need to roll back the checkpoint saved before
+// this should be handled when `s.Run` returns
+//
+// we may need to refactor the concurrency model to make the work-flow clearer later
 func (s *Syncer) flushCheckPoints() error {
+	if s.execErrorDetected.Get() {
+		log.Warnf("[syncer] error detected when executing SQL job, skip flush checkpoint (%v)", s.checkpoint.GlobalPoint())
+		return nil
+	}
+
 	var exceptTables [][]string
 	if s.cfg.IsSharding {
 		// flush all checkpoints except tables which are unresolved for sharding DDL
@@ -808,6 +822,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 			log.Errorf("panic. err: %s, stack: %s", err1, debug.Stack())
 			err = errors.Errorf("panic error: %v", err1)
 		}
+		// flush the job channels, but if an error occurred, we should not flush the checkpoints
 		if err1 := s.flushJobs(); err1 != nil {
 			log.Errorf("fail to finish all jobs error: %v", err1)
 		}
@@ -949,6 +964,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		}
 
 		s.binlogSizeCount.Add(int64(e.Header.EventSize))
+		log.Debugf("[syncer] receive binlog event with header %+v", e.Header)
 
 		switch ev := e.Event.(type) {
 		case *replication.RotateEvent:
 			currentPos = mysql.Position{
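The syncer.go changes above move the two checkpoint decisions into one place: `flushCheckPoints` refuses to persist anything once an execution error is detected, and `Process` rolls the in-memory points back to the last flushed ones before reporting the result. The life cycle this enforces, reduced to a toy checkpoint type (illustrative only; not the real `CheckPoint` interface):

    package main

    import "fmt"

    type checkpoint struct {
    	saved   int // latest point saved in memory
    	flushed int // latest point persisted
    }

    func (c *checkpoint) Save(p int)  { c.saved = p }
    func (c *checkpoint) Flush()      { c.flushed = c.saved }
    func (c *checkpoint) Rollback()   { c.saved = c.flushed }

    func main() {
    	c := &checkpoint{}
    	c.Save(100)
    	c.Flush() // e.g. after a DDL or at an interval
    	c.Save(200)
    	execError := true
    	if execError {
    		c.Rollback() // do not flush: binlog events up to 200 may not be applied
    	} else {
    		c.Flush()
    	}
    	fmt.Printf("resume from %d (flushed %d)\n", c.saved, c.flushed)
    }

On restart the syncer resumes from the flushed point, whose effects are known to be applied downstream, which is why skipping the flush on error and rolling back together prevent data loss.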