Support setting the max bytes to fetch per request #468

Merged 2 commits on Nov 2, 2017
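
This change adds a `max_bytes` option that caps how much data a broker may return per fetch request. It is threaded through the whole fetch path: `Client#fetch_messages` forwards it to `FetchOperation`, `Consumer#each_message` and `#each_batch` accept it (defaulting to 10485760 bytes, i.e. 10 MiB), and `Protocol::FetchRequest` encodes it on the wire, bumping the fetch API version from 2 to 3, the first version with a top-level response size limit.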
1 change: 1 addition & 0 deletions lib/kafka/client.rb
@@ -369,6 +369,7 @@ def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_by
   cluster: @cluster,
   logger: @logger,
   min_bytes: min_bytes,
+  max_bytes: max_bytes,
   max_wait_time: max_wait_time,
 )

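To illustrate the client-level entry point, here is a hedged sketch of a one-off fetch; the broker address, topic, and limit are made up, and it assumes `fetch_messages` already exposes the `max_bytes:` keyword that this hunk forwards to the operation:

```ruby
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])

# Fetch at most ~512 KB of messages from one partition in a single request.
messages = kafka.fetch_messages(
  topic: "page-views",   # illustrative topic name
  partition: 0,
  offset: :earliest,
  max_bytes: 512 * 1024,
)

messages.each do |message|
  puts "#{message.offset}: #{message.value}"
end
```
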
13 changes: 10 additions & 3 deletions lib/kafka/consumer.rb
@@ -179,6 +179,8 @@ def paused?(topic, partition)
 # @param min_bytes [Integer] the minimum number of bytes to read before
 #   returning messages from each broker; if `max_wait_time` is reached, this
 #   is ignored.
+# @param max_bytes [Integer] the maximum number of bytes to read before
+#   returning messages from each broker.
 # @param max_wait_time [Integer, Float] the maximum duration of time to wait before
 #   returning messages from each broker, in seconds.
 # @param automatically_mark_as_processed [Boolean] whether to automatically
@@ -190,10 +192,11 @@ def paused?(topic, partition)
 #   The original exception will be returned by calling `#cause` on the
 #   {Kafka::ProcessingError} instance.
 # @return [nil]
-def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true)
+def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
   consumer_loop do
     batches = fetch_batches(
       min_bytes: min_bytes,
+      max_bytes: max_bytes,
       max_wait_time: max_wait_time,
       automatically_mark_as_processed: automatically_mark_as_processed
     )
@@ -253,6 +256,8 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
 # @param min_bytes [Integer] the minimum number of bytes to read before
 #   returning messages from each broker; if `max_wait_time` is reached, this
 #   is ignored.
+# @param max_bytes [Integer] the maximum number of bytes to read before
+#   returning messages from each broker.
 # @param max_wait_time [Integer, Float] the maximum duration of time to wait before
 #   returning messages from each broker, in seconds.
 # @param automatically_mark_as_processed [Boolean] whether to automatically
@@ -261,10 +266,11 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
 #   messages can be committed to Kafka.
 # @yieldparam batch [Kafka::FetchedBatch] a message batch fetched from Kafka.
 # @return [nil]
-def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true)
+def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
   consumer_loop do
     batches = fetch_batches(
       min_bytes: min_bytes,
+      max_bytes: max_bytes,
       max_wait_time: max_wait_time,
       automatically_mark_as_processed: automatically_mark_as_processed
     )
@@ -400,7 +406,7 @@ def join_group
   end
 end

-def fetch_batches(min_bytes:, max_wait_time:, automatically_mark_as_processed:)
+def fetch_batches(min_bytes:, max_bytes:, max_wait_time:, automatically_mark_as_processed:)
   join_group unless @group.member?

   subscribed_partitions = @group.subscribed_partitions
@@ -411,6 +417,7 @@ def fetch_batches(min_bytes:, max_wait_time:, automatically_mark_as_processed:)
   cluster: @cluster,
   logger: @logger,
   min_bytes: min_bytes,
+  max_bytes: max_bytes,
   max_wait_time: max_wait_time,
 )

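A hedged usage sketch of the new keyword on the consumer API; the group, topic, and limit values are illustrative:

```ruby
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])
consumer = kafka.consumer(group_id: "page-view-indexer")
consumer.subscribe("page-views")

# Lower the per-fetch cap from the 10 MiB default to 1 MiB; min_bytes and
# max_wait_time keep their existing meaning.
consumer.each_batch(min_bytes: 1, max_bytes: 1024 * 1024, max_wait_time: 5) do |batch|
  batch.messages.each do |message|
    puts message.value
  end
end
```
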
4 changes: 3 additions & 1 deletion lib/kafka/fetch_operation.rb
@@ -18,10 +18,11 @@ module Kafka
 #   operation.execute
 #
 class FetchOperation
-  def initialize(cluster:, logger:, min_bytes: 1, max_wait_time: 5)
+  def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5)
     @cluster = cluster
     @logger = logger
     @min_bytes = min_bytes
+    @max_bytes = max_bytes
     @max_wait_time = max_wait_time
     @topics = {}
   end
@@ -66,6 +67,7 @@ def execute
 options = {
   max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs
   min_bytes: @min_bytes,
+  max_bytes: @max_bytes,
   topics: topics,
 }

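A minimal sketch of driving the operation directly, under the assumption that you already hold a `Kafka::Cluster` (normally built by `Client`); `fetch_from_partition` follows the usage comment partially visible at the top of this file:

```ruby
require "kafka"
require "logger"

operation = Kafka::FetchOperation.new(
  cluster: cluster,           # assumed: an already-initialized Kafka::Cluster
  logger: Logger.new($stdout),
  min_bytes: 1,
  max_bytes: 5 * 1024 * 1024, # cap the whole response at 5 MiB
  max_wait_time: 5,           # seconds; converted to ms before encoding
)

operation.fetch_from_partition("page-views", 0, offset: :latest, max_bytes: 100_000)
messages = operation.execute
```

Note that the new operation-level `max_bytes` bounds the entire response, while the per-partition `max_bytes` passed to `fetch_from_partition` keeps bounding individual partitions.
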
6 changes: 4 additions & 2 deletions lib/kafka/protocol/fetch_request.rb
@@ -19,10 +19,11 @@ class FetchRequest
 # @param max_wait_time [Integer]
 # @param min_bytes [Integer]
 # @param topics [Hash]
-def initialize(max_wait_time:, min_bytes:, topics:)
+def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
   @replica_id = REPLICA_ID
   @max_wait_time = max_wait_time
   @min_bytes = min_bytes
+  @max_bytes = max_bytes
   @topics = topics
 end

@@ -31,7 +32,7 @@ def api_key
 end

 def api_version
-  2
+  3
 end

 def response_class
@@ -42,6 +43,7 @@ def encode(encoder)
 encoder.write_int32(@replica_id)
 encoder.write_int32(@max_wait_time)
 encoder.write_int32(@min_bytes)
+encoder.write_int32(@max_bytes)

 encoder.write_array(@topics) do |topic, partitions|
   encoder.write_string(topic)
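The version bump is what makes the new field legal: a top-level `max_bytes` only exists in the fetch protocol from v3 onward, and `#encode` writes it immediately after `min_bytes` to match that wire layout. A small sketch of constructing the request, with the `topics` hash assumed to be built upstream by `FetchOperation`:

```ruby
request = Kafka::Protocol::FetchRequest.new(
  max_wait_time: 5_000,        # milliseconds on the wire
  min_bytes: 1,
  max_bytes: 10 * 1024 * 1024, # 10485760, the default used throughout this PR
  topics: topics,              # assumed shape: topic name => per-partition fetch configs
)

request.api_version # => 3 after this change
```
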
1 change: 1 addition & 0 deletions spec/broker_spec.rb
@@ -87,6 +87,7 @@ def send_request(request)
 actual_response = broker.fetch_messages(
   max_wait_time: 0,
   min_bytes: 0,
+  max_bytes: 10 * 1024,
   topics: {}
 )
