Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keeper: improve db start logic #401

Merged
merged 1 commit into from
Jan 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
pgParameters, err = pgm.GetConfigFilePGParameters()
if err != nil {
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
Expand All @@ -1018,6 +1022,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
}

log.Infow("setting roles")
Expand Down Expand Up @@ -1083,7 +1091,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
// wait for the db having replyed all the wals
if err = pgm.WaitReady(cd.Cluster.DefSpec().SyncTimeout.Duration); err != nil {
log.Errorw("instance not ready", zap.Error(err))
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}

Expand Down Expand Up @@ -1179,7 +1187,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}
if err = pgm.Start(); err != nil {
log.Errorw("err", zap.Error(err))
log.Errorw("failed to start instance", zap.Error(err))
return
}
started = true
Expand All @@ -1188,7 +1196,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
fullResync := false
// if not accepting connection assume that it's blocked waiting for missing wal
// (see above TODO), so do a full resync using pg_basebackup.
if err = pgm.Ping(); err != nil {
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("pg_rewinded standby is not accepting connection. it's probably waiting for unavailable wals. Forcing a full resync")
fullResync = true
} else {
Expand Down Expand Up @@ -1250,6 +1258,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
pgParameters, err = pgm.GetConfigFilePGParameters()
if err != nil {
log.Errorw("failed to retrieve postgres parameters", zap.Error(err))
Expand All @@ -1263,6 +1275,10 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorw("failed to start instance", zap.Error(err))
return
}
if err = pgm.WaitReady(cluster.DefaultDBWaitReadyTimeout); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
return
}
}
log.Infow("updating our db UID with the cluster data provided db UID")
// replace our current db uid with the required one.
Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DefaultProxyCheckInterval = 5 * time.Second
DefaultProxyTimeoutInterval = 15 * time.Second

DefaultDBWaitReadyTimeout = 60 * time.Second

DefaultDBNotIncreasingXLogPosTimes = 10

DefaultSleepInterval = 5 * time.Second
Expand Down
98 changes: 79 additions & 19 deletions pkg/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -38,6 +39,8 @@ import (
const (
postgresConf = "postgresql.conf"
tmpPostgresConf = "stolon-temp-postgresql.conf"

startTimeout = 60 * time.Second
)

var log = slog.S()
Expand Down Expand Up @@ -156,7 +159,7 @@ func (p *Manager) Init(initConfig *InitConfig) error {
cmd.Args = append(cmd.Args, "--data-checksums")
}

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Run(); err != nil {
Expand All @@ -183,7 +186,7 @@ func (p *Manager) Restore(command string) error {
cmd = exec.Command("/bin/sh", "-c", command)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Run(); err != nil {
Expand All @@ -199,7 +202,7 @@ out:
return nil
}

func (p *Manager) StartTmpMerged(args ...string) error {
func (p *Manager) StartTmpMerged() error {
// start postgres with a conf file different then postgresql.conf so we don't have to touch it
f, err := os.Create(filepath.Join(p.dataDir, tmpPostgresConf))
if err != nil {
Expand Down Expand Up @@ -231,7 +234,7 @@ func (p *Manager) StartTmpMerged(args ...string) error {
if err := p.writePgHba(); err != nil {
return fmt.Errorf("error writing conf file: %v", err)
}
return p.start("-o", fmt.Sprintf("-c config_file=%s", f.Name()))
return p.start("-c", fmt.Sprintf("config_file=%s", f.Name()))
}

func (p *Manager) Start() error {
Expand All @@ -244,26 +247,83 @@ func (p *Manager) Start() error {
return p.start()
}

// start starts the instance. A success means that the instance has been
// successfully started BUT doesn't mean that the instance is ready to accept
// connections (i.e. it's waiting for some missing wals etc...).
// Note that also on error an instance may still be active and, if needed,
// should be manually stopped calling Stop.
func (p *Manager) start(args ...string) error {
// pg_ctl for postgres < 10 with -w will exit after the timeout and return 0
// also if the instance isn't ready to accept connections, while for
// postgres >= 10 it will return a non 0 exit code making it impossible to
// distinguish between problems starting an instance (i.e. wrong parameters)
// or an instance started but not ready to accept connections.
// To work with all the versions and since we want to distinguish between a
// failed start and a started but not ready instance we are forced to not
// use pg_ctl and write part of its logic here (I hate to do this).

// A difference between directly calling postgres instead of pg_ctl is that
// the instance parent is the keeper instead of the defined system reaper
// (since pg_ctl forks and then exits leaving the postmaster orphaned).

log.Infow("starting database")
name := filepath.Join(p.pgBinPath, "pg_ctl")
args = append([]string{"start", "-w", "--timeout", "60", "-D", p.dataDir, "-o", "-c unix_socket_directories=" + common.PgUnixSocketDirectories}, args...)
name := filepath.Join(p.pgBinPath, "postgres")
args = append([]string{"-D", p.dataDir, "-c", "unix_socket_directories=" + common.PgUnixSocketDirectories}, args...)
cmd := exec.Command(name, args...)
log.Debugw("execing cmd", "cmd", cmd)
//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {

if err := cmd.Start(); err != nil {
return fmt.Errorf("error: %v", err)
}

// execute child wait in a goroutine so we'll wait for it to exit without
// leaving zombie childs
exited := make(chan struct{})
go func() {
cmd.Wait()
close(exited)
}()

pid := cmd.Process.Pid

// Wait for the correct pid file to appear or for the process to exit
ok := false
start := time.Now()
for time.Now().Add(-startTimeout).Before(start) {
fh, err := os.Open(filepath.Join(p.dataDir, "postmaster.pid"))
if err == nil {
scanner := bufio.NewScanner(fh)
scanner.Split(bufio.ScanLines)
if scanner.Scan() {
fpid := scanner.Text()
if fpid == strconv.Itoa(pid) {
ok = true
fh.Close()
break
}
}
}
fh.Close()

select {
case <-exited:
return fmt.Errorf("postgres exited unexpectedly")
default:
}

time.Sleep(200 * time.Millisecond)
}

if !ok {
return fmt.Errorf("instance still starting")
}

p.UpdateCurParameters()
p.UpdateCurHba()

// pg_ctl with -w will exit after the timeout and return 0 also if the
// instance isn't accepting connection because already in recovery (usually
// waiting for wals during a pitr or a pg_rewind)
// so a start doesn't mean the instance is ready.
return nil
}

Expand All @@ -276,7 +336,7 @@ func (p *Manager) Stop(fast bool) error {
}
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand All @@ -287,7 +347,7 @@ func (p *Manager) Stop(fast bool) error {

func (p *Manager) IsStarted() (bool, error) {
name := filepath.Join(p.pgBinPath, "pg_ctl")
cmd := exec.Command(name, "status", "-w", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
cmd := exec.Command(name, "status", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
_, err := cmd.CombinedOutput()
if err != nil {
if _, ok := err.(*exec.ExitError); ok {
Expand All @@ -313,7 +373,7 @@ func (p *Manager) Reload() error {
cmd := exec.Command(name, "reload", "-D", p.dataDir, "-o", "-c unix_socket_directories="+common.PgUnixSocketDirectories)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -343,7 +403,7 @@ func (p *Manager) WaitReady(timeout time.Duration) error {
if err := p.Ping(); err == nil {
return nil
}
time.Sleep(1 * time.Second)
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for db ready")
}
Expand All @@ -354,7 +414,7 @@ func (p *Manager) Promote() error {
cmd := exec.Command(name, "promote", "-w", "-D", p.dataDir)
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -707,7 +767,7 @@ func (p *Manager) SyncFromFollowedPGRewind(followedConnParams ConnParams, passwo
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down Expand Up @@ -748,7 +808,7 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams) error {
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
log.Debugw("execing cmd", "cmd", cmd)

//Pipe command's std[err|out] to parent.
// Pipe command's std[err|out] to parent.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down