Skip to content

Commit

Permalink
Fix the case where the process group gets removed without the address…
Browse files Browse the repository at this point in the history
…es being included (#2147)

* Fix the case where the process group gets removed without the addresses being included
  • Loading branch information
johscheuer authored Oct 18, 2024
1 parent 232da61 commit e5fd090
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 44 deletions.
46 changes: 29 additions & 17 deletions controllers/remove_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
removedProcessGroups := r.removeProcessGroups(ctx, logger, cluster, zoneRemovals, zonedRemovals[removals.TerminatingZone])
err = includeProcessGroup(ctx, logger, r, cluster, removedProcessGroups, status, adminClient)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
// If the inclusion is blocked or another issues happened we will retry in 60 seconds.
return &requeue{curError: err, delayedRequeue: true, delay: 60 * time.Second}
}

return nil
Expand Down Expand Up @@ -214,7 +215,7 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// Pod is in terminating state so we don't want to block but we also don't want to include it
// Pod is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
}

Expand All @@ -231,7 +232,7 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// PVC is in terminating state so we don't want to block but we also don't want to include it
// PVC is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
} else if len(pvcs.Items) > 1 {
return false, false, fmt.Errorf("multiple PVCs found for cluster %s, processGroupID %s", cluster.Name, processGroup.ProcessGroupID)
Expand All @@ -251,20 +252,27 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// Service is in terminating state so we don't want to block but we also don't want to include it
// Service is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
}

return true, canBeIncluded, nil
}

func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus, adminClient fdbadminclient.AdminClient) error {
fdbProcessesToInclude, err := getProcessesToInclude(logger, cluster, removedProcessGroups, status)
fdbProcessesToInclude, newProcessGroups, err := getProcessesToInclude(logger, cluster, removedProcessGroups, status)
if err != nil {
return err
}

if len(fdbProcessesToInclude) == 0 {
// In case that the operator was removing a process group without exclusion.
// We can update the process groups at this stage, as no other processes must be included.
if len(cluster.Status.ProcessGroups) != len(newProcessGroups) {
cluster.Status.ProcessGroups = newProcessGroups
return r.updateOrApply(ctx, cluster)
}

return nil
}

Expand Down Expand Up @@ -293,59 +301,63 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
return err
}

// Reset the SecondsSinceLastRecovered sine the operator just included some processes, which will cause a recovery.
// Reset the SecondsSinceLastRecovered since the operator just included some processes, which will cause a recovery.
status.Cluster.RecoveryState.SecondsSinceLastRecovered = 0.0
// Update the process group list and remove all removed and included process groups.
cluster.Status.ProcessGroups = newProcessGroups

return r.updateOrApply(ctx, cluster)
}

func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) ([]fdbv1beta2.ProcessAddress, error) {
func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) ([]fdbv1beta2.ProcessAddress, []*fdbv1beta2.ProcessGroupStatus, error) {
fdbProcessesToInclude := make([]fdbv1beta2.ProcessAddress, 0)

if len(removedProcessGroups) == 0 {
return fdbProcessesToInclude, nil
return fdbProcessesToInclude, nil, nil
}

excludedServers, err := fdbstatus.GetExclusions(status)
if err != nil {
return fdbProcessesToInclude, fmt.Errorf("unable to get excluded servers from status, %w", err)
return fdbProcessesToInclude, nil, fmt.Errorf("unable to get excluded servers from status, %w", err)
}
excludedServersMap := make(map[string]fdbv1beta2.None, len(excludedServers))
for _, excludedServer := range excludedServers {
excludedServersMap[excludedServer.String()] = fdbv1beta2.None{}
}

processGroups := cluster.Status.DeepCopy().ProcessGroups
idx := 0
for _, processGroup := range cluster.Status.ProcessGroups {
for _, processGroup := range processGroups {
if processGroup.IsMarkedForRemoval() && removedProcessGroups[processGroup.ProcessGroupID] {
foundInExcludedServerList := false
exclusionString := processGroup.GetExclusionString()
if _, ok := excludedServersMap[exclusionString]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{StringAddress: exclusionString})
foundInExcludedServerList = true
}

for _, pAddr := range processGroup.Addresses {
if _, ok := excludedServersMap[pAddr]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP(pAddr)})
foundInExcludedServerList = true
}
}
if !foundInExcludedServerList {

if !foundInExcludedServerList && !processGroup.ExclusionSkipped {
// This means that the process is marked for exclusion and is also removed in the previous step but is missing
// its entry in the excluded servers in the status. This should not throw an error as this will block the
// inclusion for other processes, but we should have a record of this event happening in the logs.
logger.Info("processGroup is included but is missing from excluded server list", "processGroup", processGroup)
logger.Info("processGroup should be included but is missing from excluded server list", "processGroup", processGroup)
}

continue
}
cluster.Status.ProcessGroups[idx] = processGroup

processGroups[idx] = processGroup
idx++
}

// Remove the trailing duplicates.
cluster.Status.ProcessGroups = cluster.Status.ProcessGroups[:idx]

return fdbProcessesToInclude, nil
return fdbProcessesToInclude, processGroups[:idx], nil
}

func (r *FoundationDBClusterReconciler) getProcessGroupsToRemove(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, remainingMap map[string]bool, cordSet map[string]fdbv1beta2.None) (bool, bool, []*fdbv1beta2.ProcessGroupStatus) {
Expand Down
52 changes: 29 additions & 23 deletions controllers/remove_process_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,11 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
processesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(processesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
Expect(processesToInclude).To(BeEmpty())
Expect(newProcessGroups).To(BeEmpty())
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -580,11 +581,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal("1.1.1.1"))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal("1.1.1.1"))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})
})
Expand All @@ -596,10 +598,11 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
Expect(processesToInclude).To(BeEmpty())
Expect(newProcessGroups).To(BeEmpty())
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -615,11 +618,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(removedProcessGroup.GetExclusionString()))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -636,12 +640,13 @@ var _ = Describe("remove_process_groups", func() {
removedProcessGroups[removedProcessGroup.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
It("should include two process addresses", func() {
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(2))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(fmt.Sprintf("%s %s", removedProcessGroup.GetExclusionString(), removedProcessGroup.Addresses[0])))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(2))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(fmt.Sprintf("%s %s", removedProcessGroup.GetExclusionString(), removedProcessGroup.Addresses[0])))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -663,11 +668,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup2.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(14))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(removedProcessGroup2.GetExclusionString()))
Expect(newProcessGroups).To(HaveLen(14))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})
})
Expand Down
4 changes: 0 additions & 4 deletions setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ func StartManager(
clusterReconciler.MaintenanceListWaitDuration = operatorOpts.MaintenanceListWaitDuration
clusterReconciler.MinimumRecoveryTimeForInclusion = operatorOpts.MinimumRecoveryTimeForInclusion
clusterReconciler.MinimumRecoveryTimeForExclusion = operatorOpts.MinimumRecoveryTimeForExclusion
clusterReconciler.MaintenanceListStaleDuration = operatorOpts.MaintenanceListStaleDuration
clusterReconciler.MaintenanceListWaitDuration = operatorOpts.MaintenanceListWaitDuration
clusterReconciler.MinimumRecoveryTimeForInclusion = operatorOpts.MinimumRecoveryTimeForInclusion
clusterReconciler.MinimumRecoveryTimeForExclusion = operatorOpts.MinimumRecoveryTimeForExclusion
clusterReconciler.ClusterLabelKeyForNodeTrigger = strings.Trim(operatorOpts.ClusterLabelKeyForNodeTrigger, "\"")
clusterReconciler.Namespace = operatorOpts.WatchNamespace

Expand Down

0 comments on commit e5fd090

Please sign in to comment.