Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 15 additions & 52 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,23 @@ func main() {
must(manager.Add(registrar))
token, _ := registrar.CheckToken()

bridgeURL := os.Getenv("PGO_BRIDGE_URL")
bridgeClient := func() *bridge.Client {
client := bridge.NewClient(bridgeURL, versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

// add all PostgreSQL Operator controllers to the runtime manager
addControllersToManager(manager, log, registrar)
must(pgupgrade.ManagedReconciler(manager, registrar))
must(standalone_pgadmin.ManagedReconciler(manager))
must(crunchybridgecluster.ManagedReconciler(manager, func() bridge.ClientInterface {
return bridgeClient()
}))

if features.Enabled(feature.BridgeIdentifiers) {
url := os.Getenv("PGO_BRIDGE_URL")
constructor := func() *bridge.Client {
client := bridge.NewClient(url, versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

must(bridge.ManagedInstallationReconciler(manager, constructor))
must(bridge.ManagedInstallationReconciler(manager, bridgeClient))
}

// Enable upgrade checking
Expand Down Expand Up @@ -307,55 +312,13 @@ func main() {
func addControllersToManager(mgr runtime.Manager, log logging.Logger, reg registration.Registration) {
pgReconciler := &postgrescluster.Reconciler{
Client: mgr.GetClient(),
Owner: postgrescluster.ControllerName,
Recorder: mgr.GetEventRecorderFor(postgrescluster.ControllerName),
Owner: naming.ControllerPostgresCluster,
Recorder: mgr.GetEventRecorderFor(naming.ControllerPostgresCluster),
Registration: reg,
}

if err := pgReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PostgresCluster controller")
os.Exit(1)
}

upgradeReconciler := &pgupgrade.PGUpgradeReconciler{
Client: mgr.GetClient(),
Owner: "pgupgrade-controller",
Recorder: mgr.GetEventRecorderFor("pgupgrade-controller"),
Registration: reg,
}

if err := upgradeReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PGUpgrade controller")
os.Exit(1)
}

pgAdminReconciler := &standalone_pgadmin.PGAdminReconciler{
Client: mgr.GetClient(),
Owner: "pgadmin-controller",
Recorder: mgr.GetEventRecorderFor(naming.ControllerPGAdmin),
}

if err := pgAdminReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create PGAdmin controller")
os.Exit(1)
}

constructor := func() bridge.ClientInterface {
client := bridge.NewClient(os.Getenv("PGO_BRIDGE_URL"), versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

crunchyBridgeClusterReconciler := &crunchybridgecluster.CrunchyBridgeClusterReconciler{
Client: mgr.GetClient(),
Owner: "crunchybridgecluster-controller",
// TODO(crunchybridgecluster): recorder?
// Recorder: mgr.GetEventRecorderFor(naming...),
NewClient: constructor,
}

if err := crunchyBridgeClusterReconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create CrunchyBridgeCluster controller")
os.Exit(1)
}
}
20 changes: 3 additions & 17 deletions internal/bridge/crunchybridgecluster/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// patch sends patch to object's endpoint in the Kubernetes API and updates
// object with any returned content. The fieldManager is set to r.Owner, but
// can be overridden in options.
// - https://docs.k8s.io/reference/using-api/server-side-apply/#managers
//
// NOTE: This function is duplicated from a version in the postgrescluster package
func (r *CrunchyBridgeClusterReconciler) patch(
ctx context.Context, object client.Object,
patch client.Patch, options ...client.PatchOption,
) error {
options = append([]client.PatchOption{r.Owner}, options...)
return r.Patch(ctx, object, patch, options...)
}

// apply sends an apply patch to object's endpoint in the Kubernetes API and
// updates object with any returned content. The fieldManager is set to
// r.Owner and the force parameter is true.
// updates object with any returned content. The fieldManager is set by
// r.Writer and the force parameter is true.
// - https://docs.k8s.io/reference/using-api/server-side-apply/#managers
// - https://docs.k8s.io/reference/using-api/server-side-apply/#conflicts
//
Expand All @@ -40,7 +26,7 @@ func (r *CrunchyBridgeClusterReconciler) apply(ctx context.Context, object clien

// Send the apply-patch with force=true.
if err == nil {
err = r.patch(ctx, object, apply, client.ForceOwnership)
err = r.Writer.Patch(ctx, object, apply, client.ForceOwnership)
}

return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,38 @@ import (

// CrunchyBridgeClusterReconciler reconciles a CrunchyBridgeCluster object
type CrunchyBridgeClusterReconciler struct {
client.Client

Owner client.FieldOwner

// For this iteration, we will only be setting conditions rather than
// setting conditions and emitting events. That may change in the future,
// so we're leaving this EventRecorder here for now.
// record.EventRecorder

// NewClient is called each time a new Client is needed.
// NewClient is called each time a new bridge.Client is needed.
NewClient func() bridge.ClientInterface

Reader interface {
Get(context.Context, client.ObjectKey, client.Object, ...client.GetOption) error
List(context.Context, client.ObjectList, ...client.ListOption) error
}
Writer interface {
Delete(context.Context, client.Object, ...client.DeleteOption) error
Patch(context.Context, client.Object, client.Patch, ...client.PatchOption) error
Update(context.Context, client.Object, ...client.UpdateOption) error
}
StatusWriter interface {
Patch(context.Context, client.Object, client.Patch, ...client.SubResourcePatchOption) error
}
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list,watch}
//+kubebuilder:rbac:groups="",resources="secrets",verbs={list,watch}

// SetupWithManager sets up the controller with the Manager.
func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
mgr ctrl.Manager,
) error {
return ctrl.NewControllerManagedBy(mgr).
// ManagedReconciler creates a [CrunchyBridgeClusterReconciler] and adds it to m.
func ManagedReconciler(m ctrl.Manager, newClient func() bridge.ClientInterface) error {
kubernetes := client.WithFieldOwner(m.GetClient(), naming.ControllerCrunchyBridgeCluster)

reconciler := &CrunchyBridgeClusterReconciler{
NewClient: newClient,
Reader: kubernetes,
StatusWriter: kubernetes.Status(),
Writer: kubernetes,
}

return ctrl.NewControllerManagedBy(m).
For(&v1beta1.CrunchyBridgeCluster{}).
Owns(&corev1.Secret{}).
// Wake periodically to check Bridge API for all CrunchyBridgeClusters.
Expand All @@ -63,7 +74,7 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
runtime.NewTickerImmediate(5*time.Minute, event.GenericEvent{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []ctrl.Request {
var list v1beta1.CrunchyBridgeClusterList
_ = r.List(ctx, &list)
_ = reconciler.Reader.List(ctx, &list)
return runtime.Requests(initialize.Pointers(list.Items...)...)
}),
),
Expand All @@ -72,10 +83,10 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
Watches(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
return runtime.Requests(r.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
return runtime.Requests(reconciler.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
}),
).
Complete(r)
Complete(reconciler)
}

// The owner reference created by controllerutil.SetControllerReference blocks
Expand All @@ -91,7 +102,7 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
func (r *CrunchyBridgeClusterReconciler) setControllerReference(
owner *v1beta1.CrunchyBridgeCluster, controlled client.Object,
) error {
return controllerutil.SetControllerReference(owner, controlled, r.Scheme())
return controllerutil.SetControllerReference(owner, controlled, runtime.Scheme)
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={get,patch,update}
Expand All @@ -113,14 +124,14 @@ func (r *CrunchyBridgeClusterReconciler) Reconcile(ctx context.Context, req ctrl
// copy before returning from its cache.
// - https://github.com/kubernetes-sigs/controller-runtime/issues/1235
crunchybridgecluster := &v1beta1.CrunchyBridgeCluster{}
err := r.Get(ctx, req.NamespacedName, crunchybridgecluster)
err := r.Reader.Get(ctx, req.NamespacedName, crunchybridgecluster)

if err == nil {
// Write any changes to the crunchybridgecluster status on the way out.
before := crunchybridgecluster.DeepCopy()
defer func() {
if !equality.Semantic.DeepEqual(before.Status, crunchybridgecluster.Status) {
status := r.Status().Patch(ctx, crunchybridgecluster, client.MergeFrom(before), r.Owner)
status := r.StatusWriter.Patch(ctx, crunchybridgecluster, client.MergeFrom(before))

if err == nil && status != nil {
err = status
Expand Down Expand Up @@ -684,7 +695,7 @@ func (r *CrunchyBridgeClusterReconciler) GetSecretKeys(
}}

err := errors.WithStack(
r.Get(ctx, client.ObjectKeyFromObject(existing), existing))
r.Reader.Get(ctx, client.ObjectKeyFromObject(existing), existing))

if err == nil {
if existing.Data["key"] != nil && existing.Data["team"] != nil {
Expand All @@ -707,7 +718,7 @@ func (r *CrunchyBridgeClusterReconciler) deleteControlled(
version := object.GetResourceVersion()
exactly := client.Preconditions{UID: &uid, ResourceVersion: &version}

return r.Delete(ctx, object, exactly)
return r.Writer.Delete(ctx, object, exactly)
}

return nil
Expand Down
Loading
Loading