Skip to content

Commit

Permalink
Merge branch 'segmentio:master' into shimon.support_fail_safe_approac…
Browse files Browse the repository at this point in the history
…h_for_the_apply_cmd
  • Loading branch information
shimonturjeman authored Apr 2, 2024
2 parents 0af1bd3 + 25a6f37 commit 44deeff
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
19 changes: 3 additions & 16 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,22 +362,9 @@ jobs:
KAFKA_CFG_ADVERTISED_PORT: 9092
KAFKA_CFG_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true

snyk:
runs-on: ubuntu-latest
needs: [test010, test270, test360]
steps:
- uses: actions/checkout@v3
- name: Run Snyk to check for vulnerabilities
uses: snyk/actions/golang@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
args: --severity-threshold=high --fail-on=upgradable



publish-ghcr:
needs: [setup, snyk]
needs: [setup, test010, test270, test360]
runs-on: ubuntu-latest
if: ${{ ( github.ref_type == 'branch' ) && (( github.ref_name == 'master' ) || ( github.ref_name == 'v0' )) }}
steps:
Expand Down Expand Up @@ -419,7 +406,7 @@ jobs:
- run: echo "GHCR PUBLISH SUCCESSFUL"

publish-dockerhub:
needs: [setup, snyk]
needs: [setup, test010, test270, test360]
environment: CICD
env:
RELEASE_TAG: ${{ needs.setup.outputs.version-tag }}
Expand Down
8 changes: 8 additions & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type applyCmdConfig struct {
autoContinueRebalance bool
retentionDropStepDurationStr string
skipConfirm bool
ignoreFewerPartitionsError bool
sleepLoopDuration time.Duration
failFast bool

Expand Down Expand Up @@ -100,6 +101,12 @@ func init() {
false,
"Skip confirmation prompts during apply process",
)
applyCmd.Flags().BoolVar(
&applyConfig.ignoreFewerPartitionsError,
"ignore-fewer-partitions-error",
false,
"Don't return error when topic's config specifies fewer partitions than it currently has",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
Expand Down Expand Up @@ -251,6 +258,7 @@ func applyTopic(
AutoContinueRebalance: applyConfig.autoContinueRebalance,
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
log "github.com/sirupsen/logrus"
)

var ErrFewerPartitions = errors.New("fewer partitions in topic config")

// TopicApplierConfig contains the configuration for a TopicApplier struct.
type TopicApplierConfig struct {
BrokerThrottleMBsOverride int
Expand All @@ -34,6 +36,7 @@ type TopicApplierConfig struct {
AutoContinueRebalance bool
RetentionDropStepDuration time.Duration
SkipConfirm bool
IgnoreFewerPartitionsError bool
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}
Expand Down Expand Up @@ -213,6 +216,11 @@ func (t *TopicApplier) applyExistingTopic(
}

if err := t.updatePartitions(ctx, topicInfo); err != nil {
if errors.Is(err, ErrFewerPartitions) && t.config.IgnoreFewerPartitionsError {
log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err)
return nil
}

return err
}

Expand Down Expand Up @@ -477,7 +485,8 @@ func (t *TopicApplier) updatePartitions(

if currPartitions > t.topicConfig.Spec.Partitions {
return fmt.Errorf(
"Fewer partitions in topic config (%d) than observed (%d); this cannot be resolved by topicctl",
"%w (%d) than observed (%d); this cannot be resolved by topicctl",
ErrFewerPartitions,
t.topicConfig.Spec.Partitions,
currPartitions,
)
Expand Down

0 comments on commit 44deeff

Please sign in to comment.