diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 14b22d8..cf4635c 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -35,6 +35,7 @@ type applyCmdConfig struct { autoContinueRebalance bool retentionDropStepDurationStr string skipConfirm bool + ignoreFewerPartitionsError bool sleepLoopDuration time.Duration shared sharedOptions @@ -99,6 +100,12 @@ func init() { false, "Skip confirmation prompts during apply process", ) + applyCmd.Flags().BoolVar( + &applyConfig.ignoreFewerPartitionsError, + "ignore-fewer-partitions-error", + false, + "Don't return error when topic's config specifies fewer partitions than it currently has", + ) applyCmd.Flags().DurationVar( &applyConfig.sleepLoopDuration, "sleep-loop-duration", @@ -231,6 +238,7 @@ func applyTopic( AutoContinueRebalance: applyConfig.autoContinueRebalance, RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, + IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError, SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 25b0084..20eede1 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -23,6 +23,8 @@ import ( log "github.com/sirupsen/logrus" ) +var ErrFewerPartitions = errors.New("fewer partitions in topic config") + // TopicApplierConfig contains the configuration for a TopicApplier struct. type TopicApplierConfig struct { BrokerThrottleMBsOverride int @@ -34,6 +36,7 @@ type TopicApplierConfig struct { AutoContinueRebalance bool RetentionDropStepDuration time.Duration SkipConfirm bool + IgnoreFewerPartitionsError bool SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -213,6 +216,11 @@ func (t *TopicApplier) applyExistingTopic( } if err := t.updatePartitions(ctx, topicInfo); err != nil { + if errors.Is(err, ErrFewerPartitions) && t.config.IgnoreFewerPartitionsError { + log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err) + return nil + } + return err } @@ -477,7 +485,8 @@ func (t *TopicApplier) updatePartitions( if currPartitions > t.topicConfig.Spec.Partitions { return fmt.Errorf( - "Fewer partitions in topic config (%d) than observed (%d); this cannot be resolved by topicctl", + "%w (%d) than observed (%d); this cannot be resolved by topicctl", + ErrFewerPartitions, t.topicConfig.Spec.Partitions, currPartitions, )