diff --git a/create_topics_request.go b/create_topics_request.go index 7bad5427e..8382d17c2 100644 --- a/create_topics_request.go +++ b/create_topics_request.go @@ -5,10 +5,14 @@ import ( ) type CreateTopicsRequest struct { + // Version defines the protocol version to use for encode and decode Version int16 - + // TopicDetails contains the topics to create. TopicDetails map[string]*TopicDetail - Timeout time.Duration + // Timeout contains how long to wait before timing out the request. + Timeout time.Duration + // ValidateOnly if true, check that the topics can be created as specified, + // but don't create anything. ValidateOnly bool } @@ -103,10 +107,19 @@ func (c *CreateTopicsRequest) requiredVersion() KafkaVersion { } type TopicDetail struct { - NumPartitions int32 + // NumPartitions contains the number of partitions to create in the topic, or + // -1 if we are either specifying a manual partition assignment or using the + // default partitions. + NumPartitions int32 + // ReplicationFactor contains the number of replicas to create for each + // partition in the topic, or -1 if we are either specifying a manual + // partition assignment or using the default replication factor. ReplicationFactor int16 + // ReplicaAssignment contains the manual partition assignment, or the empty + // array if we are using automatic assignment. ReplicaAssignment map[int32][]int32 - ConfigEntries map[string]*string + // ConfigEntries contains the custom topic configurations to set. + ConfigEntries map[string]*string } func (t *TopicDetail) encode(pe packetEncoder) error { diff --git a/create_topics_response.go b/create_topics_response.go index d8c9a174e..85bd4c0b9 100644 --- a/create_topics_response.go +++ b/create_topics_response.go @@ -6,9 +6,13 @@ import ( ) type CreateTopicsResponse struct { - Version int16 + // Version defines the protocol version to use for encode and decode + Version int16 + // ThrottleTime contains the duration for which the request was throttled due + // to a quota violation, or zero if the request did not violate any quota. ThrottleTime time.Duration - TopicErrors map[string]*TopicError + // TopicErrors contains a map of any errors for the topics we tried to create. + TopicErrors map[string]*TopicError } func (c *CreateTopicsResponse) encode(pe packetEncoder) error { diff --git a/join_group_request.go b/join_group_request.go index b574ce475..3ab69c498 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -1,7 +1,9 @@ package sarama type GroupProtocol struct { - Name string + // Name contains the protocol name. + Name string + // Metadata contains the protocol metadata. Metadata []byte } @@ -25,14 +27,30 @@ func (p *GroupProtocol) encode(pe packetEncoder) (err error) { } type JoinGroupRequest struct { - Version int16 - GroupId string - SessionTimeout int32 - RebalanceTimeout int32 - MemberId string - GroupInstanceId *string - ProtocolType string - GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols + // Version defines the protocol version to use for encode and decode + Version int16 + // GroupId contains the group identifier. + GroupId string + // SessionTimeout specifies that the coordinator should consider the consumer + // dead if it receives no heartbeat after this timeout in milliseconds. + SessionTimeout int32 + // RebalanceTimeout contains the maximum time in milliseconds that the + // coordinator will wait for each member to rejoin when rebalancing the + // group. + RebalanceTimeout int32 + // MemberId contains the member id assigned by the group coordinator. + MemberId string + // GroupInstanceId contains the unique identifier of the consumer instance + // provided by end user. + GroupInstanceId *string + // ProtocolType contains the unique name the for class of protocols + // implemented by the group we want to join. + ProtocolType string + // GroupProtocols contains the list of protocols that the member supports. + // deprecated; use OrderedGroupProtocols + GroupProtocols map[string][]byte + // OrderedGroupProtocols contains an ordered list of protocols that the member + // supports. OrderedGroupProtocols []*GroupProtocol } diff --git a/join_group_response.go b/join_group_response.go index 76645242e..643fddc6b 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -3,20 +3,33 @@ package sarama import "time" type JoinGroupResponse struct { - Version int16 - ThrottleTime int32 - Err KError - GenerationId int32 + // Version defines the protocol version to use for encode and decode + Version int16 + // ThrottleTime contains the duration for which the request was throttled due + // to a quota violation, or zero if the request did not violate any quota. + ThrottleTime int32 + // Err contains the error code, or 0 if there was no error. + Err KError + // GenerationId contains the generation ID of the group. + GenerationId int32 + // GroupProtocol contains the group protocol selected by the coordinator. GroupProtocol string - LeaderId string - MemberId string - Members []GroupMember + // LeaderId contains the leader of the group. + LeaderId string + // MemberId contains the member ID assigned by the group coordinator. + MemberId string + // Members contains the per-group-member information. + Members []GroupMember } type GroupMember struct { - MemberId string + // MemberId contains the group member ID. + MemberId string + // GroupInstanceId contains the unique identifier of the consumer instance + // provided by end user. GroupInstanceId *string - Metadata []byte + // Metadata contains the group member metadata. + Metadata []byte } func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {