Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add support for PostgreSQL 12 #727

Merged
merged 1 commit into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .agola/config.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ local ci_runtime(pgversion, arch) = {
arch: arch,
containers: [
{
image: 'sorintlab/stolon-ci-image:v0.1.0-pg' + pgversion,
image: 'sorintlab/stolon-ci-image:v0.2.0-pg' + pgversion,
volumes: [
{
path: '/stolontemp',
Expand Down Expand Up @@ -172,15 +172,15 @@ local task_build_push_images(name, pgversions, istag, push) =
task_integration_tests(store, pgversion, 'amd64'),
]
for store in ['etcdv2', 'consul']
for pgversion in ['11' /*, '12' */]
for pgversion in ['12']
]) + std.flattenArrays([
[
task_integration_tests(store, pgversion, 'amd64'),
]
for store in ['etcdv3']
for pgversion in ['9.5', '9.6', '10', '11' /*, '12' */]
for pgversion in ['9.5', '9.6', '10', '11', '12']
]) + [
task_build_push_images('test build docker "stolon" images', '9.4 9.5 9.6 10 11', false, false)
task_build_push_images('test build docker "stolon" images', '9.4 9.5 9.6 10 11 12', false, false)
+ {
when: {
branch: {
Expand All @@ -190,13 +190,13 @@ local task_build_push_images(name, pgversions, istag, push) =
ref: '#refs/pull/\\d+/head#',
},
},
task_build_push_images('build and push docker "stolon" master branch images', '9.4 9.5 9.6 10 11', false, true)
task_build_push_images('build and push docker "stolon" master branch images', '9.4 9.5 9.6 10 11 12', false, true)
+ {
when: {
branch: 'master',
},
},
task_build_push_images('build and push docker "stolon" tag images', '9.4 9.5 9.6 10 11', true, true)
task_build_push_images('build and push docker "stolon" tag images', '9.4 9.5 9.6 10 11 12', true, true)
+ {
when: {
tag: '#v.*#',
Expand Down
66 changes: 40 additions & 26 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ var managedPGParameters = []string{
"max_wal_senders",
"wal_log_hints",
"synchronous_standby_names",

// parameters moved from recovery.conf to postgresql.conf in PostgresSQL 12
"primary_conninfo",
"primary_slot_name",
"recovery_min_apply_delay",
"restore_command",
"recovery_target_timeline",
"recovery_target",
"recovery_target_lsn",
"recovery_target_name",
"recovery_target_time",
"recovery_target_xid",
"recovery_target_timeline",
"recovery_target_action",
}

func readPasswordFromFile(filepath string) (string, error) {
Expand Down Expand Up @@ -370,13 +384,9 @@ func (p *PostgresKeeper) createPGParameters(db *cluster.DB) common.Parameters {
return parameters
}

func (p *PostgresKeeper) createRecoveryParameters(standbyMode bool, standbySettings *cluster.StandbySettings, archiveRecoverySettings *cluster.ArchiveRecoverySettings, recoveryTargetSettings *cluster.RecoveryTargetSettings) common.Parameters {
func (p *PostgresKeeper) createRecoveryOptions(recoveryMode postgresql.RecoveryMode, standbySettings *cluster.StandbySettings, archiveRecoverySettings *cluster.ArchiveRecoverySettings, recoveryTargetSettings *cluster.RecoveryTargetSettings) *postgresql.RecoveryOptions {
parameters := common.Parameters{}

if standbyMode {
parameters["standby_mode"] = "on"
}

if standbySettings != nil {
if standbySettings.PrimaryConninfo != "" {
parameters["primary_conninfo"] = standbySettings.PrimaryConninfo
Expand Down Expand Up @@ -417,7 +427,10 @@ func (p *PostgresKeeper) createRecoveryParameters(standbyMode bool, standbySetti
parameters["recovery_target_action"] = "promote"
}

return parameters
return &postgresql.RecoveryOptions{
RecoveryMode: recoveryMode,
RecoveryParameters: parameters,
}
}

type PostgresKeeper struct {
Expand Down Expand Up @@ -847,7 +860,7 @@ func (p *PostgresKeeper) resync(db, masterDB, followedDB *cluster.DB, tryPgrewin
// log pg_rewind error and fallback to pg_basebackup
log.Errorw("error syncing with pg_rewind", zap.Error(err))
} else {
pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil))
pgm.SetRecoveryOptions(p.createRecoveryOptions(postgresql.RecoveryModeStandby, standbySettings, nil, nil))
return nil
}
}
Expand Down Expand Up @@ -876,7 +889,7 @@ func (p *PostgresKeeper) resync(db, masterDB, followedDB *cluster.DB, tryPgrewin
}
log.Infow("sync succeeded")

pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil))
pgm.SetRecoveryOptions(p.createRecoveryOptions(postgresql.RecoveryModeStandby, standbySettings, nil, nil))

return nil
}
Expand Down Expand Up @@ -1086,7 +1099,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
log.Infow("current db UID different than cluster data db UID", "db", p.dbLocalState.UID, "cdDB", db.UID)

pgm.SetRecoveryParameters(nil)
pgm.SetRecoveryOptions(nil)
p.waitSyncStandbysSynced = false

switch db.Spec.InitMode {
Expand Down Expand Up @@ -1192,27 +1205,28 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}

standbyMode := false
recoveryMode := postgresql.RecoveryModeRecovery
var standbySettings *cluster.StandbySettings
if db.Spec.FollowConfig != nil && db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
standbyMode = true
recoveryMode = postgresql.RecoveryModeStandby
standbySettings = db.Spec.FollowConfig.StandbySettings
}

// if we are initializing a standby cluster then enable standby_mode to not stop recovery
pgm.SetRecoveryParameters(p.createRecoveryParameters(standbyMode, standbySettings, db.Spec.PITRConfig.ArchiveRecoverySettings, db.Spec.PITRConfig.RecoveryTargetSettings))
pgm.SetRecoveryOptions(p.createRecoveryOptions(recoveryMode, standbySettings, db.Spec.PITRConfig.ArchiveRecoverySettings, db.Spec.PITRConfig.RecoveryTargetSettings))

if err = pgm.StartTmpMerged(); err != nil {
log.Errorw("failed to start instance", zap.Error(err))
return
}

if !standbyMode {
if recoveryMode == postgresql.RecoveryModeRecovery {
// wait for the db having replyed all the wals
log.Infof("waiting for recovery to be completed")
if err = pgm.WaitRecoveryDone(cd.Cluster.DefSpec().SyncTimeout.Duration); err != nil {
log.Errorw("recovery not finished", zap.Error(err))
return
}
log.Infof("recovery completed")
}
if err = pgm.WaitReady(cd.Cluster.DefSpec().SyncTimeout.Duration); err != nil {
log.Errorw("timeout waiting for instance to be ready", zap.Error(err))
Expand Down Expand Up @@ -1476,7 +1490,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {

if localRole == common.RoleStandby {
log.Infow("promoting to master")
pgm.SetRecoveryParameters(nil)
pgm.SetRecoveryOptions(nil)
if err = pgm.Promote(); err != nil {
log.Errorw("failed to promote instance", zap.Error(err))
return
Expand Down Expand Up @@ -1522,7 +1536,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}
if !started {
pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil))
pgm.SetRecoveryOptions(p.createRecoveryOptions(postgresql.RecoveryModeStandby, standbySettings, nil, nil))
if err = pgm.Start(); err != nil {
log.Errorw("failed to start postgres", zap.Error(err))
return
Expand All @@ -1543,13 +1557,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {

standbySettings := &cluster.StandbySettings{PrimaryConninfo: newReplConnParams.ConnString(), PrimarySlotName: common.StolonName(db.UID)}

curRecoveryParameters := pgm.CurRecoveryParameters()
newRecoveryParameters := p.createRecoveryParameters(true, standbySettings, nil, nil)
curRecoveryOptions := pgm.CurRecoveryOptions()
newRecoveryOptions := p.createRecoveryOptions(postgresql.RecoveryModeStandby, standbySettings, nil, nil)

// Update recovery conf if parameters has changed
if !curRecoveryParameters.Equals(newRecoveryParameters) {
log.Infow("recovery parameters changed, restarting postgres instance", "curRecoveryParameters", curRecoveryParameters, "newRecoveryParameters", newRecoveryParameters)
pgm.SetRecoveryParameters(newRecoveryParameters)
if !curRecoveryOptions.RecoveryParameters.Equals(newRecoveryOptions.RecoveryParameters) {
log.Infow("recovery parameters changed, restarting postgres instance", "curRecoveryParameters", curRecoveryOptions.RecoveryParameters, "newRecoveryParameters", newRecoveryOptions.RecoveryParameters)
pgm.SetRecoveryOptions(newRecoveryOptions)

if err = pgm.Restart(true); err != nil {
log.Errorw("failed to restart postgres instance", zap.Error(err))
Expand All @@ -1562,13 +1576,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}

case cluster.FollowTypeExternal:
curRecoveryParameters := pgm.CurRecoveryParameters()
newRecoveryParameters := p.createRecoveryParameters(true, db.Spec.FollowConfig.StandbySettings, db.Spec.FollowConfig.ArchiveRecoverySettings, nil)
curRecoveryOptions := pgm.CurRecoveryOptions()
newRecoveryOptions := p.createRecoveryOptions(postgresql.RecoveryModeStandby, db.Spec.FollowConfig.StandbySettings, db.Spec.FollowConfig.ArchiveRecoverySettings, nil)

// Update recovery conf if parameters has changed
if !curRecoveryParameters.Equals(newRecoveryParameters) {
log.Infow("recovery parameters changed, restarting postgres instance", "curRecoveryParameters", curRecoveryParameters, "newRecoveryParameters", newRecoveryParameters)
pgm.SetRecoveryParameters(newRecoveryParameters)
if !curRecoveryOptions.RecoveryParameters.Equals(newRecoveryOptions.RecoveryParameters) {
log.Infow("recovery parameters changed, restarting postgres instance", "curRecoveryParameters", curRecoveryOptions.RecoveryParameters, "newRecoveryParameters", newRecoveryOptions.RecoveryParameters)
pgm.SetRecoveryOptions(newRecoveryOptions)

if err = pgm.Restart(true); err != nil {
log.Errorw("failed to restart postgres instance", zap.Error(err))
Expand Down
Loading