Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resuming binlog streaming #156 #160

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 122 additions & 25 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,66 @@ import (

const caughtUpThreshold = 10 * time.Second

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter
type BinlogPosition struct {
// A binlog position emitted by the binlog-streamer consists of two parts:
// First, the last emitted event position, which refers to the event that
// we received from the MySQL master and that we hand to clients. Second,
// a position from which we can resume a binlog-streamer.
// Ideally, these two values would be the same, but in reality they are
// not, because some events are streamed in a series (e.g. DML events
// require a table-map events to be seen before).
// As a result, we always stream event positions as a pair - if a binlog
// streamer is resumed from an event that is not safe to resume from, we
// resume from the most recent (earlier) event from which we can safely
// resume and simply suppress emitting these events up to the point of the
// last event returned.
//
// the actual binlog position of an event emitted by the streamer
EventPosition mysql.Position
// the position from which one needs to point the streamer if we want to
// resume from after this event
ResumePosition mysql.Position
}

func NewResumableBinlogPosition(pos mysql.Position) BinlogPosition {
return BinlogPosition{pos, pos}
}

TableSchema TableSchemaCache
func (p BinlogPosition) Compare(o BinlogPosition) int {
// comparison always happens on the actual event
return p.EventPosition.Compare(o.EventPosition)
}

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
lastStreamedBinlogPosition mysql.Position
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time
func (b BinlogPosition) String() string {
return fmt.Sprintf("Position(event %s, resume at %s)", b.EventPosition, b.ResumePosition)
}

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter

TableSchema TableSchemaCache

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
// what is the last event that we ever received from the streamer
lastStreamedBinlogPosition mysql.Position
// what is the last event that we received and from which it is possible
// to resume
lastResumeBinlogPosition mysql.Position
// if we have resumed from an earlier position than where we last streamed
// to (that is, if lastResumeBinlogPosition is before
// lastStreamedBinlogPosition when resuming), up to what event should we
// suppress emitting events
suppressEmitUpToBinlogPosition mysql.Position
// up to what position to we want to continue streaming (if a stop was
// requested)
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time

stopRequested bool

Expand Down Expand Up @@ -77,40 +122,49 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
return nil
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) {
s.ensureLogger()

currentPosition, err := ShowMasterStatusBinlogPosition(s.DB)
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.ConnectBinlogStreamerToMysqlFrom(currentPosition)
return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition))
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) {
s.ensureLogger()

err := s.createBinlogSyncer()
if err != nil {
return mysql.Position{}, err
return BinlogPosition{}, err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition
if startFromBinlogPosition.EventPosition.Compare(startFromBinlogPosition.ResumePosition) < 0 {
err = fmt.Errorf("invalid resume position %s: last event must not be before resume position", startFromBinlogPosition)
return BinlogPosition{}, err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition
s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition
s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition

s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"pos": s.lastStreamedBinlogPosition.Pos,
"stream.file": s.lastStreamedBinlogPosition.Name,
"stream.pos": s.lastStreamedBinlogPosition.Pos,
"resume.file": s.lastResumeBinlogPosition.Name,
"resume.pos": s.lastResumeBinlogPosition.Pos,
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition)
if err != nil {
s.logger.WithError(err).Error("unable to start binlog streamer")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.lastStreamedBinlogPosition, err
return startFromBinlogPosition, err
}

func (s *BinlogStreamer) Run() {
Expand Down Expand Up @@ -233,13 +287,44 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
s.lastProcessedEventTime = eventTime

if resumablePosition, evIsResumable := s.getResumePositionForEvent(ev); evIsResumable {
s.lastResumeBinlogPosition = resumablePosition
}

if time.Since(s.lastLagMetricEmittedTime) >= time.Second {
lag := time.Since(eventTime)
metrics.Gauge("BinlogStreamer.Lag", lag.Seconds(), nil, 1.0)
s.lastLagMetricEmittedTime = time.Now()
}
}

func (s *BinlogStreamer) getResumePositionForEvent(ev *replication.BinlogEvent) (resumablePosition mysql.Position, evIsResumable bool) {
// resuming from a RowsEvent is not possible, as it may be followed by
// another rows-event without a subsequent TableMapEvent. Thus, if we have
// a rows-event, we need to keep resuming from whatever last non-rows-event
//
// The same is true for TableMapEvents themselves, as we cannot resume right
// after the event: we need to re-stream the event itself to get ready for
// a RowsEvent
switch ev.Event.(type) {
case *replication.RowsEvent, *replication.TableMapEvent:
// it's not resumable - we need to return whatever was save to resume
// from before
resumablePosition = s.lastResumeBinlogPosition
default:
// it is safe to resume from here
evIsResumable = true
resumablePosition = mysql.Position{
// The filename is only changed and visible during the RotateEvent, which
// is handled transparently in Run().
Name: s.lastStreamedBinlogPosition.Name,
Pos: ev.Header.LogPos,
}
}

return
}

func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
rowsEvent := ev.Event.(*replication.RowsEvent)
Expand All @@ -256,12 +341,24 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
Pos: ev.Header.LogPos,
}

// we may still be searching for the first event to stream to listeners, if
// we resumed reading upstream events from an earlier event
if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 {
return nil
}

resumePosition, _ := s.getResumePositionForEvent(ev)
binlogPosition := BinlogPosition{
EventPosition: pos,
ResumePosition: resumePosition,
}

table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table))
if table == nil {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos)
dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition)
if err != nil {
return err
}
Expand Down
15 changes: 7 additions & 8 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/shopspring/decimal"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
Expand Down Expand Up @@ -43,13 +42,13 @@ type DMLEvent interface {
OldValues() RowData
NewValues() RowData
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
BinlogPosition() BinlogPosition
}

// The base of DMLEvent to provide the necessary methods.
type DMLEventBase struct {
table *TableSchema
pos mysql.Position
pos BinlogPosition
}

func (e *DMLEventBase) Database() string {
Expand All @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema {
return e.table
}

func (e *DMLEventBase) BinlogPosition() mysql.Position {
func (e *DMLEventBase) BinlogPosition() BinlogPosition {
return e.pos
}

Expand All @@ -73,7 +72,7 @@ type BinlogInsertEvent struct {
*DMLEventBase
}

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand Down Expand Up @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) {
return paginationKeyFromEventData(e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down
8 changes: 4 additions & 4 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
var pos siddontangmysql.Position
var pos BinlogPosition
var err error
if f.StateToResumeFrom != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
Expand All @@ -504,9 +504,9 @@ func (f *Ferry) Start() error {
return err
}

// If we don't set this now, there is a race condition where Ghostferry
// If we don't set this now, there is a race condition where ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarentees that we are able to restart from a valid location.
// This guarantees that we are able to restart from a valid location.
f.StateTracker.UpdateLastWrittenBinlogPosition(pos)
if f.inlineVerifier != nil {
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
Expand Down Expand Up @@ -749,7 +749,7 @@ func (f *Ferry) Progress() *Progress {
s.Throttled = f.Throttler.Throttled()

// Binlog Progress
s.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition
s.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition()
s.BinlogStreamerLag = time.Now().Sub(f.BinlogStreamer.lastProcessedEventTime).Seconds()
s.FinalBinlogPos = f.BinlogStreamer.targetBinlogPosition

Expand Down
5 changes: 2 additions & 3 deletions sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/sharding"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -142,15 +141,15 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
}

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{})
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{})
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{})
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{})
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
18 changes: 8 additions & 10 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"math"
"sync"
"time"

"github.com/siddontang/go-mysql/mysql"
)

// StateTracker design
Expand Down Expand Up @@ -36,13 +34,13 @@ type SerializableState struct {

LastSuccessfulPaginationKeys map[string]uint64
CompletedTables map[string]bool
LastWrittenBinlogPosition mysql.Position
LastStoredBinlogPositionForInlineVerifier mysql.Position
LastWrittenBinlogPosition BinlogPosition
LastStoredBinlogPositionForInlineVerifier BinlogPosition
BinlogVerifyStore BinlogVerifySerializedStore
}

func (s *SerializableState) MinBinlogPosition() mysql.Position {
nilPosition := mysql.Position{}
func (s *SerializableState) MinBinlogPosition() BinlogPosition {
nilPosition := BinlogPosition{}
if s.LastWrittenBinlogPosition == nilPosition {
return s.LastStoredBinlogPositionForInlineVerifier
}
Expand Down Expand Up @@ -82,8 +80,8 @@ type StateTracker struct {
BinlogRWMutex *sync.RWMutex
CopyRWMutex *sync.RWMutex

lastWrittenBinlogPosition mysql.Position
lastStoredBinlogPositionForInlineVerifier mysql.Position
lastWrittenBinlogPosition BinlogPosition
lastStoredBinlogPositionForInlineVerifier BinlogPosition

lastSuccessfulPaginationKeys map[string]uint64
completedTables map[string]bool
Expand Down Expand Up @@ -113,14 +111,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri
return s
}

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) {
func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos BinlogPosition) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

s.lastWrittenBinlogPosition = pos
}

func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) {
func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos BinlogPosition) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

Expand Down
Loading