Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of ListPartitionReassignments API #1203

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions listpartitionreassignments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package kafka

import (
"context"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
)

// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
type ListPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// Topics we want reassignments for, mapped by their name, or nil to list everything.
Topics map[string]ListPartitionReassignmentsRequestTopic

// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}

// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
// topic.
type ListPartitionReassignmentsRequestTopic struct {
// The partitions to list partition reassignments for.
PartitionIndexes []int
}

// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
type ListPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered.
Error error

// Topics contains results for each topic, mapped by their name.
Topics map[string]ListPartitionReassignmentsResponseTopic
}

// ListPartitionReassignmentsResponseTopic contains the detailed result of
// ongoing reassignments for a topic.
type ListPartitionReassignmentsResponseTopic struct {
// Partitions contains result for topic partitions.
Partitions []ListPartitionReassignmentsResponsePartition
}

// ListPartitionReassignmentsResponsePartition contains the detailed result of
// ongoing reassignments for a single partition.
type ListPartitionReassignmentsResponsePartition struct {
// PartitionIndex contains index of the partition.
PartitionIndex int

// Replicas contains the current replica set.
Replicas []int

// AddingReplicas contains the set of replicas we are currently adding.
AddingReplicas []int

// RemovingReplicas contains the set of replicas we are currently removing.
RemovingReplicas []int
}

func (c *Client) ListPartitionReassignments(
ctx context.Context,
req *ListPartitionReassignmentsRequest,
) (*ListPartitionReassignmentsResponse, error) {
apiReq := &listpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
}

for topicName, topicReq := range req.Topics {
apiReq.Topics = append(
apiReq.Topics,
listpartitionreassignments.RequestTopic{
Name: topicName,
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
},
)
}

protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*listpartitionreassignments.Response)

resp := &ListPartitionReassignmentsResponse{
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
}

for _, topicResult := range apiResp.Topics {
respTopic := ListPartitionReassignmentsResponseTopic{}
for _, partitionResult := range topicResult.Partitions {
respTopic.Partitions = append(
respTopic.Partitions,
ListPartitionReassignmentsResponsePartition{
PartitionIndex: int(partitionResult.PartitionIndex),
Replicas: int32ToIntArray(partitionResult.Replicas),
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas),
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
},
)
}
resp.Topics[topicResult.Name] = respTopic
}

return resp, nil
}

func intToInt32Array(arr []int) []int32 {
if arr == nil {
return nil
}
res := make([]int32, len(arr))
for i := range arr {
res[i] = int32(arr[i])
}
return res
}

func int32ToIntArray(arr []int32) []int {
if arr == nil {
return nil
}
res := make([]int, len(arr))
for i := range arr {
res[i] = int(arr[i])
}
return res
}
50 changes: 50 additions & 0 deletions listpartitionreassignments_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientListPartitionReassignments(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.4.0") {
return
}

ctx := context.Background()
client, shutdown := newLocalClient()
defer shutdown()

topic := makeTopic()
createTopic(t, topic, 2)
defer deleteTopic(t, topic)

// Can't really get an ongoing partition reassignment with local Kafka, so just do a superficial test here.
resp, err := client.ListPartitionReassignments(
ctx,
&ListPartitionReassignmentsRequest{
Topics: map[string]ListPartitionReassignmentsRequestTopic{
topic: {PartitionIndexes: []int{0, 1}},
},
},
)

if err != nil {
t.Fatal(err)
}
if resp.Error != nil {
t.Error(
"Unexpected error in response",
"expected", nil,
"got", resp.Error,
)
}
if len(resp.Topics) != 0 {
t.Error(
"Unexpected length of topic results",
"expected", 0,
"got", len(resp.Topics),
)
}
}
58 changes: 58 additions & 0 deletions protocol/listpartitionreassignments/listpartitionreassignments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package listpartitionreassignments

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments.

type Request struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v0,max=v0,tag"`

TimeoutMs int32 `kafka:"min=v0,max=v0"`
Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"`
}

type RequestTopic struct {
Name string `kafka:"min=v0,max=v0"`
petedannemann marked this conversation as resolved.
Show resolved Hide resolved
PartitionIndexes []int32 `kafka:"min=v0,max=v0"`
}

func (r *Request) ApiKey() protocol.ApiKey {
return protocol.ListPartitionReassignments
}

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
return cluster.Brokers[cluster.Controller], nil
}

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v0,max=v0,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
ErrorCode int16 `kafka:"min=v0,max=v0"`
ErrorMessage string `kafka:"min=v0,max=v0,nullable"`
Topics []ResponseTopic `kafka:"min=v0,max=v0"`
}

type ResponseTopic struct {
Name string `kafka:"min=v0,max=v0"`
Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
}

type ResponsePartition struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
Replicas []int32 `kafka:"min=v0,max=v0"`
AddingReplicas []int32 `kafka:"min=v0,max=v0"`
RemovingReplicas []int32 `kafka:"min=v0,max=v0"`
}

func (r *Response) ApiKey() protocol.ApiKey {
return protocol.ListPartitionReassignments
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package listpartitionreassignments_test

import (
"testing"

"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
"github.com/segmentio/kafka-go/protocol/prototest"
)

const (
v0 = 0
)

func TestListPartitionReassignmentsRequest(t *testing.T) {
prototest.TestRequest(t, v0, &listpartitionreassignments.Request{
Topics: []listpartitionreassignments.RequestTopic{
{
Name: "topic-1",
PartitionIndexes: []int32{1, 2, 3},
},
},
})
}

func TestListPartitionReassignmentsResponse(t *testing.T) {
prototest.TestResponse(t, v0, &listpartitionreassignments.Response{
Topics: []listpartitionreassignments.ResponseTopic{
{
Name: "topic-1",
Partitions: []listpartitionreassignments.ResponsePartition{
{
PartitionIndex: 1,
Replicas: []int32{1, 2, 3},
AddingReplicas: []int32{4, 5, 6},
RemovingReplicas: []int32{7, 8, 9},
},
},
},
},
})
}