Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(proto): doc CreateTopics/JoinGroup fields #2627

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions create_topics_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
)

type CreateTopicsRequest struct {
// Version defines the protocol version to use for encode and decode
Version int16

// TopicDetails contains the topics to create.
TopicDetails map[string]*TopicDetail
Timeout time.Duration
// Timeout contains how long to wait before timing out the request.
Timeout time.Duration
// ValidateOnly if true, check that the topics can be created as specified,
// but don't create anything.
ValidateOnly bool
}

Expand Down Expand Up @@ -103,10 +107,19 @@ func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
}

type TopicDetail struct {
NumPartitions int32
// NumPartitions contains the number of partitions to create in the topic, or
// -1 if we are either specifying a manual partition assignment or using the
// default partitions.
NumPartitions int32
// ReplicationFactor contains the number of replicas to create for each
// partition in the topic, or -1 if we are either specifying a manual
// partition assignment or using the default replication factor.
ReplicationFactor int16
// ReplicaAssignment contains the manual partition assignment, or the empty
// array if we are using automatic assignment.
ReplicaAssignment map[int32][]int32
ConfigEntries map[string]*string
// ConfigEntries contains the custom topic configurations to set.
ConfigEntries map[string]*string
}

func (t *TopicDetail) encode(pe packetEncoder) error {
Expand Down
8 changes: 6 additions & 2 deletions create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (
)

type CreateTopicsResponse struct {
Version int16
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration for which the request was throttled due
// to a quota violation, or zero if the request did not violate any quota.
ThrottleTime time.Duration
TopicErrors map[string]*TopicError
// TopicErrors contains a map of any errors for the topics we tried to create.
TopicErrors map[string]*TopicError
}

func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
Expand Down
36 changes: 27 additions & 9 deletions join_group_request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package sarama

type GroupProtocol struct {
Name string
// Name contains the protocol name.
Name string
// Metadata contains the protocol metadata.
Metadata []byte
}

Expand All @@ -25,14 +27,30 @@ func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
}

type JoinGroupRequest struct {
Version int16
GroupId string
SessionTimeout int32
RebalanceTimeout int32
MemberId string
GroupInstanceId *string
ProtocolType string
GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
// Version defines the protocol version to use for encode and decode
Version int16
// GroupId contains the group identifier.
GroupId string
// SessionTimeout specifies that the coordinator should consider the consumer
// dead if it receives no heartbeat after this timeout in milliseconds.
SessionTimeout int32
// RebalanceTimeout contains the maximum time in milliseconds that the
// coordinator will wait for each member to rejoin when rebalancing the
// group.
RebalanceTimeout int32
// MemberId contains the member id assigned by the group coordinator.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance
// provided by end user.
GroupInstanceId *string
// ProtocolType contains the unique name the for class of protocols
// implemented by the group we want to join.
ProtocolType string
// GroupProtocols contains the list of protocols that the member supports.
// deprecated; use OrderedGroupProtocols
GroupProtocols map[string][]byte
// OrderedGroupProtocols contains an ordered list of protocols that the member
// supports.
OrderedGroupProtocols []*GroupProtocol
}

Expand Down
31 changes: 22 additions & 9 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,33 @@ package sarama
import "time"

type JoinGroupResponse struct {
Version int16
ThrottleTime int32
Err KError
GenerationId int32
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration for which the request was throttled due
// to a quota violation, or zero if the request did not violate any quota.
ThrottleTime int32
// Err contains the error code, or 0 if there was no error.
Err KError
// GenerationId contains the generation ID of the group.
GenerationId int32
// GroupProtocol contains the group protocol selected by the coordinator.
GroupProtocol string
LeaderId string
MemberId string
Members []GroupMember
// LeaderId contains the leader of the group.
LeaderId string
// MemberId contains the member ID assigned by the group coordinator.
MemberId string
// Members contains the per-group-member information.
Members []GroupMember
}

type GroupMember struct {
MemberId string
// MemberId contains the group member ID.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance
// provided by end user.
GroupInstanceId *string
Metadata []byte
// Metadata contains the group member metadata.
Metadata []byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
Expand Down