Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DeleteTopics message (API#20) #338

Merged
merged 6 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ clusters.
./all_tests.sh
```

##### Kafka >= 0.9.0
##### Kafka = 0.9.0

The 0.9 client includes functionality that cannot be tested with older
clusters.
Expand Down
14 changes: 13 additions & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse
alias KafkaEx.Protocol.DeleteTopics.Response, as: DeleteTopicsResponse
alias KafkaEx.Protocol.ApiVersions.Response, as: ApiVersionsResponse
alias KafkaEx.Server
alias KafkaEx.Stream
Expand Down Expand Up @@ -614,10 +615,21 @@ defmodule KafkaEx do
CreateTopicsResponse.t()
def create_topics(requests, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())
timeout = Keyword.get(opts, :timeout, 4000)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:create_topics, requests, timeout})
end

@doc """
Delete topics. Must provide a list of topic names.
"""
@spec delete_topics([String.t()], Keyword.t()) ::
DeleteTopicsResponse.t()
def delete_topics(topics, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:delete_topics, topics, timeout})
end

# OTP API
def start(_type, _args) do
max_restarts = Application.get_env(:kafka_ex, :max_restarts, 10)
Expand Down
21 changes: 15 additions & 6 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ defmodule KafkaEx.GenConsumer do
Any other return value will cause the `start_link/5` to return `{:error,
error}` and the process to exit.
"""
@callback init(topic :: binary, partition :: non_neg_integer, extra_args :: map()) ::
{:ok, state :: term}

@callback init(
topic :: binary,
partition :: non_neg_integer,
extra_args :: map()
) ::
{:ok, state :: term}

@doc """
Invoked for each message set consumed from a Kafka topic partition.
Expand Down Expand Up @@ -367,7 +370,11 @@ defmodule KafkaEx.GenConsumer do
{:noreply, consumer_state}
end

defoverridable init: 2, init: 3, handle_call: 3, handle_cast: 2, handle_info: 2
defoverridable init: 2,
init: 3,
handle_call: 3,
handle_cast: 2,
handle_info: 2
end
end

Expand Down Expand Up @@ -518,13 +525,15 @@ defmodule KafkaEx.GenConsumer do
Application.get_env(:kafka_ex, :auto_offset_reset, @auto_offset_reset)
)

extra_consumer_args =
extra_consumer_args =
Keyword.get(
opts,
:extra_consumer_args
)

{:ok, consumer_state} = consumer_module.init(topic, partition, extra_consumer_args)
{:ok, consumer_state} =
consumer_module.init(topic, partition, extra_consumer_args)

worker_opts = Keyword.take(opts, [:uris])

{:ok, worker_name} =
Expand Down
3 changes: 2 additions & 1 deletion lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule KafkaEx.Protocol do
leave_group: 13,
sync_group: 14,
api_versions: 18,
create_topics: 19
create_topics: 19,
delete_topics: 20
}

# DescribeConfigs 32
Expand Down
24 changes: 24 additions & 0 deletions lib/kafka_ex/protocol/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,28 @@ defmodule KafkaEx.Protocol.Common do
{items, data_after_array} = read_array(num_items - 1, rest, read_one)
{[item | items], data_after_array}
end

@spec encode_nullable_string(String.t()) :: binary
def encode_nullable_string(text) do
case text do
nil -> <<-1::16-signed>>
_ -> encode_string(text)
end
end

@spec encode_string(String.t()) :: binary
def encode_string(text) do
<<byte_size(text)::16-signed, text::binary>>
end

def map_encode(elems, function) do
if nil == elems or [] == elems do
<<0::32-signed>>
else
<<length(elems)::32-signed>> <>
(elems
|> Enum.map(function)
|> Enum.reduce(&(&1 <> &2)))
end
end
end
25 changes: 1 addition & 24 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
import KafkaEx.Protocol.Common
@supported_versions_range {0, 0}

@moduledoc """
Expand Down Expand Up @@ -126,30 +127,6 @@ defmodule KafkaEx.Protocol.CreateTopics do
encode_nullable_string(config_entry.config_value)
end

@spec encode_nullable_string(String.t()) :: binary
defp encode_nullable_string(text) do
case text do
nil -> <<-1::16-signed>>
_ -> encode_string(text)
end
end

@spec encode_string(String.t()) :: binary
defp encode_string(text) do
<<byte_size(text)::16-signed, text::binary>>
end

defp map_encode(elems, function) do
if nil == elems or [] == elems do
<<0::32-signed>>
else
<<length(elems)::32-signed>> <>
(elems
|> Enum.map(function)
|> Enum.reduce(&(&1 <> &2)))
end
end

@spec parse_response(binary, integer) :: [] | Response.t()
def parse_response(
<<_correlation_id::32-signed, topic_errors_count::32-signed,
Expand Down
93 changes: 93 additions & 0 deletions lib/kafka_ex/protocol/delete_topics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
defmodule KafkaEx.Protocol.DeleteTopics do
alias KafkaEx.Protocol
import KafkaEx.Protocol.Common
@supported_versions_range {0, 0}

@moduledoc """
Implementation of the Kafka DeleteTopics request and response APIs

See: https://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
"""

# DeleteTopics Request (Version: 0) => [topics] timeout
# topics => STRING
# timeout => INT32

# DeleteTopics Response (Version: 0) => [topic_error_codes]
# topic_error_codes => topic error_code
# topic => STRING
# error_code => INT16

defmodule Request do
@moduledoc false
defstruct topics: nil, timeout: nil
@type t :: %Request{topics: [String.t()], timeout: integer}
end

defmodule TopicError do
@moduledoc false
defstruct topic_name: nil, error_code: nil
@type t :: %TopicError{topic_name: binary, error_code: atom}
end

defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
end

def api_version(api_versions) do
KafkaEx.ApiVersions.find_api_version(
api_versions,
:delete_topics,
@supported_versions_range
)
end

@spec create_request(integer, binary, Request.t(), integer) :: binary
def create_request(
correlation_id,
client_id,
delete_topics_request,
api_version
)

def create_request(correlation_id, client_id, delete_topics_request, 0) do
Protocol.create_request(:delete_topics, correlation_id, client_id) <>
encode_topics(delete_topics_request.topics) <>
<<delete_topics_request.timeout::32-signed>>
end

@spec encode_topics([String.t()]) :: binary
defp encode_topics(topics) do
topics |> map_encode(&encode_string/1)
end

@spec parse_response(binary, integer) :: [] | Response.t()
def parse_response(
<<_correlation_id::32-signed, topic_errors_count::32-signed,
topic_errors::binary>>,
0
) do
%Response{
topic_errors: parse_topic_errors(topic_errors_count, topic_errors)
}
end

@spec parse_topic_errors(integer, binary) :: [TopicError.t()]
defp parse_topic_errors(0, _), do: []

defp parse_topic_errors(
topic_errors_count,
<<topic_name_size::16-signed, topic_name::size(topic_name_size)-binary,
error_code::16-signed, rest::binary>>
) do
[
%TopicError{
topic_name: topic_name,
error_code: Protocol.error(error_code)
}
| parse_topic_errors(topic_errors_count - 1, rest)
]
end
end
33 changes: 31 additions & 2 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ defmodule KafkaEx.Server do
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_delete_topics(
[String.t()],
network_timeout :: integer,
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_api_versions(state :: State.t()) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_server_update_metadata(state :: State.t()) ::
Expand Down Expand Up @@ -334,6 +340,10 @@ defmodule KafkaEx.Server do
kafka_create_topics(requests, network_timeout, state)
end

def handle_call({:delete_topics, topics, network_timeout}, _from, state) do
kafka_delete_topics(topics, network_timeout, state)
end

def handle_call({:api_versions}, _from, state) do
kafka_api_versions(state)
end
Expand Down Expand Up @@ -839,8 +849,27 @@ defmodule KafkaEx.Server do
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> module.parse_response(response)
{:error, reason} ->
{:error, reason}

response ->
try do
module.parse_response(response)
rescue
_ ->
Logger.error(
"Failed to parse a response from the server: #{
inspect(response)
}"
)

Kernel.reraise(
"Parse error during #{inspect(module)}.parse_response. Couldn't parse: #{
inspect(response)
}",
System.stacktrace()
)
end
end

state_out = State.increment_correlation_id(updated_state)
Expand Down
50 changes: 49 additions & 1 deletion lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule KafkaEx.Server0P10AndLater do
"""
use KafkaEx.Server
alias KafkaEx.Protocol.CreateTopics
alias KafkaEx.Protocol.DeleteTopics
alias KafkaEx.Protocol.ApiVersions
alias KafkaEx.Server0P8P2
alias KafkaEx.Server0P9P0
Expand Down Expand Up @@ -179,6 +180,53 @@ defmodule KafkaEx.Server0P10AndLater do
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_delete_topics(topics, network_timeout, state) do
api_version =
case DeleteTopics.api_version(state.api_versions) do
{:ok, api_version} ->
api_version

_ ->
raise "DeleteTopic is not supported in this version of Kafka, or the versions supported by the client do not match the ones supported by the server."
end

main_request =
DeleteTopics.create_request(
state.correlation_id,
@client_id,
%DeleteTopics.Request{
topics: topics,
timeout: config_sync_timeout(network_timeout)
},
api_version
)

broker = state.brokers |> Enum.find(& &1.is_controller)

{response, state} =
case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}

_ ->
response =
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> DeleteTopics.parse_response(response, api_version)
end

{response, %{state | correlation_id: state.correlation_id + 1}}
end

{:reply, response, state}
end

def kafka_create_topics(requests, network_timeout, state) do
api_version =
case CreateTopics.api_version(state.api_versions) do
Expand All @@ -191,7 +239,7 @@ defmodule KafkaEx.Server0P10AndLater do

create_topics_request = %CreateTopics.Request{
create_topic_requests: requests,
timeout: network_timeout
timeout: config_sync_timeout(network_timeout)
}

main_request =
Expand Down
Loading