Skip to content

Commit

Permalink
Changed AlterPartitionReassignments request to support multiple topics (
Browse files Browse the repository at this point in the history
#1204)

* changed AlterPartitionReassignments request to support multiple topics

* keep compatibility with existing code.
  • Loading branch information
Boris Granveaud authored Oct 16, 2023
1 parent f48706e commit 5b97cf9
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 12 deletions.
41 changes: 30 additions & 11 deletions alterpartitionreassignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// Topic is the name of the topic to alter partitions in.
// Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
// reassign to multiple topics.
Topic string

// Assignments is the list of partition reassignments to submit to the API.
Expand All @@ -26,10 +27,13 @@ type AlterPartitionReassignmentsRequest struct {
// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
// partition.
type AlterPartitionReassignmentsRequestAssignment struct {
// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
Topic string

// PartitionID is the ID of the partition to make the reassignments in.
PartitionID int

// BrokerIDs is a slice of brokers to set the partition replicas to.
// BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
BrokerIDs []int
}

Expand All @@ -46,6 +50,9 @@ type AlterPartitionReassignmentsResponse struct {
// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
// doing reassignments for a single partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
// Topic is the topic name.
Topic string

// PartitionID is the ID of the partition that was altered.
PartitionID int

Expand All @@ -58,16 +65,29 @@ func (c *Client) AlterPartitionReassignments(
ctx context.Context,
req *AlterPartitionReassignmentsRequest,
) (*AlterPartitionReassignmentsResponse, error) {
apiPartitions := []alterpartitionreassignments.RequestPartition{}
apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)

for _, assignment := range req.Assignments {
topic := assignment.Topic
if topic == "" {
topic = req.Topic
}

apiTopic := apiTopicMap[topic]
if apiTopic == nil {
apiTopic = &alterpartitionreassignments.RequestTopic{
Name: topic,
}
apiTopicMap[topic] = apiTopic
}

replicas := []int32{}
for _, brokerID := range assignment.BrokerIDs {
replicas = append(replicas, int32(brokerID))
}

apiPartitions = append(
apiPartitions,
apiTopic.Partitions = append(
apiTopic.Partitions,
alterpartitionreassignments.RequestPartition{
PartitionIndex: int32(assignment.PartitionID),
Replicas: replicas,
Expand All @@ -77,12 +97,10 @@ func (c *Client) AlterPartitionReassignments(

apiReq := &alterpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
Topics: []alterpartitionreassignments.RequestTopic{
{
Name: req.Topic,
Partitions: apiPartitions,
},
},
}

for _, apiTopic := range apiTopicMap {
apiReq.Topics = append(apiReq.Topics, *apiTopic)
}

protoResp, err := c.roundTrip(
Expand All @@ -104,6 +122,7 @@ func (c *Client) AlterPartitionReassignments(
resp.PartitionResults = append(
resp.PartitionResults,
AlterPartitionReassignmentsResponsePartitionResult{
Topic: topicResult.Name,
PartitionID: int(partitionResult.PartitionIndex),
Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
},
Expand Down
61 changes: 61 additions & 0 deletions alterpartitionreassignments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,64 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
)
}
}

func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.4.0") {
return
}

ctx := context.Background()
client, shutdown := newLocalClient()
defer shutdown()

topic1 := makeTopic()
topic2 := makeTopic()
createTopic(t, topic1, 2)
createTopic(t, topic2, 2)
defer func() {
deleteTopic(t, topic1)
deleteTopic(t, topic2)
}()

// Local kafka only has 1 broker, so any partition reassignments are really no-ops.
resp, err := client.AlterPartitionReassignments(
ctx,
&AlterPartitionReassignmentsRequest{
Assignments: []AlterPartitionReassignmentsRequestAssignment{
{
Topic: topic1,
PartitionID: 0,
BrokerIDs: []int{1},
},
{
Topic: topic1,
PartitionID: 1,
BrokerIDs: []int{1},
},
{
Topic: topic2,
PartitionID: 0,
BrokerIDs: []int{1},
},
},
},
)

if err != nil {
t.Fatal(err)
}
if resp.Error != nil {
t.Error(
"Unexpected error in response",
"expected", nil,
"got", resp.Error,
)
}
if len(resp.PartitionResults) != 3 {
t.Error(
"Unexpected length of partition results",
"expected", 3,
"got", len(resp.PartitionResults),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type RequestTopic struct {

type RequestPartition struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
Replicas []int32 `kafka:"min=v0,max=v0"`
Replicas []int32 `kafka:"min=v0,max=v0,nullable"`
}

func (r *Request) ApiKey() protocol.ApiKey {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package alterpartitionreassignments_test

import (
"testing"

"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
"github.com/segmentio/kafka-go/protocol/prototest"
)

const (
v0 = 0
)

func TestAlterPartitionReassignmentsRequest(t *testing.T) {
prototest.TestRequest(t, v0, &alterpartitionreassignments.Request{
TimeoutMs: 1,
Topics: []alterpartitionreassignments.RequestTopic{
{
Name: "topic-1",
Partitions: []alterpartitionreassignments.RequestPartition{
{
PartitionIndex: 1,
Replicas: []int32{1, 2, 3},
},
{
PartitionIndex: 2,
},
},
},
},
})
}

func TestAlterPartitionReassignmentsResponse(t *testing.T) {
prototest.TestResponse(t, v0, &alterpartitionreassignments.Response{
ErrorCode: 1,
ErrorMessage: "error",
ThrottleTimeMs: 1,
Results: []alterpartitionreassignments.ResponseResult{
{
Name: "topic-1",
Partitions: []alterpartitionreassignments.ResponsePartition{
{
PartitionIndex: 1,
ErrorMessage: "error",
ErrorCode: 1,
},
{
PartitionIndex: 2,
},
},
},
},
})
}

0 comments on commit 5b97cf9

Please sign in to comment.