Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: catch up with upstream (1/2) #14

Merged
merged 7 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Set up Go 1.16
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.16
go-version: 1.17

- name: Build
run: script/cibuild
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: "CodeQL analysis"

on:
push:
pull_request:
schedule:
- cron: '0 0 * * 0'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.16
go-version: 1.17
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
4 changes: 2 additions & 2 deletions .github/workflows/replica-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Set up Go 1.16
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.16
go-version: 1.17

- name: migration tests
env:
Expand Down
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ linters:
enable:
- gosimple
- govet
- unused
- noctx
- rowserrcheck
- sqlclosecheck
- unused

2 changes: 1 addition & 1 deletion Dockerfile.packaging
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.16.4
FROM golang:1.17

RUN apt-get update
RUN apt-get install -y ruby ruby-dev rubygems build-essential
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.test
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.16.4
FROM golang:1.17
LABEL maintainer="github@github.com"

RUN apt-get update
Expand Down
13 changes: 12 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/github/gh-ost

go 1.16
go 1.17

require (
github.com/go-ini/ini v1.62.0
Expand All @@ -13,6 +13,17 @@ require (
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
golang.org/x/text v0.3.6
)

require (
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 // indirect
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
)
34 changes: 34 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type MigrationContext struct {
AlterStatement string
AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement

countMutex sync.Mutex
countTableRowsCancelFunc func()
CountTableRows bool
ConcurrentCountTableRows bool
AllowedRunningOnMaster bool
Expand Down Expand Up @@ -185,7 +187,9 @@ type MigrationContext struct {
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
ThrottleHTTPIntervalMillis int64
ThrottleHTTPStatusCode int64
ThrottleHTTPTimeoutMillis int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
TotalDMLEventsApplied int64
Expand Down Expand Up @@ -438,6 +442,36 @@ func (this *MigrationContext) IsTransactionalTable() bool {
return false
}

// SetCountTableRowsCancelFunc sets the cancel function for the CountTableRows query context
func (this *MigrationContext) SetCountTableRowsCancelFunc(f func()) {
this.countMutex.Lock()
defer this.countMutex.Unlock()

this.countTableRowsCancelFunc = f
}

// IsCountingTableRows returns true if the migration has a table count query running
func (this *MigrationContext) IsCountingTableRows() bool {
this.countMutex.Lock()
defer this.countMutex.Unlock()

return this.countTableRowsCancelFunc != nil
}

// CancelTableRowsCount cancels the CountTableRows query context. It is safe to
// call function even when IsCountingTableRows is false.
func (this *MigrationContext) CancelTableRowsCount() {
this.countMutex.Lock()
defer this.countMutex.Unlock()

if this.countTableRowsCancelFunc == nil {
return
}

this.countTableRowsCancelFunc()
this.countTableRowsCancelFunc = nil
}

// ElapsedTime returns time since very beginning of the process
func (this *MigrationContext) ElapsedTime() time.Duration {
return time.Since(this.StartTime)
Expand Down
47 changes: 21 additions & 26 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,24 @@ type GoMySQLReader struct {
LastAppliedRowsEventHint mysql.BinlogCoordinates
}

func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) {
binlogReader = &GoMySQLReader{
func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
connectionConfig := migrationContext.InspectorConnectionConfig
return &GoMySQLReader{
migrationContext: migrationContext,
connectionConfig: migrationContext.InspectorConnectionConfig,
connectionConfig: connectionConfig,
currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: nil,
binlogStreamer: nil,
binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
}),
}

serverId := uint32(migrationContext.ReplicaServerId)

binlogSyncerConfig := replication.BinlogSyncerConfig{
ServerID: serverId,
Flavor: "mysql",
Host: binlogReader.connectionConfig.Key.Hostname,
Port: uint16(binlogReader.connectionConfig.Key.Port),
User: binlogReader.connectionConfig.User,
Password: binlogReader.connectionConfig.Password,
TLSConfig: binlogReader.connectionConfig.TLSConfig(),
UseDecimal: true,
}
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)

return binlogReader, err
}

// ConnectBinlogStreamer
Expand Down Expand Up @@ -145,15 +138,17 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {

switch binlogEvent := ev.Event.(type) {
case *replication.RotateEvent:
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
this.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
}()
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
case *replication.RowsEvent:
if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil {
return err
}
}
Expand Down
6 changes: 4 additions & 2 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2016 GitHub Inc.
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand Down Expand Up @@ -111,6 +111,8 @@ func main() {
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check")
flag.Int64Var(&migrationContext.ThrottleHTTPTimeoutMillis, "throttle-http-timeout-millis", 1000, "Number of milliseconds to use as an HTTP throttle check timeout")
ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
Expand Down Expand Up @@ -299,7 +301,7 @@ func main() {
log.Infof("starting gh-ost %+v", AppVersion)
acceptSignals(migrationContext)

migrator := logic.NewMigrator(migrationContext)
migrator := logic.NewMigrator(migrationContext, AppVersion)
err := migrator.Migrate()
if err != nil {
migrator.ExecOnFailureHook()
Expand Down
22 changes: 21 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
return err
}

// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
/*
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
if an INSERT statement is being committed but blocks due to an unmet ack count,
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
so the copy of the existing data in the table does not include the new row inserted by the transaction.
However, the binlog event for the transaction is already written to the binlog,
so the addDMLEventsListener only captures the binlog event after the transaction,
and thus the transaction's binlog event is not captured, resulting in data loss.

If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
newly inserted data, thus Avoiding data loss due to the above problem.
*/
func (this *Applier) ReadMigrationRangeValues() error {
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
return err
}

if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
return err
}
Expand Down
37 changes: 34 additions & 3 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logic

import (
"context"
gosql "database/sql"
"fmt"
"reflect"
Expand Down Expand Up @@ -532,18 +533,48 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
return nil
}

// Kill kills a query for connectionID.
// - @amason: this should go somewhere _other_ than `logic`, but I couldn't decide
// between `base`, `sql`, or `mysql`.
func Kill(db *gosql.DB, connectionID string) error {
_, err := db.Exec(`KILL QUERY %s`, connectionID)
return err
}

// CountTableRows counts exact number of rows on the original table
func (this *Inspector) CountTableRows() error {
func (this *Inspector) CountTableRows(ctx context.Context) error {
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)

this.migrationContext.Log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")

conn, err := this.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

var connectionID string
if err := conn.QueryRowContext(ctx, `SELECT /* gh-ost */ CONNECTION_ID()`).Scan(&connectionID); err != nil {
return err
}

query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
var rowsEstimate int64
if err := this.db.QueryRow(query).Scan(&rowsEstimate); err != nil {
return err
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
switch err {
case context.Canceled, context.DeadlineExceeded:
this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return Kill(this.db, connectionID)
default:
return err
}
}

// row count query finished. nil out the cancel func, so the main migration thread
// doesn't bother calling it after row copy is done.
this.migrationContext.SetCountTableRowsCancelFunc(nil)

atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate

Expand Down
Loading