[#109] Pulsar binary protocol documentation #111
CLA is valid!
<!-- /TOC -->

Pulsar uses a custom binary protocol for communications between
producers/consumers and brokers. The protocol was design to support all the
"is" instead of "was"
formatted as binary [Protocol Buffers](https://developers.google.com/protocol-buffers/)
messages.

The format of the protobuf commands is specified in the `PulsarApi.proto`
"The format of protobuf command"
[pulsar-common/src/main/proto/PulsarApi.proto](https://github.com/yahoo/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto)

Commands for different producers and consumers can be sent through the same
connection, without any restriction to the interleaving of such commands.
Commands for different producers and consumers can be interleaved and sent through the same connection without any restriction.

All the commands are embedded in a [`BaseCommand`](https://github.com/yahoo/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L283) protobuf object that includes a type enum and all the possible commands as
optional fields. At any time, one `BaseCommand` can only have set a single
Should we call it BaseCommand and SubCommand? Then it would be:
All the commands are embedded in a BaseCommand protobuf object that includes a type enum and all possible SubCommands as optional field. At any given time, one BaseCommand can only have one SubCommand.
`SubCommand` would imply there's a class with that name. What about referring to "sub-command"?
@saandrews updated
```
[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
```

* `TOTAL_SIZE` → Size of the frame in bytes. Counting everything that come after it
comes after
done
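
As a reading aid for the payload frame layout quoted above, here is a minimal Python sketch that splits such a frame into its parts. It assumes details this excerpt does not spell out: that every size field and the checksum are 4-byte big-endian unsigned integers, and that the magic number is the 2-byte value `0x0e01`. The function name `split_payload_frame` is hypothetical, and the checksum is only extracted, not verified.

```python
import struct

# Assumption: 2-byte magic number marking the checksum section.
MAGIC_NUMBER = b"\x0e\x01"


def split_payload_frame(frame: bytes) -> dict:
    """Split [TOTAL_SIZE][CMD_SIZE][CMD][MAGIC_NUMBER][CHECKSUM]
    [METADATA_SIZE][METADATA][PAYLOAD] into its fields.

    Assumes 4-byte big-endian sizes and checksum (not stated in the excerpt).
    """
    (total_size,) = struct.unpack_from(">I", frame, 0)
    # TOTAL_SIZE counts everything that comes after it.
    if total_size != len(frame) - 4:
        raise ValueError("TOTAL_SIZE does not match the frame length")
    offset = 4

    (cmd_size,) = struct.unpack_from(">I", frame, offset)
    offset += 4
    cmd = frame[offset:offset + cmd_size]
    offset += cmd_size

    if frame[offset:offset + 2] != MAGIC_NUMBER:
        raise ValueError("magic number not found after the command")
    offset += 2

    # The checksum is returned as-is; verifying it is out of scope here.
    (checksum,) = struct.unpack_from(">I", frame, offset)
    offset += 4

    (metadata_size,) = struct.unpack_from(">I", frame, offset)
    offset += 4
    metadata = frame[offset:offset + metadata_size]
    offset += metadata_size

    # Whatever remains is the application payload.
    payload = frame[offset:]
    return {
        "cmd": cmd,
        "checksum": checksum,
        "metadata": metadata,
        "payload": payload,
    }
```

The `cmd` and `metadata` byte strings would then be handed to the protobuf classes generated from `PulsarApi.proto`; that step is left out so the sketch stays free of generated code.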
The maximum size for a single frame is 5 Mbytes.

In Pulsar protocol we have 2 types of commands:
1. Simple commands that do not carry payloads
does not
"commands" here is plural

* `TOTAL_SIZE` → Size of the frame in bytes. Counting everything that come after it
* `CMD_SIZE` → Size of the protobuf serialized command
Should we also specify the size (4 bytes), like you have it for `magic_number`? Even though it's called out before, adding it here would be helpful.
sure, added the size to all the sizes..
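
To make the size fields discussed above concrete, here is a small Python sketch that encodes and decodes a simple (payload-free) command frame. It rests on assumptions not confirmed in this excerpt: that a simple frame consists of just `TOTAL_SIZE`, `CMD_SIZE`, and the serialized command, that both sizes are 4-byte big-endian unsigned integers, and that the "5 Mbytes" limit means 5 * 1024 * 1024 bytes applied to `TOTAL_SIZE`. The function names are hypothetical, and the command is treated as opaque bytes rather than a real serialized `BaseCommand`.

```python
import struct

# Assumption: "5 Mbytes" is read as 5 * 1024 * 1024 bytes.
MAX_FRAME_SIZE = 5 * 1024 * 1024


def encode_simple_frame(cmd: bytes) -> bytes:
    """Wrap an already-serialized command as [TOTAL_SIZE][CMD_SIZE][CMD]."""
    cmd_size = len(cmd)
    # TOTAL_SIZE counts everything after itself: CMD_SIZE (4 bytes) + CMD.
    total_size = 4 + cmd_size
    if total_size > MAX_FRAME_SIZE:
        raise ValueError("frame exceeds the maximum frame size")
    return struct.pack(">II", total_size, cmd_size) + cmd


def decode_simple_frame(frame: bytes) -> bytes:
    """Return the serialized command bytes from a simple frame."""
    if len(frame) < 8:
        raise ValueError("frame is too short to hold both size fields")
    total_size, cmd_size = struct.unpack_from(">II", frame, 0)
    if total_size != len(frame) - 4:
        raise ValueError("TOTAL_SIZE does not match the frame length")
    if cmd_size != total_size - 4:
        raise ValueError("CMD_SIZE does not match TOTAL_SIZE")
    return frame[8:]
```

For a 3-byte command, `TOTAL_SIZE` is 7 and `CMD_SIZE` is 3, so the frame is 11 bytes long on the wire.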

Fields:
* `producer_name` → Name of the producer that published the message
* `sequence_id` → Sequence id of the messages, assigned by producer
message
fixed
* `publish_time` → Publish timestamp. Number of ms since Jan 1st 1970 in UTC
* `properties` → Sequence of `Pair<String, String>`. These are application
  defined keys and values with no meaning to Pulsar
* `replicated_from` → *(optional)* Indicated that the messages has been
Indicates the name of the cluster which replicated the message
fixed
Command `Message` is used by the broker to push messages to an existing consumer,
within the limits of the given permits.

This command is used in a frame that includes as well the message payload, for
that includes the message payload as well
* `ack_type` → Type of acknowledgment: `Individual` or `Cumulative`
* `message_id` → Id of the message to acknowledge
* `validation_error` → *(optional)* Indicates that the consumer has discarded
  the messages because of some problem like: `UncompressedSizeCorruption`,
has discarded the messages due to:

##### Command RedeliverUnacknowledgedMessages

A consumer can ask the broker to redeliver some or all the pending messages
some or all of the
pending messages.

With redelivery can be sent to the same consumer or spread across all available
consumers in the case of a shared subscription.
In case of Shared Subscription, redelivered messages can be sent to the same consumer or to all available consumers.
* `topic` → Topic name to lookup
* `request_id` → Id of the request that will be passed with its response
* `authoritative` → Flag that needs to be set to true only after a redirection
  response has the same flag set
Should it say, "only after a redirection response has this flag set to true"?
@saandrews Updated again
👍
Motivation
Added documentation for binary protocol specification as discussed on issue #109