From 25a6f37d385a9548e6421b86046f0c26322c6980 Mon Sep 17 00:00:00 2001 From: Shimon Turjeman Date: Tue, 2 Apr 2024 02:03:56 +0300 Subject: [PATCH] apply: Allow ignoring a specific error returned from updatePartitions() (#184) When applying topic's configuration, we'd like to be able avoid failing in case the desired topic's partition count is smaller than the actual topic's partitions count (on the broker). We know that Kafka doesn't allow partitions decrease, so instead of failing the operation we'd like to continue without doing anything. This is very convenient when trying to apply a batch of topics in which case, we don't want to fail the rest of the batch for one topic that its partition count configuration is lower than the actual partitions count. This change is backward compatible, as the default behavior is kept. Signed-off-by: shimon-armis --- cmd/topicctl/subcmd/apply.go | 8 ++++++++ pkg/apply/apply.go | 11 ++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 14b22d8..cf4635c 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -35,6 +35,7 @@ type applyCmdConfig struct { autoContinueRebalance bool retentionDropStepDurationStr string skipConfirm bool + ignoreFewerPartitionsError bool sleepLoopDuration time.Duration shared sharedOptions @@ -99,6 +100,12 @@ func init() { false, "Skip confirmation prompts during apply process", ) + applyCmd.Flags().BoolVar( + &applyConfig.ignoreFewerPartitionsError, + "ignore-fewer-partitions-error", + false, + "Don't return error when topic's config specifies fewer partitions than it currently has", + ) applyCmd.Flags().DurationVar( &applyConfig.sleepLoopDuration, "sleep-loop-duration", @@ -231,6 +238,7 @@ func applyTopic( AutoContinueRebalance: applyConfig.autoContinueRebalance, RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, + IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 25b0084..20eede1 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -23,6 +23,8 @@ import ( log "github.com/sirupsen/logrus" ) +var ErrFewerPartitions = errors.New("fewer partitions in topic config") + // TopicApplierConfig contains the configuration for a TopicApplier struct. type TopicApplierConfig struct { BrokerThrottleMBsOverride int @@ -34,6 +36,7 @@ type TopicApplierConfig struct { AutoContinueRebalance bool RetentionDropStepDuration time.Duration SkipConfirm bool + IgnoreFewerPartitionsError bool SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -213,6 +216,11 @@ func (t *TopicApplier) applyExistingTopic( } if err := t.updatePartitions(ctx, topicInfo); err != nil { + if errors.Is(err, ErrFewerPartitions) && t.config.IgnoreFewerPartitionsError { + log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err) + return nil + } + return err } @@ -477,7 +485,8 @@ func (t *TopicApplier) updatePartitions( if currPartitions > t.topicConfig.Spec.Partitions { return fmt.Errorf( - "Fewer partitions in topic config (%d) than observed (%d); this cannot be resolved by topicctl", + "%w (%d) than observed (%d); this cannot be resolved by topicctl", + ErrFewerPartitions, t.topicConfig.Spec.Partitions, currPartitions, )