Refactor and test the 0.8.0 implementation #264

Merged 7 commits on Dec 15, 2017
13 changes: 12 additions & 1 deletion lib/kafka_ex.ex
@@ -471,7 +471,18 @@ defmodule KafkaEx do
}
end

-  defp build_worker_options(worker_init) do
+  @doc """
+  Builds options to be used with workers.
+
+  Merges the given options with defaults from the application env config.
+  Returns `{:error, :invalid_consumer_group}` if the consumer group
+  configuration is invalid, and `{:ok, merged_options}` otherwise.
+
+  Note this happens automatically when using `KafkaEx.create_worker`.
+  """
+  @spec build_worker_options(worker_init) ::
+    {:ok, worker_init} | {:error, :invalid_consumer_group}
+  def build_worker_options(worker_init) do
defaults = [
uris: Application.get_env(:kafka_ex, :brokers),
consumer_group: Application.get_env(:kafka_ex, :consumer_group),
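Since `build_worker_options/1` is now public, a worker's option list can be built and validated before starting a server directly. A minimal sketch, assuming the application env already supplies `:brokers` and friends (the consumer group name here is illustrative):

```elixir
# Merge caller options with defaults from the application env.
{:ok, opts} = KafkaEx.build_worker_options(consumer_group: "example_group")

# The merged options can then be handed straight to a server, as the
# new integration test in this PR does:
{:ok, worker} = KafkaEx.Server0P8P0.start_link(opts, :no_name)
```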
123 changes: 123 additions & 0 deletions lib/kafka_ex/server.ex
@@ -3,6 +3,7 @@ defmodule KafkaEx.Server do
Defines the KafkaEx.Server behavior that all Kafka API servers must implement. This module also provides some common callback functions that are injected into the servers that `use` it.
"""

alias KafkaEx.NetworkClient
alias KafkaEx.Protocol.ConsumerMetadata
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
@@ -21,6 +22,9 @@ defmodule KafkaEx.Server do
defmodule State do
@moduledoc false

alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.Metadata.Broker

defstruct(
metadata: %Metadata.Response{},
brokers: [],
@@ -47,6 +51,21 @@
ssl_options: KafkaEx.ssl_options,
use_ssl: boolean
}

@spec increment_correlation_id(t) :: t
def increment_correlation_id(state = %State{correlation_id: cid}) do
  %{state | correlation_id: cid + 1}
end

Review comment: this file puts the variable name before the pattern, while most of the files put the variable name after the pattern when naming parameter pattern matches.
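For reference, the two styles the comment contrasts, as an illustrative sketch rather than code from this PR (the two heads below are alternatives, not meant to coexist):

```elixir
# Style used here: variable name before the pattern.
def increment_correlation_id(state = %State{correlation_id: cid}) do
  %{state | correlation_id: cid + 1}
end

# Style used in most of the codebase: pattern first, then the variable name.
# The two are equivalent; `=` in a function head is an ordinary match.
def increment_correlation_id(%State{correlation_id: cid} = state) do
  %{state | correlation_id: cid + 1}
end
```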

@spec broker_for_partition(t, binary, integer) :: Broker.t | nil
def broker_for_partition(state, topic, partition) do
MetadataResponse.broker_for_topic(
state.metadata,
state.brokers,
topic,
partition
)
end
end
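A quick sketch of how the two new `State` helpers compose in a request cycle (the topic and partition values are illustrative):

```elixir
alias KafkaEx.Server.State

state = %State{correlation_id: 0}

# Look up the leader broker for a topic/partition from cached metadata;
# returns nil when no leader is known, which callers treat as a cue to
# refresh metadata.
_broker = State.broker_for_partition(state, "test0p8p0", 0)

# Bump the correlation id after each request that goes over the wire.
state = State.increment_correlation_id(state)
# state.correlation_id is now 1
```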

@callback kafka_server_init(args :: [term]) ::
@@ -376,6 +395,110 @@ defmodule KafkaEx.Server do
kafka_server_metadata: 2, kafka_server_update_metadata: 1,
]

defp kafka_common_init(args, name) do
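    # Shared initialization for every protocol-version server: connect to
    # the configured brokers, fetch the initial cluster metadata, and
    # schedule a periodic :update_metadata refresh.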
use_ssl = Keyword.get(args, :use_ssl, false)
ssl_options = Keyword.get(args, :ssl_options, [])

uris = Keyword.get(args, :uris, [])
metadata_update_interval = Keyword.get(
args,
:metadata_update_interval,
@metadata_update_interval
)

brokers = for {host, port} <- uris do
connect_broker(host, port, ssl_options, use_ssl)
end

{correlation_id, metadata} = retrieve_metadata(
brokers,
0,
config_sync_timeout()
)

state = %State{
metadata: metadata,
brokers: brokers,
correlation_id: correlation_id,
metadata_update_interval: metadata_update_interval,
ssl_options: ssl_options,
use_ssl: use_ssl,
worker_name: name
}

state = update_metadata(state)
{:ok, _} = :timer.send_interval(
state.metadata_update_interval,
:update_metadata
)

state
end

defp connect_broker(host, port, ssl_opts, use_ssl) do
%Broker{
host: host,
port: port,
socket: NetworkClient.create_socket(host, port, ssl_opts, use_ssl)
}
end

defp client_request(request, state) do
%{
request |
client_id: @client_id,
correlation_id: state.correlation_id
}
end

# gets the broker for a given partition, updating metadata if necessary
# returns {broker, maybe_updated_state}
defp broker_for_partition_with_update(state, topic, partition) do
case State.broker_for_partition(state, topic, partition) do
nil ->
updated_state = update_metadata(state)
{
State.broker_for_partition(updated_state, topic, partition),
updated_state
}
broker ->
{broker, state}
end
end

# assumes module.create_request(request) and module.parse_response
# both work
defp network_request(request, module, state) do
{broker, updated_state} = broker_for_partition_with_update(
state,
request.topic,
request.partition
)

case broker do
nil ->
Logger.error(fn ->
"Leader for topic #{request.topic} is not available"
end)
{{:error, :topic_not_found}, updated_state}
_ ->
wire_request = request
|> client_request(updated_state)
|> module.create_request

response = broker
|> NetworkClient.send_sync_request(
wire_request,
config_sync_timeout()
)
|> module.parse_response

state_out = State.increment_correlation_id(updated_state)

{response, state_out}
end
end
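`network_request/3` is what lets each protocol version shrink to a thin wrapper: any module exposing `create_request/1` and `parse_response/1` can be plugged in. A sketch of that implicit contract, using an illustrative module that is not part of this PR:

```elixir
defmodule IllustrativeProtocol do
  # Serialize a request struct (client_id/correlation_id already filled in
  # by client_request/2) into binary data for the wire. Real protocol
  # modules such as KafkaEx.Protocol.Fetch build the Kafka wire format here;
  # term_to_binary is just a stand-in.
  def create_request(request), do: :erlang.term_to_binary(request)

  # Decode the raw broker response into structs the caller can use.
  def parse_response(wire_data), do: :erlang.binary_to_term(wire_data)
end
```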

defp remove_stale_brokers(brokers, metadata_brokers) do
{brokers_to_keep, brokers_to_remove} = Enum.partition(brokers, fn(broker) ->
Enum.find_value(metadata_brokers, &(broker.node_id == -1 || (broker.node_id == &1.node_id) && broker.socket && Socket.info(broker.socket)))
48 changes: 13 additions & 35 deletions lib/kafka_ex/server_0_p_8_p_0.ex
@@ -18,25 +18,21 @@ defmodule KafkaEx.Server0P8P0 do

use KafkaEx.Server
alias KafkaEx.Protocol.Fetch
-  alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
-  alias KafkaEx.Protocol.Metadata.Broker
-  alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
-  alias KafkaEx.Server.State
-  alias KafkaEx.NetworkClient

  def kafka_server_init([args]) do
    kafka_server_init([args, self()])
  end

  def kafka_server_init([args, name]) do
-    uris = Keyword.get(args, :uris, [])
-    metadata_update_interval = Keyword.get(args, :metadata_update_interval, @metadata_update_interval)
-    brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port)} end)
-    {correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout())
-    state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, metadata_update_interval: metadata_update_interval, worker_name: name}
-    # Get the initial "real" broker list and start a regular refresh cycle.
-    state = update_metadata(state)
-    {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)
+    # warn if ssl is configured
+    if Keyword.get(args, :use_ssl) do
+      Logger.warn(fn ->
+        "KafkaEx is being configured to use ssl with a broker version that " <>
+        "does not support ssl"
+      end)
+    end
+
+    state = kafka_common_init(args, name)

{:ok, state}
end
@@ -67,28 +63,10 @@ defmodule KafkaEx.Server0P8P0 do
def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka"

-  defp fetch(fetch_request, state) do
-    fetch_data = Fetch.create_request(%FetchRequest{
-      fetch_request |
-      client_id: @client_id,
-      correlation_id: state.correlation_id,
-    })
-    {broker, state} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition) do
-      nil ->
-        updated_state = update_metadata(state)
-        {MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition), updated_state}
-      broker -> {broker, state}
-    end
-
-    case broker do
-      nil ->
-        Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available")
-        {:topic_not_found, state}
-      _ ->
-        response = broker
-        |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout())
-        |> Fetch.parse_response
-        {response, %{state | correlation_id: state.correlation_id + 1}}
+  defp fetch(request, state) do
+    case network_request(request, Fetch, state) do
+      {{:error, error}, state_out} -> {error, state_out}
+      {response, state_out} -> {response, state_out}
+    end
end
end
end
2 changes: 1 addition & 1 deletion scripts/ci_tests.sh
@@ -25,5 +25,5 @@ then
echo "First tests passed, skipping repeat"
else
echo "Repeating tests"
mix "$TEST_COMMAND" --include integration --include consumer_group --include server_0_p_9_p_0
mix "$TEST_COMMAND" --include integration --include consumer_group --include server_0_p_9_p_0 --include server_0_p_8_p_0
fi
2 changes: 1 addition & 1 deletion scripts/docker_up.sh
@@ -48,4 +48,4 @@ done
docker-compose up -d

# create topics needed for testing
-docker-compose exec kafka3 /bin/bash -c "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 KAFKA_PORT=9094 KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2 create-topics.sh"
+docker-compose exec kafka3 /bin/bash -c "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 KAFKA_PORT=9094 KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2,test0p8p0:4:2 create-topics.sh"
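The added `test0p8p0:4:2` entry follows the same `name:partitions:replicas` pattern as the existing topic spec; four partitions line up with the new integration test, which produces to partition 0 and expects partition 42 to be missing.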
56 changes: 56 additions & 0 deletions test/integration/server0_p_8_p_0_test.exs
@@ -0,0 +1,56 @@
defmodule KafkaEx.Server0P8P0.Test do
use ExUnit.Case
import TestHelper

@moduletag :server_0_p_8_p_0

alias KafkaEx.Server0P8P0, as: Server

@topic "test0p8p0"

setup do
{:ok, args} = KafkaEx.build_worker_options([])
{:ok, worker} = Server.start_link(args, :no_name)

# we don't want to crash if the worker crashes
Process.unlink(worker)

on_exit fn ->
if Process.alive?(worker) do
Process.exit(worker, :normal)
end
end

{:ok, [worker: worker]}
end

test "can produce and fetch a message", %{worker: worker}do
now = :erlang.monotonic_time
msg = "test message #{now}"
partition = 0
:ok = KafkaEx.produce(@topic, partition, msg, worker_name: worker)

wait_for(fn ->
[got] = KafkaEx.fetch(
@topic,
partition,
worker_name: worker,
offset: 1,
auto_commit: false
)
[got_partition] = got.partitions
Enum.any?(got_partition.message_set, fn(m) -> m.value == msg end)
end)
end

test "when the partition is not found", %{worker: worker} do
partition = 42
assert :topic_not_found == KafkaEx.fetch(
@topic,
partition,
worker_name: worker,
offset: 1,
auto_commit: false
)
end
end