Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase topic partition count API #533

Merged
merged 13 commits into from
Jan 24, 2018
6 changes: 6 additions & 0 deletions lib/kafka/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ def delete_topics(**options)
send_request(request)
end

def create_partitions(**options)
request = Protocol::CreatePartitionsRequest.new(**options)

send_request(request)
end

def api_versions
request = Protocol::ApiVersionsRequest.new

Expand Down
21 changes: 21 additions & 0 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,18 @@ def delete_topic(name, timeout: 30)
@cluster.delete_topic(name, timeout: timeout)
end

# Create partitions for a topic.
#
# @param name [String] the name of the topic.
# @param num_partitions [Integer] the number of desired partitions for
# the topic
# @param timeout [Integer] a duration of time to wait for the new
# partitions to be added.
# @return [nil]
def create_partitions_for(name, num_partitions: 1, timeout: 30)
@cluster.create_partitions_for(name, num_partitions: num_partitions, timeout: timeout)
end

# Lists all topics in the cluster.
#
# @return [Array<String>] the list of topic names.
Expand Down Expand Up @@ -526,6 +538,15 @@ def last_offsets_for(*topics)
}.to_h
end

# Check whether current cluster supports a specific version or not
#
# @param api_key [Integer] API key.
# @param version [Integer] API version.
# @return [Boolean]
def supports_api?(api_key, version = nil)
@cluster.supports_api?(api_key, version)
end

def apis
@cluster.apis
end
Expand Down
31 changes: 31 additions & 0 deletions lib/kafka/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ def api_info(api_key)
apis.find {|api| api.api_key == api_key }
end

def supports_api?(api_key, version = nil)
info = api_info(api_key)
if info.nil?
return false
elsif version.nil?
return true
else
return (info.min_version..info.max_version).include?(version)
end
end

def apis
@apis ||=
begin
Expand Down Expand Up @@ -199,6 +210,26 @@ def delete_topic(name, timeout:)
@logger.info "Topic `#{name}` was deleted"
end

def create_partitions_for(name, num_partitions:, timeout:)
options = {
topics: [[name, num_partitions, nil]],
timeout: timeout
}

broker = controller_broker

@logger.info "Creating #{num_partitions} partition(s) for topic `#{name}` using controller broker #{broker}"

response = broker.create_partitions(**options)

response.errors.each do |topic, error_code, error_message|
Protocol.handle_error(error_code, error_message)
end
mark_as_stale!

@logger.info "Topic `#{name}` was updated"
end

def resolve_offsets(topic, partitions, offset)
add_target_topics([topic])
refresh_metadata_if_necessary!
Expand Down
10 changes: 7 additions & 3 deletions lib/kafka/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Protocol
API_VERSIONS_API = 18
CREATE_TOPICS_API = 19
DELETE_TOPICS_API = 20
CREATE_PARTITIONS_API = 37

# A mapping from numeric API keys to symbolic API names.
APIS = {
Expand All @@ -45,6 +46,7 @@ module Protocol
API_VERSIONS_API => :api_versions,
CREATE_TOPICS_API => :create_topics,
DELETE_TOPICS_API => :delete_topics,
CREATE_PARTITIONS_API => :create_partitions
}

# A mapping from numeric error codes to exception classes.
Expand Down Expand Up @@ -95,13 +97,13 @@ module Protocol
# @param error_code Integer
# @raise [ProtocolError]
# @return [nil]
def self.handle_error(error_code)
def self.handle_error(error_code, error_message = nil)
if error_code == 0
# No errors, yay!
elsif error = ERRORS[error_code]
raise error
raise error, error_message
else
raise UnknownError, "Unknown error with code #{error_code}"
raise UnknownError, "Unknown error with code #{error_code} #{error_message}"
end
end

Expand Down Expand Up @@ -145,3 +147,5 @@ def self.api_name(api_key)
require "kafka/protocol/create_topics_response"
require "kafka/protocol/delete_topics_request"
require "kafka/protocol/delete_topics_response"
require "kafka/protocol/create_partitions_request"
require "kafka/protocol/create_partitions_response"
40 changes: 40 additions & 0 deletions lib/kafka/protocol/create_partitions_request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Kafka
module Protocol

class CreatePartitionsRequest
def initialize(topics:, timeout:)
@topics, @timeout = topics, timeout
end

def api_key
CREATE_PARTITIONS_API
end

def api_version
0
end

def response_class
Protocol::CreatePartitionsResponse
end

def encode(encoder)
encoder.write_array(@topics) do |topic, count, assignments|
encoder.write_string(topic)
encoder.write_int32(count)
encoder.write_array(assignments) do |assignment|
encoder.write_array(assignment) do |broker|
encoder.write_int32(broker)
end
end
end
# Timeout is in ms.
encoder.write_int32(@timeout * 1000)
# validate_only. There isn't any use case for this in real life. So
# let's ignore it for now
encoder.write_boolean(false)
end
end

end
end
26 changes: 26 additions & 0 deletions lib/kafka/protocol/create_partitions_response.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module Kafka
module Protocol

class CreatePartitionsResponse
attr_reader :errors

def initialize(throttle_time_ms:, errors:)
@throttle_time_ms = throttle_time_ms
@errors = errors
end

def self.decode(decoder)
throttle_time_ms = decoder.int32
errors = decoder.array do
topic = decoder.string
error_code = decoder.int16
error_message = decoder.string
[topic, error_code, error_message]
end

new(throttle_time_ms: throttle_time_ms, errors: errors)
end
end

end
end
2 changes: 1 addition & 1 deletion lib/kafka/protocol/encoder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def write(bytes)
# @param boolean [Boolean]
# @return [nil]
def write_boolean(boolean)
write(boolean ? 0x1 : 0x0)
boolean ? write_int8(1) : write_int8(0)
end

# Writes an 8-bit integer to the IO object.
Expand Down
8 changes: 8 additions & 0 deletions spec/functional/api_versions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@
expect(produce_api.min_version).to eq 0
expect(produce_api.max_version).to be >= 2
end

example "checks cluster API support" do
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API)).to eql(true)
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API, 0)).to eql(true)
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API, 100)).to eql(false)
expect(kafka.supports_api?(100)).to eql(false)
expect(kafka.supports_api?(100, 100)).to eql(false)
end
end
13 changes: 13 additions & 0 deletions spec/functional/topic_management_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,17 @@
kafka.delete_topic(topic)
expect(kafka.has_topic?(topic)).to eql(false)
end

example "create partitions" do
unless kafka.supports_api?(Kafka::Protocol::CREATE_PARTITIONS_API)
skip("This Kafka version not support ")
end
topic = generate_topic_name

kafka.create_topic(topic, num_partitions: 3)
expect(kafka.partitions_for(topic)).to eq 3

kafka.create_partitions_for(topic, num_partitions: 10)
expect(kafka.partitions_for(topic)).to eq 10
end
end