Skip to content

Commit

Permalink
fix(cnpgi-plugins): add archive and backup capabilities fields to con…
Browse files Browse the repository at this point in the history
…figuration (cloudnative-pg#6593)

This patch provides the configuration of plugin capabilities for WAL archiving.

Ensure that one and only one plugin can be used for WAL Archiving if the in-tree
barman-cloud support is not used.

Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Signed-off-by: Tao Li <tao.li@enterprisedb.com>
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Co-authored-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Co-authored-by: Gabriele Quaresima <gabriele.quaresima@enterprisedb.com>
Co-authored-by: Tao Li <tao.li@enterprisedb.com>
Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
(cherry picked from commit 7434655)
(cherry picked from commit 089c54e)
  • Loading branch information
armru authored and leonardoce committed Feb 24, 2025
1 parent f894f40 commit fec41a3
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .wordlist-en-custom.txt
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ VolumeSnapshotConfiguration
VolumeSnapshots
WAL
WAL's
WALArchiver
WALBackupConfiguration
WALCapabilities
WALs
Expand Down Expand Up @@ -848,6 +849,8 @@ ip
ipcs
ips
isPrimary
isTemplate
isWALArchiver
issuecomment
italy
jdbc
Expand Down
12 changes: 12 additions & 0 deletions api/v1/cluster_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,3 +1420,15 @@ func (p *Probe) ApplyInto(k8sProbe *corev1.Probe) {
k8sProbe.TerminationGracePeriodSeconds = p.TerminationGracePeriodSeconds
}
}

// GetEnabledWALArchivePluginName returns the name of the enabled backup plugin or an empty string
// if no backup plugin is enabled
func (cluster *Cluster) GetEnabledWALArchivePluginName() string {
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsWALArchiver != nil && *plugin.IsWALArchiver {
return plugin.Name
}
}

return ""
}
6 changes: 6 additions & 0 deletions api/v1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,12 @@ type PluginConfiguration struct {
// Name is the plugin name
Name string `json:"name"`

// Only one plugin can be declared as WALArchiver.
// Cannot be active if ".spec.backup.barmanObjectStore" configuration is present.
// +kubebuilder:default:=false
// +optional
IsWALArchiver *bool `json:"isWALArchiver,omitempty"`

// Parameters is the configuration of the plugin
// +optional
Parameters map[string]string `json:"parameters,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/postgresql.cnpg.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3889,6 +3889,12 @@ spec:
PluginConfiguration specifies a plugin that need to be loaded for this
cluster to be reconciled
properties:
isWALArchiver:
default: false
description: |-
Only one plugin can be declared as WALArchiver.
Cannot be active if ".spec.backup.barmanObjectStore" configuration is present.
type: boolean
name:
description: Name is the plugin name
type: string
Expand Down
8 changes: 8 additions & 0 deletions docs/src/cloudnative-pg.v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -3268,6 +3268,14 @@ cluster to be reconciled</p>
<p>Name is the plugin name</p>
</td>
</tr>
<tr><td><code>isWALArchiver</code><br/>
<i>bool</i>
</td>
<td>
<p>Only one plugin can be declared as WALArchiver.
Cannot be active if &quot;.spec.backup.barmanObjectStore&quot; configuration is present.</p>
</td>
</tr>
<tr><td><code>parameters</code><br/>
<i>map[string]string</i>
</td>
Expand Down
18 changes: 17 additions & 1 deletion internal/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewBackupReconciler(
// +kubebuilder:rbac:groups="",resources=pods,verbs=get

// Reconcile is the main reconciliation loop
// nolint: gocognit
// nolint: gocognit,gocyclo
func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
contextLogger, ctx := log.SetupLogger(ctx)
contextLogger.Debug(fmt.Sprintf("reconciling object %#q", req.NamespacedName))
Expand Down Expand Up @@ -135,6 +135,22 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}

if backup.Spec.Method == apiv1.BackupMethodPlugin && len(cluster.Spec.Plugins) == 0 {
message := "cannot proceed with the backup as the cluster has no plugin configured"
contextLogger.Warning(message)
r.Recorder.Event(&backup, "Warning", "ClusterHasNoBackupExecutorPlugin", message)
tryFlagBackupAsFailed(ctx, r.Client, &backup, errors.New(message))
return ctrl.Result{}, nil
}

if backup.Spec.Method != apiv1.BackupMethodPlugin && cluster.Spec.Backup == nil {
message := "cannot proceed with the backup as the cluster has no backup section"
contextLogger.Warning(message)
r.Recorder.Event(&backup, "Warning", "ClusterHasBackupConfigured", message)
tryFlagBackupAsFailed(ctx, r.Client, &backup, errors.New(message))
return ctrl.Result{}, nil
}

// Load the required plugins
pluginClient, err := cnpgiClient.WithPlugins(
ctx,
Expand Down
34 changes: 34 additions & 0 deletions internal/webhook/v1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (v *ClusterCustomValidator) validate(r *apiv1.Cluster) (allErrs field.Error
v.validateResources,
v.validateHibernationAnnotation,
v.validatePromotionToken,
v.validatePluginConfiguration,
}

for _, validate := range validations {
Expand Down Expand Up @@ -2349,3 +2350,36 @@ func (v *ClusterCustomValidator) validateHibernationAnnotation(r *apiv1.Cluster)
),
}
}

func (v *ClusterCustomValidator) validatePluginConfiguration(r *apiv1.Cluster) field.ErrorList {
if len(r.Spec.Plugins) == 0 {
return nil
}
isBarmanObjectStoreConfigured := r.Spec.Backup != nil && r.Spec.Backup.BarmanObjectStore != nil
var walArchiverEnabled []string

for _, plugin := range r.Spec.Plugins {
if plugin.IsWALArchiver != nil && *plugin.IsWALArchiver {
walArchiverEnabled = append(walArchiverEnabled, plugin.Name)
}
}

var errorList field.ErrorList
if isBarmanObjectStoreConfigured {
if len(walArchiverEnabled) > 0 {
errorList = append(errorList, field.Invalid(
field.NewPath("spec", "plugins"),
walArchiverEnabled,
"Cannot enable a WAL archiver plugin when barmanObjectStore is configured"))
}
}

if len(walArchiverEnabled) > 1 {
errorList = append(errorList, field.Invalid(
field.NewPath("spec", "plugins"),
walArchiverEnabled,
"Cannot enable more than one WAL archiver plugin"))
}

return errorList
}
49 changes: 49 additions & 0 deletions internal/webhook/v1/cluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4864,3 +4864,52 @@ var _ = Describe("ServiceTemplate Validation", func() {
})
})
})

var _ = Describe("validatePluginConfiguration", func() {
var v *ClusterCustomValidator
var cluster *apiv1.Cluster
walPlugin1 := apiv1.PluginConfiguration{
Name: "walArchiverPlugin1",
IsWALArchiver: ptr.To(true),
}
walPlugin2 := apiv1.PluginConfiguration{
Name: "walArchiverPlugin2",
IsWALArchiver: ptr.To(true),
}

BeforeEach(func() {
v = &ClusterCustomValidator{}
cluster = &apiv1.Cluster{
Spec: apiv1.ClusterSpec{
Plugins: []apiv1.PluginConfiguration{},
},
}
})

It("returns no errors if no plugins are enabled", func() {
Expect(v.validatePluginConfiguration(cluster)).To(BeNil())
})

It("returns an error if a WAL archiver plugin is enabled when barmanObjectStore is configured", func() {
cluster.Spec.Backup = &apiv1.BackupConfiguration{
BarmanObjectStore: &apiv1.BarmanObjectStoreConfiguration{},
}
cluster.Spec.Plugins = append(cluster.Spec.Plugins, walPlugin1)
errs := v.validatePluginConfiguration(cluster)
Expect(errs).To(HaveLen(1))
Expect(errs[0].Error()).To(ContainSubstring(
"Cannot enable a WAL archiver plugin when barmanObjectStore is configured"))
})

It("returns an error if more than one WAL archiver plugin is enabled", func() {
cluster.Spec.Plugins = append(cluster.Spec.Plugins, walPlugin1, walPlugin2)
errs := v.validatePluginConfiguration(cluster)
Expect(errs).To(HaveLen(1))
Expect(errs[0].Error()).To(ContainSubstring("Cannot enable more than one WAL archiver plugin"))
})

It("returns no errors when WAL archiver is enabled", func() {
cluster.Spec.Plugins = append(cluster.Spec.Plugins, walPlugin1)
Expect(v.validatePluginConfiguration(cluster)).To(BeNil())
})
})
18 changes: 16 additions & 2 deletions pkg/management/postgres/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,19 @@ func internalRun(
contextLog := log.FromContext(ctx)
startTime := time.Now()

// Request the plugins to archive this WAL
// We allow plugins to archive WALs even if there is no plugin
// directly enabled by the user, to retain compatibility with
// the old API.
if err := archiveWALViaPlugins(ctx, cluster, path.Join(pgData, walName)); err != nil {
return err
}

// If the used chosen a plugin to do WAL archiving, we don't
// trigger the legacy archiving process.
if cluster.GetEnabledWALArchivePluginName() != "" {
return nil
}

// Request Barman Cloud to archive this WAL
if cluster.Spec.Backup == nil || cluster.Spec.Backup.BarmanObjectStore == nil {
// Backup not configured, skipping WAL
Expand Down Expand Up @@ -266,11 +274,17 @@ func archiveWALViaPlugins(
availablePluginNamesSet := stringset.From(availablePluginNames)
enabledPluginNamesSet := stringset.From(
apiv1.GetPluginConfigurationEnabledPluginNames(cluster.Spec.Plugins))
availableAndEnabled := stringset.From(availablePluginNamesSet.Intersect(enabledPluginNamesSet).ToList())

enabledArchiverPluginName := cluster.GetEnabledWALArchivePluginName()
if enabledArchiverPluginName != "" && !availableAndEnabled.Has(enabledArchiverPluginName) {
return fmt.Errorf("wal archive plugin is not available: %s", enabledArchiverPluginName)
}

client, err := pluginClient.WithPlugins(
ctx,
plugins,
availablePluginNamesSet.Intersect(enabledPluginNamesSet).ToList()...,
availableAndEnabled.ToList()...,
)
if err != nil {
contextLogger.Error(err, "Error while loading required plugins")
Expand Down
21 changes: 19 additions & 2 deletions pkg/management/postgres/webserver/plugin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package webserver

import (
"context"
"fmt"
"time"

"github.com/cloudnative-pg/machinery/pkg/log"
pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
"github.com/cloudnative-pg/machinery/pkg/stringset"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -85,15 +87,30 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
"backupNamespace", b.Backup.Name)

plugins := repository.New()
if _, err := plugins.RegisterUnixSocketPluginsInPath(configuration.Current.PluginSocketDir); err != nil {
availablePlugins, err := plugins.RegisterUnixSocketPluginsInPath(configuration.Current.PluginSocketDir)
if err != nil {
contextLogger.Error(err, "Error while discovering plugins")
}
defer plugins.Close()

availablePluginNamesSet := stringset.From(availablePlugins)

enabledPluginNamesSet := stringset.From(
apiv1.GetPluginConfigurationEnabledPluginNames(b.Cluster.Spec.Plugins))
availableAndEnabled := stringset.From(availablePluginNamesSet.Intersect(enabledPluginNamesSet).ToList())

if !availableAndEnabled.Has(b.Backup.Spec.PluginConfiguration.Name) {
b.markBackupAsFailed(
ctx,
fmt.Errorf("requested plugin is not available: %s", b.Backup.Spec.PluginConfiguration.Name),
)
return
}

cli, err := pluginClient.WithPlugins(
ctx,
plugins,
apiv1.GetPluginConfigurationEnabledPluginNames(b.Cluster.Spec.Plugins)...,
availableAndEnabled.ToList()...,
)
if err != nil {
b.markBackupAsFailed(ctx, err)
Expand Down

0 comments on commit fec41a3

Please sign in to comment.