From f2d9e087266352d276b3cb951b4c668c114b8b75 Mon Sep 17 00:00:00 2001 From: Boris Granveaud Date: Mon, 16 Oct 2023 17:59:15 +0200 Subject: [PATCH] Implementation of ListPartitionReassignments API (#1203) * implemented ListPartitionReassignments API. * fix nullable * fix lint * fix tag. --- listpartitionreassignments.go | 135 ++++++++++++++++++ listpartitionreassignments_test.go | 50 +++++++ .../listpartitionreassignments.go | 70 +++++++++ .../listpartitionreassignments_test.go | 41 ++++++ 4 files changed, 296 insertions(+) create mode 100644 listpartitionreassignments.go create mode 100644 listpartitionreassignments_test.go create mode 100644 protocol/listpartitionreassignments/listpartitionreassignments.go create mode 100644 protocol/listpartitionreassignments/listpartitionreassignments_test.go diff --git a/listpartitionreassignments.go b/listpartitionreassignments.go new file mode 100644 index 00000000..aa01fff3 --- /dev/null +++ b/listpartitionreassignments.go @@ -0,0 +1,135 @@ +package kafka + +import ( + "context" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/listpartitionreassignments" +) + +// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API. +type ListPartitionReassignmentsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // Topics we want reassignments for, mapped by their name, or nil to list everything. + Topics map[string]ListPartitionReassignmentsRequestTopic + + // Timeout is the amount of time to wait for the request to complete. + Timeout time.Duration +} + +// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single +// topic. +type ListPartitionReassignmentsRequestTopic struct { + // The partitions to list partition reassignments for. + PartitionIndexes []int +} + +// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API. +type ListPartitionReassignmentsResponse struct { + // Error is set to a non-nil value including the code and message if a top-level + // error was encountered. + Error error + + // Topics contains results for each topic, mapped by their name. + Topics map[string]ListPartitionReassignmentsResponseTopic +} + +// ListPartitionReassignmentsResponseTopic contains the detailed result of +// ongoing reassignments for a topic. +type ListPartitionReassignmentsResponseTopic struct { + // Partitions contains result for topic partitions. + Partitions []ListPartitionReassignmentsResponsePartition +} + +// ListPartitionReassignmentsResponsePartition contains the detailed result of +// ongoing reassignments for a single partition. +type ListPartitionReassignmentsResponsePartition struct { + // PartitionIndex contains index of the partition. + PartitionIndex int + + // Replicas contains the current replica set. + Replicas []int + + // AddingReplicas contains the set of replicas we are currently adding. + AddingReplicas []int + + // RemovingReplicas contains the set of replicas we are currently removing. + RemovingReplicas []int +} + +func (c *Client) ListPartitionReassignments( + ctx context.Context, + req *ListPartitionReassignmentsRequest, +) (*ListPartitionReassignmentsResponse, error) { + apiReq := &listpartitionreassignments.Request{ + TimeoutMs: int32(req.Timeout.Milliseconds()), + } + + for topicName, topicReq := range req.Topics { + apiReq.Topics = append( + apiReq.Topics, + listpartitionreassignments.RequestTopic{ + Name: topicName, + PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes), + }, + ) + } + + protoResp, err := c.roundTrip( + ctx, + req.Addr, + apiReq, + ) + if err != nil { + return nil, err + } + apiResp := protoResp.(*listpartitionreassignments.Response) + + resp := &ListPartitionReassignmentsResponse{ + Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage), + Topics: make(map[string]ListPartitionReassignmentsResponseTopic), + } + + for _, topicResult := range apiResp.Topics { + respTopic := ListPartitionReassignmentsResponseTopic{} + for _, partitionResult := range topicResult.Partitions { + respTopic.Partitions = append( + respTopic.Partitions, + ListPartitionReassignmentsResponsePartition{ + PartitionIndex: int(partitionResult.PartitionIndex), + Replicas: int32ToIntArray(partitionResult.Replicas), + AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas), + RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas), + }, + ) + } + resp.Topics[topicResult.Name] = respTopic + } + + return resp, nil +} + +func intToInt32Array(arr []int) []int32 { + if arr == nil { + return nil + } + res := make([]int32, len(arr)) + for i := range arr { + res[i] = int32(arr[i]) + } + return res +} + +func int32ToIntArray(arr []int32) []int { + if arr == nil { + return nil + } + res := make([]int, len(arr)) + for i := range arr { + res[i] = int(arr[i]) + } + return res +} diff --git a/listpartitionreassignments_test.go b/listpartitionreassignments_test.go new file mode 100644 index 00000000..fd2c3178 --- /dev/null +++ b/listpartitionreassignments_test.go @@ -0,0 +1,50 @@ +package kafka + +import ( + "context" + "testing" + + ktesting "github.com/segmentio/kafka-go/testing" +) + +func TestClientListPartitionReassignments(t *testing.T) { + if !ktesting.KafkaIsAtLeast("2.4.0") { + return + } + + ctx := context.Background() + client, shutdown := newLocalClient() + defer shutdown() + + topic := makeTopic() + createTopic(t, topic, 2) + defer deleteTopic(t, topic) + + // Can't really get an ongoing partition reassignment with local Kafka, so just do a superficial test here. + resp, err := client.ListPartitionReassignments( + ctx, + &ListPartitionReassignmentsRequest{ + Topics: map[string]ListPartitionReassignmentsRequestTopic{ + topic: {PartitionIndexes: []int{0, 1}}, + }, + }, + ) + + if err != nil { + t.Fatal(err) + } + if resp.Error != nil { + t.Error( + "Unexpected error in response", + "expected", nil, + "got", resp.Error, + ) + } + if len(resp.Topics) != 0 { + t.Error( + "Unexpected length of topic results", + "expected", 0, + "got", len(resp.Topics), + ) + } +} diff --git a/protocol/listpartitionreassignments/listpartitionreassignments.go b/protocol/listpartitionreassignments/listpartitionreassignments.go new file mode 100644 index 00000000..d26a6410 --- /dev/null +++ b/protocol/listpartitionreassignments/listpartitionreassignments.go @@ -0,0 +1,70 @@ +package listpartitionreassignments + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments. + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + TimeoutMs int32 `kafka:"min=v0,max=v0"` + Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"` +} + +type RequestTopic struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0"` + PartitionIndexes []int32 `kafka:"min=v0,max=v0"` +} + +func (r *Request) ApiKey() protocol.ApiKey { + return protocol.ListPartitionReassignments +} + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` + Topics []ResponseTopic `kafka:"min=v0,max=v0"` +} + +type ResponseTopic struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0"` + Partitions []ResponsePartition `kafka:"min=v0,max=v0"` +} + +type ResponsePartition struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + PartitionIndex int32 `kafka:"min=v0,max=v0"` + Replicas []int32 `kafka:"min=v0,max=v0"` + AddingReplicas []int32 `kafka:"min=v0,max=v0"` + RemovingReplicas []int32 `kafka:"min=v0,max=v0"` +} + +func (r *Response) ApiKey() protocol.ApiKey { + return protocol.ListPartitionReassignments +} diff --git a/protocol/listpartitionreassignments/listpartitionreassignments_test.go b/protocol/listpartitionreassignments/listpartitionreassignments_test.go new file mode 100644 index 00000000..e32869fa --- /dev/null +++ b/protocol/listpartitionreassignments/listpartitionreassignments_test.go @@ -0,0 +1,41 @@ +package listpartitionreassignments_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/listpartitionreassignments" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +const ( + v0 = 0 +) + +func TestListPartitionReassignmentsRequest(t *testing.T) { + prototest.TestRequest(t, v0, &listpartitionreassignments.Request{ + Topics: []listpartitionreassignments.RequestTopic{ + { + Name: "topic-1", + PartitionIndexes: []int32{1, 2, 3}, + }, + }, + }) +} + +func TestListPartitionReassignmentsResponse(t *testing.T) { + prototest.TestResponse(t, v0, &listpartitionreassignments.Response{ + Topics: []listpartitionreassignments.ResponseTopic{ + { + Name: "topic-1", + Partitions: []listpartitionreassignments.ResponsePartition{ + { + PartitionIndex: 1, + Replicas: []int32{1, 2, 3}, + AddingReplicas: []int32{4, 5, 6}, + RemovingReplicas: []int32{7, 8, 9}, + }, + }, + }, + }, + }) +}