Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context/timeout to HTTP throttle check #1131

Merged
merged 21 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ linters:
enable:
- gosimple
- govet
- noctx
- rowserrcheck
- sqlclosecheck
- unused

2 changes: 2 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ type MigrationContext struct {
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
ThrottleHTTPIntervalMillis int64
ThrottleHTTPStatusCode int64
ThrottleHTTPTimeoutMillis int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
TotalDMLEventsApplied int64
Expand Down
6 changes: 4 additions & 2 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2016 GitHub Inc.
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand Down Expand Up @@ -110,6 +110,8 @@ func main() {
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check")
flag.Int64Var(&migrationContext.ThrottleHTTPTimeoutMillis, "throttle-http-timeout-millis", 1000, "Number of milliseconds to use as an HTTP throttle check timeout")
ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
Expand Down Expand Up @@ -297,7 +299,7 @@ func main() {
log.Infof("starting gh-ost %+v", AppVersion)
acceptSignals(migrationContext)

migrator := logic.NewMigrator(migrationContext)
migrator := logic.NewMigrator(migrationContext, AppVersion)
err := migrator.Migrate()
if err != nil {
migrator.ExecOnFailureHook()
Expand Down
6 changes: 4 additions & 2 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (

// Migrator is the main schema migration flow manager.
type Migrator struct {
appVersion string
parser *sql.AlterTableParser
inspector *Inspector
applier *Applier
Expand All @@ -87,8 +88,9 @@ type Migrator struct {
finishedMigrating int64
}

func NewMigrator(context *base.MigrationContext) *Migrator {
func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
migrator := &Migrator{
appVersion: appVersion,
migrationContext: context,
parser: sql.NewAlterTableParser(),
ghostTableMigrated: make(chan bool),
Expand Down Expand Up @@ -1068,7 +1070,7 @@ func (this *Migrator) addDMLEventsListener() error {

// initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector)
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion)

go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
this.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected")
Expand Down
24 changes: 21 additions & 3 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logic

import (
"context"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -42,16 +43,22 @@ const frenoMagicHint = "freno"
// Throttler collects metrics related to throttling and makes informed decision
// whether throttling should take place.
type Throttler struct {
appVersion string
migrationContext *base.MigrationContext
applier *Applier
httpClient *http.Client
httpClientTimeout time.Duration
inspector *Inspector
finishedMigrating int64
}

func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler {
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler {
return &Throttler{
appVersion: appVersion,
migrationContext: migrationContext,
applier: applier,
httpClient: &http.Client{},
httpClientTimeout: time.Duration(migrationContext.ThrottleHTTPTimeoutMillis) * time.Millisecond,
inspector: inspector,
finishedMigrating: 0,
}
Expand Down Expand Up @@ -285,7 +292,17 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
if url == "" {
return true, nil
}
resp, err := http.Head(url)

ctx, cancel := context.WithTimeout(context.Background(), this.httpClientTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return false, err
}
req.Header.Set("User-Agent", fmt.Sprintf("gh-ost/%s", this.appVersion))

resp, err := this.httpClient.Do(req)
if err != nil {
return false, err
}
Expand All @@ -303,7 +320,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-

firstThrottlingCollected <- true

ticker := time.Tick(100 * time.Millisecond)
collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond
ticker := time.Tick(collectInterval)
for range ticker {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
Expand Down