Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error connecting on Kafka version 0.8.1.1 #267

Closed
wilbertom opened this issue Dec 16, 2017 · 4 comments
Closed

Error connecting on Kafka version 0.8.1.1 #267

wilbertom opened this issue Dec 16, 2017 · 4 comments

Comments

@wilbertom
Copy link

Hello,

I had a short conversation about this with @dantswain about this today on slack but I wanted to open an official ticket.

When using the latest kafka_ex and kafka 0.8.1.1 versions, the following issue occurs while trying to start a consumer.

05:27:17.455 [error] Receiving data from broker "172.18.0.2":9092 failed with :closed

05:27:17.455 [error] Receiving data from broker "kafka_zookeeper":9092 failed with :closed
** (EXIT from #PID<0.228.0>) evaluator process exited with reason: an exception was raised:
    ** (MatchError) no match of right hand side value: {:error, {:function_clause, [{KafkaEx.Protocol.ConsumerMetadata, :parse_response, [nil], [file: 'lib/kafka_ex/protocol/consumer_metadata.ex', line: 38]}, {KafkaEx.Server0P9P0, :update_consumer_metadata, 3, [file: 'lib/kafka_ex/server_0_p_9_p_0.ex', line: 177]}, {KafkaEx.Server0P9P0, :kafka_server_init, 1, [file: 'lib/kafka_ex/server_0_p_9_p_0.ex', line: 71]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 365]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 333]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
        (kafka_ex) lib/kafka_ex/gen_consumer.ex:395: KafkaEx.GenConsumer.init/1
        (stdlib) gen_server.erl:365: :gen_server.init_it/2
        (stdlib) gen_server.erl:333: :gen_server.init_it/6
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

@dantswain I'm almost certain consumer groups are supported in this version of Kafka because we use consumer groups with this version in a different consumer we run in Java, unless I'm using the wrong version while developing which would be great.

I understand if you all can't get around to this since this is an older Kafka version and probably not a priority.

If so maybe I can try to implement a patch but it might be outside my skill level since I'm just starting with Elixir.

Also given #260 I can see if you guys wouldn't even want a patch.

@wilbertom
Copy link
Author

Some other things noticed, here the library is trying to use KafkaEx.Server0P9P0 when on version 0.8.1. Trying to force KafkaEx.Server0P8P0 via kafka_version: "0.8.0" in the config.exs results in a different error:

06:05:37.701 [debug] Succesfully connected to broker "kafka_zookeeper":9092

06:05:37.706 [debug] Establishing connection to broker 1: "172.18.0.2" on port 9092

06:05:37.706 [debug] Succesfully connected to broker "172.18.0.2":9092
{:ok, #PID<0.189.0>}
iex(2)>
06:05:37.710 [debug] Shutting down worker #PID<0.190.0>
iex(2)>
06:05:37.711 [debug] Succesfully connected to broker "kafka_zookeeper":9092
iex(2)>
06:05:37.714 [debug] Establishing connection to broker 1: "172.18.0.2" on port 9092
iex(2)>
06:05:37.714 [debug] Succesfully connected to broker "172.18.0.2":9092
iex(2)> ** (EXIT from #PID<0.187.0>) evaluator process exited with reason: exited in: GenServer.call(#PID<0.190.0>, {:offset_fetch, %KafkaEx.Protocol.OffsetFetch.Request{consumer_group: "kafkareader", partition: 0, topic: "stream"}}, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Offset Fetch is not supported in 0.8.0 version of kafka
            (kafka_ex) lib/kafka_ex/server_0_p_8_p_0.ex:56: KafkaEx.Server0P8P0.kafka_server_offset_fetch/2
            (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
iex(2)>
iex(2)> Interactive Elixir (1.5.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(2)> iex(1)>
06:05:37.721 [error] GenServer #PID<0.190.0> terminating
** (RuntimeError) Offset Fetch is not supported in 0.8.0 version of kafka
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_0.ex:56: KafkaEx.Server0P8P0.kafka_server_offset_fetch/2
    (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.189.0>): {:offset_fetch, %KafkaEx.Protocol.OffsetFetch.Request{consumer_group: "kafkareader", partition: 0, topic: "stream"}}
State: %KafkaEx.Server.State{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "172.18.0.2", node_id: 1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.6709>, ssl: false}}, %KafkaEx.Protocol.Metadata.Broker{host: "kafka_zookeeper", node_id: -1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.6666>, ssl: false}}], consumer_group: nil, consumer_group_update_interval: nil, consumer_metadata: %KafkaEx.Protocol.ConsumerMetadata.Response{coordinator_host: "", coordinator_id: 0, coordinator_port: 0, error_code: 0}, correlation_id: 3, event_pid: nil, metadata: %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "172.18.0.2", node_id: 1, port: 9092, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 0, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 3, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 1, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 4, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 2, replicas: [1]}], topic: "stream"}]}, metadata_update_interval: 30000, ssl_options: [], use_ssl: false, worker_name: #PID<0.190.0>}
Client #PID<0.189.0> is alive
    (stdlib) gen.erl:169: :gen.do_call/4
    (elixir) lib/gen_server.ex:771: GenServer.call/3
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:600: KafkaEx.GenConsumer.load_offsets/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:445: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

iex(1)>
06:05:37.723 [error] GenServer #PID<0.189.0> terminating
** (stop) exited in: GenServer.call(#PID<0.190.0>, {:offset_fetch, %KafkaEx.Protocol.OffsetFetch.Request{consumer_group: "kafkareader", partition: 0, topic: "stream"}}, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Offset Fetch is not supported in 0.8.0 version of kafka
            (kafka_ex) lib/kafka_ex/server_0_p_8_p_0.ex:56: KafkaEx.Server0P8P0.kafka_server_offset_fetch/2
            (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:774: GenServer.call/3
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:600: KafkaEx.GenConsumer.load_offsets/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:445: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :timeout
State: %KafkaEx.GenConsumer.State{acked_offset: nil, commit_interval: 5000, commit_threshold: 100, committed_offset: nil, consumer_module: Kafkareader, consumer_state: nil, current_offset: nil, group: "kafkareader", last_commit: nil, partition: 0, topic: "stream", worker_name: #PID<0.190.0>}

@dantswain
Copy link
Collaborator

Hi @wilbertom ! You could try using the 0.8.2 implementation - KafkaEx doesn't do any actual runtime checking against the broker's reported version. 0.8.1 may or may not support "consumer group" offset management, but the fully automatically rebalancing consumer groups that we implented with KafkaEx.ConsumerGroup and KafkaEx.GenConsumer are not supported until Kafka 0.9.0.

Are you stuck with 0.8.1? Kafka has changed quite a lot and as a Kafka user I'd heartily recommend at LEAST upgrading to 0.9.0 if not a newer version. 0.9.0 supports a lot of features that 0.8.x did not and there have been lots of bug fixes.

As for #260 - that is a call for discussion. So if you have a strong need for 0.8.x support, please report it there. I can't guarantee that we will continue to support it, but we almost surely won't if no one in the community tells us that it's needed (each version we support adds some amount of maintenance burden).

@bjhaid
Copy link
Member

bjhaid commented Dec 17, 2017

I am positive 0.8.1 does not support consumer group offset management, offsets were still stored in zookeeper in 0.8.1

@wilbertom
Copy link
Author

@dantswain I'll leave my comments about versions in #260.

@bjhaid yes that is the behaviour I was thinking of.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants