diff --git a/src/go/rpk/go.mod b/src/go/rpk/go.mod
index 5963080dd5a6..cef292696cdf 100644
--- a/src/go/rpk/go.mod
+++ b/src/go/rpk/go.mod
@@ -37,7 +37,7 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_model v0.6.1
 	github.com/prometheus/common v0.53.0
-	github.com/redpanda-data/common-go/rpadmin v0.1.1
+	github.com/redpanda-data/common-go/rpadmin v0.1.2
 	github.com/rs/xid v1.5.0
 	github.com/safchain/ethtool v0.3.0
 	github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
diff --git a/src/go/rpk/go.sum b/src/go/rpk/go.sum
index fedc592504db..0682f5505d51 100644
--- a/src/go/rpk/go.sum
+++ b/src/go/rpk/go.sum
@@ -212,8 +212,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a
 github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
 github.com/redpanda-data/common-go/net v0.1.0 h1:JnJioRJuL961r1QXiJQ1tW9+yEaJfu8FpXnUmvQbwNM=
 github.com/redpanda-data/common-go/net v0.1.0/go.mod h1:iOdNkjxM7a1T8F3cYHTaKIPFCHzzp/ia6TN+Z+7Tt5w=
-github.com/redpanda-data/common-go/rpadmin v0.1.1 h1:ENlG3Irs5Wiwew5TD8h9l0HGE2Ww2VWhclGrshFpM74=
-github.com/redpanda-data/common-go/rpadmin v0.1.1/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
+github.com/redpanda-data/common-go/rpadmin v0.1.2 h1:TGdV7ZUcwOt9Z6uTtQK7KrxoIlWnX9rDAe666eLthFk=
+github.com/redpanda-data/common-go/rpadmin v0.1.2/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
 github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0=
 github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ=
 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/move.go b/src/go/rpk/pkg/cli/cluster/partitions/move.go
index 7f2428e31714..f6aca2192130 100644
--- a/src/go/rpk/pkg/cli/cluster/partitions/move.go
+++ b/src/go/rpk/pkg/cli/cluster/partitions/move.go
@@ -10,15 +10,19 @@ package partitions
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"math/rand"
+	"reflect"
 	"regexp"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
+
+	"go.uber.org/zap"
+
 	"github.com/redpanda-data/common-go/rpadmin"
 
 	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
@@ -30,20 +34,17 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+type newAssignment struct {
+	Namespace   string           `json:"ns"`
+	Topic       string           `json:"topic"`
+	Partition   int              `json:"partition_id"`
+	OldReplicas rpadmin.Replicas `json:"old_replicas"`
+	NewReplicas rpadmin.Replicas `json:"new_replicas"`
+	Error       string           `json:"error,omitempty"`
+}
+
 func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
-	var (
-		ns         string
-		topic      string
-		partitions []string
-	)
-	type newAssignment struct {
-		Namespace   string           `json:"ns"`
-		Topic       string           `json:"topic"`
-		Partition   int              `json:"partition_id"`
-		OldReplicas rpadmin.Replicas `json:"old_replicas"`
-		NewReplicas rpadmin.Replicas `json:"new_replicas"`
-		Error       string           `json:"error,omitempty"`
-	}
+	var partitionsFlag []string
 	cmd := &cobra.Command{
 		Use:   "move",
 		Short: "Move partition replicas across nodes / cores",
@@ -53,7 +54,6 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 			if h, ok := f.Help([]newAssignment{}); ok {
 				out.Exit(h)
 			}
-
 			p, err := p.LoadVirtualProfile(fs)
 			out.MaybeDie(err, "rpk unable to load config: %v", err)
 			config.CheckExitCloudAdmin(p)
@@ -61,109 +61,27 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 			cl, err := adminapi.NewClient(fs, p)
 			out.MaybeDie(err, "unable to initialize admin client: %v", err)
-			var (
-				mu                sync.Mutex
-				newAssignmentList []newAssignment
-				wg                sync.WaitGroup
-				brokerReqs        = make(map[int]struct{})
-				knownNodeCore     = make(map[int]int)
-			)
-
-			// Concurrently parse the requested partitions and
-			// find current replica assignments
-			g, egCtx := errgroup.WithContext(cmd.Context())
-			for _, partition := range partitions {
-				partition := partition
-				g.Go(func() error {
-					if len(topics) > 0 { // foo -p 0:1,2,3
-						for _, t := range topics {
-							_, _, part, err := extractNTP(t, partition)
-							out.MaybeDie(err, "failed to extract topic/partition: %s\n", err)
-
-							if nt := strings.Split(t, "/"); len(nt) == 1 {
-								ns = "kafka"
-								topic = nt[0]
-							} else {
-								ns = nt[0]
-								topic = nt[1]
-							}
-
-							current, err := cl.GetPartition(egCtx, ns, topic, part)
-							out.MaybeDie(err, "unable to get partition: %s\n", err)
-
-							newReplicas, err := configureReplicas(partition, current.Replicas)
-							out.MaybeDie(err, "unable to configure new replicas: %v\n", err)
-
-							mu.Lock()
-							newAssignmentList = append(newAssignmentList, newAssignment{
-								Namespace:   ns,
-								Topic:       topic,
-								Partition:   part,
-								OldReplicas: current.Replicas,
-								NewReplicas: newReplicas,
-							})
-							mu.Unlock()
-						}
-					} else { // -p foo/0:1,2,3
-						ns, topic, part, err := extractNTP("", partition)
-						out.MaybeDie(err, "failed to extract topic/partition: %s\n", err)
-
-						current, err := cl.GetPartition(egCtx, ns, topic, part)
-						out.MaybeDie(err, "unable to get partition: %s\n", err)
-
-						newReplicas, err := configureReplicas(partition, current.Replicas)
-						out.MaybeDie(err, "unable to configure new replicas: %v\n", err)
-
-						mu.Lock()
-						newAssignmentList = append(newAssignmentList, newAssignment{
-							Namespace:   ns,
-							Topic:       topic,
-							Partition:   part,
-							OldReplicas: current.Replicas,
-							NewReplicas: newReplicas,
-						})
-						mu.Unlock()
-					}
-					return nil
-				})
+			newAssignmentList, coreAssignmentList, err := parseAssignments(cmd.Context(), cl, partitionsFlag, topics)
+			out.MaybeDie(err, "unable to parse new assignments: %v", err)
+			if len(newAssignmentList) == 0 && len(coreAssignmentList) == 0 {
+				out.Exit("No movements are required.")
 			}
-			if err := g.Wait(); err != nil {
-				out.Die("failed to parse the arguments: %v\n", err)
-			}
+			coreAssignmentOn, err := isLocalCoreAssignmentOn(cmd.Context(), cl)
+			out.MaybeDie(err, "unable to determine if node_local_core_assignment feature is active: %v", err)
+			zap.L().Sugar().Debugf("feature node_local_core_assignment, active: %v", coreAssignmentOn)
 
-			for _, newa := range newAssignmentList {
-				for _, nr := range newa.NewReplicas {
-					if nr.Core == -1 {
-						brokerReqs[nr.NodeID] = struct{}{}
-					}
-				}
-			}
-			for node := range brokerReqs {
-				node := node
-				g.Go(func() error {
-					broker, err := cl.Broker(cmd.Context(), node)
-					mu.Lock()
-					defer mu.Unlock()
-					knownNodeCore[node] = broker.NumCores
-					return err
-				})
-			}
-
-			if err := g.Wait(); err != nil {
-				out.Die("unable to find core counts", err)
-			}
-
-			rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+			// We fill the missing core assignments (-1) with random cores. For
+			// old brokers this is expected, for new brokers (coreAssignmentOn),
+			// the core value is arbitrary, so it does not matter.
+			newAssignmentList, err = fillAssignmentList(cmd.Context(), cl, newAssignmentList)
+			out.MaybeDie(err, "unable to parse assignment list with existing partition replicas: %v", err)
+			var (
+				wg sync.WaitGroup
+				mu sync.Mutex
+			)
 			for i, newa := range newAssignmentList {
-				i := i
-				newa := newa
-				for j, nr := range newa.NewReplicas {
-					if nr.Core == -1 {
-						numCore := knownNodeCore[nr.NodeID]
-						newa.NewReplicas[j].Core = rng.Intn(numCore)
-					}
-				}
+				i, newa := i, newa
 				wg.Add(1)
 				go func(newa newAssignment) {
 					defer wg.Done()
@@ -181,6 +99,31 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 				}(newa)
 			}
 			wg.Wait()
+			if coreAssignmentOn {
+				for i, newCore := range coreAssignmentList {
+					i, newCore := i, newCore
+					for _, rc := range newCore.NewReplicas {
+						rc := rc
+						wg.Add(1)
+						go func(newCore newAssignment, rc rpadmin.Replica) {
+							defer wg.Done()
+							zap.L().Sugar().Debugf("Partition %v: assigning core %v in node %v ", newCore.Partition, rc.Core, rc.NodeID)
+							err := cl.UpdatePartitionReplicaCore(cmd.Context(), newCore.Namespace, newCore.Topic, newCore.Partition, rc.NodeID, rc.Core)
+							mu.Lock()
+							defer mu.Unlock()
+							if he := (*rpadmin.HTTPResponseError)(nil); errors.As(err, &he) {
+								body, bodyErr := he.DecodeGenericErrorBody()
+								if bodyErr == nil {
+									coreAssignmentList[i].Error = body.Message
+								} else {
+									coreAssignmentList[i].Error = err.Error()
+								}
+							}
+						}(newCore, rc)
+					}
+				}
+			}
+			wg.Wait()
 
 			types.Sort(newAssignmentList)
 
@@ -199,54 +142,212 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 				}
 			}
 			tw.Flush()
-			fmt.Println()
-			fmt.Printf("Successfully began %d partition movement(s).\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", successes)
+			if successes > 0 {
+				fmt.Println()
+				fmt.Printf("Successfully began %d partition movement(s).\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", successes)
+			}
 		},
 	}
-	cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
+	cmd.Flags().StringArrayVarP(&partitionsFlag, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
 	cmd.MarkFlagRequired("partition")
 	p.InstallFormatFlag(cmd)
 	return cmd
 }
 
+// parseAssignments parses the arguments and partition flag of the partition
+// move command, returning the new node and core assignments.
+func parseAssignments(ctx context.Context, cl *rpadmin.AdminAPI, partitionsFlag, topics []string) (nodeAssignmentList, coreAssignmentList []newAssignment, err error) {
+	var mu sync.Mutex
+	// Concurrently parse the requested partitions and
+	// find current replica assignments
+	g, egCtx := errgroup.WithContext(ctx)
+	for _, pFlag := range partitionsFlag {
+		pFlag := pFlag
+		g.Go(func() error {
+			if len(topics) > 0 { // foo -p 0:1,2,3
+				for _, t := range topics {
+					_, _, part, err := extractNTP(t, pFlag)
+					if err != nil {
+						return fmt.Errorf("failed to extract topic/partition: %s", err)
+					}
+					var ns, topic string
+					if nt := strings.Split(t, "/"); len(nt) == 1 {
+						ns = "kafka"
+						topic = nt[0]
+					} else {
+						ns = nt[0]
+						topic = nt[1]
+					}
+					current, err := cl.GetPartition(egCtx, ns, topic, part)
+					if err != nil {
+						return fmt.Errorf("unable to get partition: %s", err)
+					}
+					newReplicas, coreChanges, err := extractReplicaChanges(pFlag, current.Replicas)
+					if err != nil {
+						return fmt.Errorf("unable to configure new replicas: %v", err)
+					}
+
+					if areReplicasEqual(current.Replicas, newReplicas) {
+						continue // noNewAssignment
+					}
+					mu.Lock()
+					nodeAssignmentList = append(nodeAssignmentList, newAssignment{
+						Namespace:   ns,
+						Topic:       topic,
+						Partition:   part,
+						OldReplicas: current.Replicas,
+						NewReplicas: newReplicas,
+					})
+					coreAssignmentList = append(coreAssignmentList, newAssignment{
+						Namespace:   ns,
+						Topic:       topic,
+						Partition:   part,
+						OldReplicas: current.Replicas,
+						NewReplicas: coreChanges,
+					})
+					mu.Unlock()
+				}
+			} else { // -p foo/0:1,2,3
+				ns, topic, part, err := extractNTP("", pFlag)
+				if err != nil {
+					return fmt.Errorf("failed to extract topic/partition: %s", err)
+				}
+				current, err := cl.GetPartition(egCtx, ns, topic, part)
+				if err != nil {
+					return fmt.Errorf("unable to get partition: %s", err)
+				}
+				newReplicas, coreChanges, err := extractReplicaChanges(pFlag, current.Replicas)
+				if err != nil {
+					return fmt.Errorf("unable to configure new replicas: %v", err)
+				}
+				if areReplicasEqual(current.Replicas, newReplicas) {
+					return nil // noNewAssignment
+				}
+				mu.Lock()
+				nodeAssignmentList = append(nodeAssignmentList, newAssignment{
+					Namespace:   ns,
+					Topic:       topic,
+					Partition:   part,
+					OldReplicas: current.Replicas,
+					NewReplicas: newReplicas,
+				})
+				coreAssignmentList = append(coreAssignmentList, newAssignment{
+					Namespace:   ns,
+					Topic:       topic,
+					Partition:   part,
+					OldReplicas: current.Replicas,
+					NewReplicas: coreChanges,
+				})
+				mu.Unlock()
+			}
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return nil, nil, fmt.Errorf("failed to parse the arguments: %v", err)
+	}
+	types.Sort(nodeAssignmentList)
+	types.Sort(coreAssignmentList)
+	return nodeAssignmentList, coreAssignmentList, nil
+}
+
+// fillAssignmentList checks for replicas in the assignment list that have a
+// core value of -1 and fills them with a random core value.
+func fillAssignmentList(ctx context.Context, cl *rpadmin.AdminAPI, assignmentList []newAssignment) ([]newAssignment, error) {
+	var (
+		brokerReqs    = make(map[int]struct{})
+		knownNodeCore = make(map[int]int)
+		mu            sync.Mutex
+	)
+	for _, newa := range assignmentList {
+		for _, nr := range newa.NewReplicas {
+			if nr.Core == -1 {
+				brokerReqs[nr.NodeID] = struct{}{}
+			}
+		}
+	}
+	g, egCtx := errgroup.WithContext(ctx)
+	for node := range brokerReqs {
+		node := node
+		g.Go(func() error {
+			broker, err := cl.Broker(egCtx, node)
+			mu.Lock()
+			defer mu.Unlock()
+			knownNodeCore[node] = broker.NumCores
+			return err
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return nil, fmt.Errorf("unable to find core counts: %v", err)
+	}
+
+	var filledAssignments []newAssignment
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, newa := range assignmentList {
+		newa := newa
+		for j, nr := range newa.NewReplicas {
+			if nr.Core == -1 {
+				numCore := knownNodeCore[nr.NodeID]
+				newa.NewReplicas[j].Core = rng.Intn(numCore)
+			}
+		}
+		filledAssignments = append(filledAssignments, newa)
+	}
+	return filledAssignments, nil
+}
+
 var (
 	ntpRe     *regexp.Regexp
 	ntpReOnce sync.Once
 )
 
-// extractNTP parses the partition flag with format; foo/0:1,2,3 or 0:1,2,3.
-// It extracts letters before the colon and formats it.
-func extractNTP(topic string, ntp string) (ns string, t string, p int, err error) {
+// extractNTP parses the namespace/topic/partition out of the partition flag
+// with format; foo/0:1,2,3 or 0:1,2,3.
+func extractNTP(topic string, partitionFlag string) (ns string, t string, p int, err error) {
 	ntpReOnce.Do(func() {
+		// This regexp captures a sequence of characters before a colon.
+		// It can be either a number alone, or a number preceded by any number
+		// of '[strings]/' segments.
 		ntpRe = regexp.MustCompile(`^((?:[^:]+/)?\d+):.*$`)
 	})
-	m := ntpRe.FindStringSubmatch(ntp)
+	// Match[0]: Full Match.
+	// Match[1]: String before the colon ([ns/]t/p).
+	m := ntpRe.FindStringSubmatch(partitionFlag)
 	if len(m) == 0 {
-		return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
+		return "", "", -1, fmt.Errorf("invalid format for %s; check --help text", partitionFlag)
 	}
 	beforeColon := m[1]
+	// If a topic is provided, we only parse the partition and return.
 	if topic != "" {
+		if strings.Contains(beforeColon, "/") {
+			return "", "", -1, fmt.Errorf("unable to parse --partition %v with topic %q: providing a topic as an argument and in the --partition flag is not allowed", partitionFlag, topic)
+		}
 		p, err = strconv.Atoi(beforeColon)
 		if err != nil {
 			return "", "", -1, fmt.Errorf("%s", err)
 		}
-	} else if n := strings.Split(beforeColon, "/"); len(n) == 3 {
+		return
+	}
+	// Parse either namespace/topic/partition or topic/partition:
+	n := strings.Split(beforeColon, "/")
+	switch len(n) {
+	case 3:
 		ns = n[0]
 		t = n[1]
 		p, err = strconv.Atoi(n[2])
 		if err != nil {
 			return "", "", -1, fmt.Errorf("%s", err)
 		}
-	} else if len(n) == 2 {
+	case 2:
 		ns = "kafka"
 		t = n[0]
 		p, err = strconv.Atoi(n[1])
 		if err != nil {
 			return "", "", -1, fmt.Errorf("%s", err)
 		}
-	} else {
-		return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
+	default:
+		return "", "", -1, fmt.Errorf("invalid format for %s; check --help text", partitionFlag)
 	}
 	return ns, t, p, nil
 }
@@ -256,35 +357,46 @@ var (
 	replicaReOnce sync.Once
 )
 
-// configureReplicas parses the partition flag with format; foo/0:1-0,2-1,3-2 or 0:1,2,3
-// It extracts letters after the colon and return as adminapi.Replicas.
-func configureReplicas(partition string, currentReplicas rpadmin.Replicas) (rpadmin.Replicas, error) {
+// extractReplicaChanges parses the node-core from the partition flag with format:
+// foo/0:1-0,2-1,3-2 or 0:1,2,3. It returns the full rpadmin.Replicas changes,
+// and a subset of only the core changes requested.
+func extractReplicaChanges(partitionFlag string, currentReplicas rpadmin.Replicas) (allReplicas, coreReplicas rpadmin.Replicas, err error) {
 	replicaReOnce.Do(func() {
+		// This regexp captures the sequence of numbers (nodes) after a colon.
+		// The sequence can be a single number, two numbers separated by a
+		// hyphen (node-core), or multiple of such numbers separated by commas.
 		replicaRe = regexp.MustCompile(`^[^:]+:(\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$`)
 	})
-	m := replicaRe.FindStringSubmatch(partition)
+	// Match[0]: Full Match.
+	// Match[1]: String after the colon. (node[-core],...)
+	m := replicaRe.FindStringSubmatch(partitionFlag)
 	if len(m) == 0 {
-		return nil, fmt.Errorf("invalid format for %s", partition)
+		return nil, nil, fmt.Errorf("invalid format for %s; check --help text", partitionFlag)
+	}
+	nodeCoresChanges := strings.Split(m[1], ",")
+	if len(nodeCoresChanges) != len(currentReplicas) {
+		return nil, nil, fmt.Errorf("cannot configure %s; cannot modify replication factor, current replica count: %v; replica count requested: %v. You may use 'rpk topic alter-config [TOPIC] -s replication.factor=X' to modify the replication factor", partitionFlag, len(currentReplicas), len(nodeCoresChanges))
 	}
-	var newReplicas rpadmin.Replicas
-	for _, nodeCore := range strings.Split(m[1], ",") {
-		if split := strings.Split(nodeCore, "-"); len(split) == 1 {
+	for _, nodeCore := range nodeCoresChanges {
+		if split := strings.Split(nodeCore, "-"); len(split) == 1 { // Node change: -p 0:1,2,3
 			node, _ := strconv.Atoi(split[0])
 			core := findCore(node, currentReplicas)
-			newReplicas = append(newReplicas, rpadmin.Replica{
-				NodeID: node,
-				Core:   core,
-			})
-		} else {
+			allReplicas = append(allReplicas, rpadmin.Replica{NodeID: node, Core: core})
+		} else { // Node/Core changes: -p 0:1-0,2-3,3
 			node, _ := strconv.Atoi(split[0])
-			core, _ := strconv.Atoi(split[1])
-			newReplicas = append(newReplicas, rpadmin.Replica{
-				NodeID: node,
-				Core:   core,
-			})
+			currentCore := findCore(node, currentReplicas)
+			if currentCore == -1 { // -1 == not found == new node
+				return nil, nil, fmt.Errorf("node '%v': this command does not support updating cores for replicas on new nodes", node)
+			}
+			newCore, _ := strconv.Atoi(split[1])
+			allReplicas = append(allReplicas, rpadmin.Replica{NodeID: node, Core: newCore})
+			// We only append core changes. If the requested core is the same, we don't bother.
+			if currentCore != newCore {
+				coreReplicas = append(coreReplicas, rpadmin.Replica{NodeID: node, Core: newCore})
+			}
 		}
 	}
-	return newReplicas, nil
+	return
 }
 
 // findCore finds a shard (CPU core) where an existing replica is
@@ -298,6 +410,30 @@ func findCore(nodeID int, currentReplicas rpadmin.Replicas) int {
 	return -1
 }
 
+func isLocalCoreAssignmentOn(ctx context.Context, cl *rpadmin.AdminAPI) (bool, error) {
+	features, err := cl.GetFeatures(ctx)
+	if err != nil {
+		return false, fmt.Errorf("error trying to get feature lists: %v", err)
+	}
+	var foundActive bool
+	for _, f := range features.Features {
+		if f.Name == "node_local_core_assignment" && f.State == "active" {
+			foundActive = true
+		}
+	}
+	return foundActive, nil
+}
+
+// areReplicasEqual checks whether two replica sets are equal.
+func areReplicasEqual(current, requested rpadmin.Replicas) bool {
+	if len(current) != len(requested) {
+		return false
+	}
+	types.Sort(current)
+	types.Sort(requested)
+	return reflect.DeepEqual(current, requested)
+}
+
 const helpAlterAssignments = `Move partition replicas across nodes / cores.
 
 This command changes replica assignments for given partitions. By default, it
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/move_test.go b/src/go/rpk/pkg/cli/cluster/partitions/move_test.go
index b2935fcf5fec..1c4a81d25285 100644
--- a/src/go/rpk/pkg/cli/cluster/partitions/move_test.go
+++ b/src/go/rpk/pkg/cli/cluster/partitions/move_test.go
@@ -1,7 +1,15 @@
 package partitions
 
 import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
 	"testing"
+
+	"github.com/redpanda-data/common-go/rpadmin"
+	"github.com/stretchr/testify/require"
 )
 
 func Test_extractNTP(t *testing.T) {
@@ -101,3 +109,413 @@ func Test_extractNTP(t *testing.T) {
 		})
 	}
 }
+
+func Test_extractReplicaChanges(t *testing.T) {
+	tests := []struct {
+		name            string
+		partition       string
+		currentReplica  rpadmin.Replicas
+		expAllReplicas  rpadmin.Replicas
+		expCoreReplicas rpadmin.Replicas
+		expErrContain   string
+	}{
+		{
+			name:      "only node changes",
+			partition: "0:1,2,4",
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 1},
+			},
+			expAllReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 1},
+				{NodeID: 4, Core: -1}, // new node
+			},
+		},
+		{
+			name:      "only core changes",
+			partition: "myTopic/1:1-1,2-3,3-4",
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 3},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 2},
+			},
+			expAllReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 3},
+				{NodeID: 3, Core: 4},
+			},
+			expCoreReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 3},
+				{NodeID: 3, Core: 4},
+			},
+		},
+		{
+			name:      "mixed changes",
+			partition: "kafka/cipot/1:1,2-3,3",
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 3},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 2},
+			},
+			expAllReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 3},
+				{NodeID: 2, Core: 3},
+				{NodeID: 3, Core: 2},
+			},
+			expCoreReplicas: rpadmin.Replicas{
+				{NodeID: 2, Core: 3},
+			},
+		},
+		{
+			name:      "core changes, but already exist (no changes)",
+			partition: "kafka/cipot/1:1-1,2-2,3-3",
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 3},
+			},
+			expAllReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 3},
+			},
+			expCoreReplicas: nil,
+		},
+		{
+			name:      "core changes in new node (-1)",
+			partition: "3:1-2,2-1,4-3", // 4 is the new node.
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 3},
+			},
+			expErrContain: "this command does not support updating cores for replicas on new nodes",
+		},
+		{
+			name:      "replication factor change",
+			partition: "3:1-2", // from 3 to 1
+			currentReplica: rpadmin.Replicas{
+				{NodeID: 1, Core: 1},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 3},
+			},
+			expErrContain: "cannot modify replication factor",
+		},
+		{
+			name:          "invalid partition format",
+			partition:     "3:1-2-3",
+			expErrContain: "invalid format",
+		},
+		{
+			name:          "invalid partition ntp format",
+			partition:     "1-2,3,2",
+			expErrContain: "invalid format",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			all, core, err := extractReplicaChanges(tt.partition, tt.currentReplica)
+			if tt.expErrContain != "" {
+				require.Error(t, err)
+				require.ErrorContains(t, err, tt.expErrContain)
+				return
+			}
+			require.NoError(t, err)
+
+			require.Equal(t, all, tt.expAllReplicas)
+			require.Equal(t, core, tt.expCoreReplicas)
+		})
+	}
+}
+
+func Test_areReplicasEqual(t *testing.T) {
+	tests := []struct {
+		name string
+		a    rpadmin.Replicas
+		b    rpadmin.Replicas
+		exp  bool
+	}{
+		{
+			a:   rpadmin.Replicas{},
+			b:   rpadmin.Replicas{},
+			exp: true,
+		},
+		{
+			a:   rpadmin.Replicas{{NodeID: 1, Core: 3}},
+			b:   rpadmin.Replicas{{NodeID: 1, Core: 3}},
+			exp: true,
+		},
+		{
+			a:   rpadmin.Replicas{{NodeID: 1, Core: 3}, {NodeID: 2, Core: 1}},
+			b:   rpadmin.Replicas{{NodeID: 2, Core: 1}, {NodeID: 1, Core: 3}},
+			exp: true,
+		},
+		{
+			a:   rpadmin.Replicas{{NodeID: 1, Core: 3}, {NodeID: 2, Core: 1}},
+			b:   rpadmin.Replicas{{NodeID: 2, Core: 1}, {NodeID: 1, Core: 2}},
+			exp: false,
+		},
+		{
+			a: rpadmin.Replicas{}, b: nil, exp: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require.Equal(t, tt.exp, areReplicasEqual(tt.a, tt.b))
+		})
+	}
+}
+
+func Test_fillAssignmentList(t *testing.T) {
+	tests := []struct {
+		name      string
+		coreCount int
+		list      []newAssignment
+	}{
+		{
+			name:      "fill all",
+			coreCount: 3,
+			list: []newAssignment{
+				{
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 1},
+						{NodeID: 2, Core: -1},
+						{NodeID: 3, Core: 3},
+					},
+				},
+				{
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: -1},
+						{NodeID: 2, Core: -1},
+						{NodeID: 3, Core: -1},
+					},
+				},
+				{
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 2},
+						{NodeID: 2, Core: 1},
+						{NodeID: 3, Core: 3},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+			ts := httptest.NewServer(brokerHandler(tt.coreCount))
+			defer ts.Close()
+			cl, err := rpadmin.NewClient([]string{ts.URL}, nil, new(rpadmin.NopAuth), false)
+			require.NoError(t, err)
+
+			got, err := fillAssignmentList(ctx, cl, tt.list)
+			require.NoError(t, err)
+			for _, a := range got {
+				for _, r := range a.NewReplicas {
+					// We can't check for an exact value because
+					// fillAssignmentList assigns a random core.
+					require.NotEqual(t, -1, r.Core)
+				}
+			}
+		})
+	}
+}
+
+func brokerHandler(numCores int) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		resp := fmt.Sprintf(`{"num_cores":%v}`, numCores)
+		w.Write([]byte(resp))
+	}
+}
+
+func Test_parseAssignments(t *testing.T) {
+	tests := []struct {
+		name                  string
+		partitionsFlag        []string
+		topics                []string
+		currentReplicas       rpadmin.Replicas
+		expNodeAssignmentList []newAssignment
+		expCoreAssignmentList []newAssignment
+		expErrContains        string
+	}{
+		{
+			name:           "node and core changes, topic in partition flag",
+			partitionsFlag: []string{"foo/0:4,2-1,3-0"},
+			currentReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 4},
+			},
+			expNodeAssignmentList: []newAssignment{
+				{
+					Namespace: "kafka",
+					Topic:     "foo",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 2, Core: 1},
+						{NodeID: 3, Core: 0},
+						{NodeID: 4, Core: -1},
+					},
+				},
+			},
+			expCoreAssignmentList: []newAssignment{
+				{
+					Namespace: "kafka",
+					Topic:     "foo",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 2, Core: 1},
+						{NodeID: 3, Core: 0},
+					},
+				},
+			},
+		},
+		{
+			name:           "multiple changes, topics in partition flag",
+			partitionsFlag: []string{"foo/0:4,2-1,3-0", "kafka_internal/bar/0:1-2,2-4,3"},
+			currentReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 2},
+				{NodeID: 3, Core: 4},
+			},
+			expNodeAssignmentList: []newAssignment{
+				{
+					Namespace: "kafka",
+					Topic:     "foo",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 2, Core: 1},
+						{NodeID: 3, Core: 0},
+						{NodeID: 4, Core: -1},
+					},
+				},
+				{
+					Namespace: "kafka_internal",
+					Topic:     "bar",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 2},
+						{NodeID: 2, Core: 4},
+						{NodeID: 3, Core: 4},
+					},
+				},
+			},
+			expCoreAssignmentList: []newAssignment{
+				{
+					Namespace: "kafka",
+					Topic:     "foo",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 2, Core: 1},
+						{NodeID: 3, Core: 0},
+					},
+				},
+				{
+					Namespace: "kafka_internal",
+					Topic:     "bar",
+					Partition: 0,
+					OldReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 0},
+						{NodeID: 2, Core: 2},
+						{NodeID: 3, Core: 4},
+					},
+					NewReplicas: rpadmin.Replicas{
+						{NodeID: 1, Core: 2},
+						{NodeID: 2, Core: 4},
+					},
+				},
+			},
+		},
+		{
+			name:           "no node and no core changes, topic in args",
+			partitionsFlag: []string{"0:1,2-1,3-0"}, // The same as the current replicas
+			topics:         []string{"foo"},
+			currentReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 0},
+			},
+		},
+		{
+			name:           "err: replication change",
+			partitionsFlag: []string{"kafka/myTopic/0:1,2-1"},
+			currentReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 0},
+			},
+			expErrContains: "cannot modify replication factor",
+		},
+		{
+			name:           "err: core change on new node",
+			partitionsFlag: []string{"kafka/myTopic/0:1,2-3,4-2"},
+			currentReplicas: rpadmin.Replicas{
+				{NodeID: 1, Core: 0},
+				{NodeID: 2, Core: 1},
+				{NodeID: 3, Core: 0},
+			},
+			expErrContains: "does not support updating cores for replicas on new nodes",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+			ts := httptest.NewServer(partitionHandler(tt.currentReplicas))
+			defer ts.Close()
+			cl, err := rpadmin.NewClient([]string{ts.URL}, nil, new(rpadmin.NopAuth), false)
+			require.NoError(t, err)
+
+			gotNodeAssignmentList, gotCoreAssignmentList, err := parseAssignments(ctx, cl, tt.partitionsFlag, tt.topics)
+			if tt.expErrContains != "" {
+				require.Error(t, err)
+				require.ErrorContains(t, err, tt.expErrContains)
+				return
+			}
+			require.NoError(t, err)
+
+			require.Equal(t, tt.expNodeAssignmentList, gotNodeAssignmentList)
+			require.Equal(t, tt.expCoreAssignmentList, gotCoreAssignmentList)
+		})
+	}
+}
+
+func partitionHandler(replicas rpadmin.Replicas) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		partition := rpadmin.Partition{
+			Replicas: replicas,
+		}
+		resp, _ := json.Marshal(partition)
+		w.Write(resp)
+	}
+}
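
Reviewer note (not part of the patch): a minimal sketch of how the new extractReplicaChanges helper splits a single --partition value into the full replica set versus the core-only changes that get sent through UpdatePartitionReplicaCore. It is written as if it sat inside move_test.go (package partitions); the partition value and the current replica layout below are illustrative assumptions, not taken from the patch.

	// Assume the partition currently sits on nodes 1, 2, 3 at cores 0, 1, 2.
	current := rpadmin.Replicas{{NodeID: 1, Core: 0}, {NodeID: 2, Core: 1}, {NodeID: 3, Core: 2}}
	// "foo/0:1,2-3,4": keep node 1, move node 2's replica to core 3, and
	// replace node 3 with the new node 4 (no core given).
	all, coreOnly, err := extractReplicaChanges("foo/0:1,2-3,4", current)
	// all      -> [{NodeID:1 Core:0} {NodeID:2 Core:3} {NodeID:4 Core:-1}]
	//             (node 4 is new, so its core is -1 until fillAssignmentList picks one)
	// coreOnly -> [{NodeID:2 Core:3}]  (only node 2 actually changes core)
	// err      -> nil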