Kafka message from producer to consumer, the entire process takes an average of 400-600ms #2591

Closed
lvfuchao opened this issue Oct 28, 2019 · 2 comments · Fixed by #2612

Comments

@lvfuchao

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

1. Producer, consumer, and Kafka cluster are in the same machine room. The whole process of a Kafka message going from producer to consumer is as follows:

  1. Time the producer put the message into its local buffer: 1572228409758 ms
  2. Network delay for the produced message to be acknowledged by the broker, taken from the event callback: 2 ms
    LOG-7-RECV: [thrd:IP:9292/bootstrap]: IP:9292/1: Received ProduceResponse (v7, 53 bytes, CorrId 2, rtt 2.48ms)
  3. Time the Kafka consumer received the message: 1572228410256 ms
    So the whole process takes 498 ms.

2. Configuration

  1. kafka-cluster:
broker.id=1
port=9292
host.name=xxx
listeners=PLAINTEXT://xxx:9292
advertised.listeners=PLAINTEXT://xxx:9292

auto.create.topics.enable = false
auto.leader.rebalance.enable=true

# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000

# Log configuration
num.partitions=8
num.recovery.threads.per.data.dir=1
message.max.bytes=1000000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=24
log.retention.bytes=5000000000
log.retention.check.interval.ms=300000
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=6
log.segment.bytes=2000000000
log.dirs=/data/data1/svckafka/logdata,/data/data2/svckafka/logdata,/data/data3/svckafka/logdata,/data/data4/svckafka/logdata,/data/data5/svckafka/logdata,/data/data6/svckafka/logdata


# ZK configuration
zookeeper.connect=xxx:2181,xxx:2181,xxx:2181,xxx:2181,xxx:2181
zookeeper.connection.timeout.ms=24000
zookeeper.sync.time.ms=2000

# Socket server configuration
num.io.threads=12
num.network.threads=20

socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
  2. producer:
statistics.interval.ms :5000
queue.buffering.max.ms :500
debug : msg,protocol
request.required.acks : 1
  3. consumer:
statistics.interval.ms : 5000
debug : consumer,security,broker
enable.partition.eof : true
max.partition.fetch.bytes : 10240000
auto.offset.reset : latest

3. kafka-consumer code as follows:

class KafkaConsumerImp {
public:
    int initConsumer();
    bool start();
    void stop();
    bool start_consumer();
    //void registerMsgHandler(const MsgCallback &&handler);

private:
    void consumer_cb(RdKafka::Message *msg, void *opaque);
    bool is_initialized_;
    bool started_;
    std::atomic_int consume_msg_cnt_{1};

    std::shared_ptr<RdKafka::KafkaConsumer> kafka_consumer_;
    std::shared_ptr<RdKafka::Conf> kafka_conf_;
    std::shared_ptr<RdKafka::Conf> kafka_topic_conf_;
    std::unique_ptr<std::thread> thread_;
};

int KafkaConsumerImp::initConsumer() {

    std::string errstr;
    //Create configuration objects
    kafka_conf_ = std::shared_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    kafka_topic_conf_ = std::shared_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));

    if ( kafka_conf_->set("event_cb", KafkaEventCb::Instance(), errstr)
            != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"event_cb\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }
    if ( kafka_conf_->set("rebalance_cb", KafkaRebalanceCb::Instance(), errstr)
            != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"rebalance_cb\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    // statistics.interval.ms -> default: 0 (a value of 0 disables statistics)
    std::string strStatic_ms = "5000";
    if ( kafka_conf_->set("statistics.interval.ms", strStatic_ms, errstr)
         != RdKafka::Conf::CONF_OK )
    {
        std::cout << "% RdKafka conf set \"statistics.interval.ms\" failed: " << errstr << std::endl;
        YY_LOG(LOG_ERR, "RdKafka conf set \"statistics.interval.ms\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    if ( kafka_conf_->set("debug", "consumer,security,broker", errstr)
         != RdKafka::Conf::CONF_OK )
    {
        std::cout << "% RdKafka conf set \"debug\" failed: " << errstr << std::endl;
        YY_LOG(LOG_ERR, "RdKafka conf set \"debug\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    /* enable.partition.eof -> range:[true/false] default:true */
    if ( kafka_conf_->set("enable.partition.eof", "true", errstr)
            != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"enable.partition.eof\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    /* Set broker list */
    if ( kafka_conf_->set("metadata.broker.list", ConfigInfo::Instance()->getConfigBrokers(), errstr)
            != RdKafka::Conf::CONF_OK ) {

        YY_LOG(LOG_ERR, "RdKafka conf set \"metadata.broker.list\":[%s] failed, errmsg is: %s !!!",
                          ConfigInfo::Instance()->getConfigBrokers().c_str(),
                          errstr.c_str());
        exit(1);
    }

    /* Set consumer group */
    if ( kafka_conf_->set("group.id", ConfigInfo::Instance()->getConfigGroupId(), errstr)
            != RdKafka::Conf::CONF_OK ) {

        YY_LOG(LOG_ERR, "RdKafka conf set \"group.id\":[%s] failed, errmsg is: %s !!!",
                          ConfigInfo::Instance()->getConfigGroupId().c_str(), errstr.c_str());
        exit(1);
    }

    /* Maximum amount of data fetched from a single partition per request */
    std::string strfetch_num = "10240000";
    if( kafka_conf_->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"max.partition.fetch.bytes\":[%s] failed, errmsg is:%s !!!",
                         strfetch_num.c_str(),
                         errstr.c_str());
        exit(1);
    }

    /* Create the Kafka topic configuration */
    if( kafka_topic_conf_->set("auto.offset.reset", "latest", errstr) != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"auto.offset.reset\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }


    if( kafka_conf_->set("default_topic_conf", kafka_topic_conf_.get(), errstr) != RdKafka::Conf::CONF_OK ) {
        YY_LOG(LOG_ERR, "RdKafka conf set \"default_topic_conf\" failed, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    /* Create the Kafka consumer instance */
    kafka_consumer_ = std::shared_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(kafka_conf_.get(), errstr));
    if(!kafka_consumer_) {
        YY_LOG(LOG_ERR, "Rdkafka failed to create kafka consumer, errmsg is: %s !!!", errstr.c_str());
        exit(1);
    }

    YY_LOG(LOG_INFO, "Rdkafka created consumer success, Consumer name is:%s, Config is:%s !!!",
                      kafka_consumer_->name().c_str(),
                      ConfigInfo::Instance()->dumpConfigInfo(false).c_str());

    /*Subscribe to topics*/
    RdKafka::ErrorCode err = kafka_consumer_->subscribe(ConfigInfo::Instance()->getConfigTopics());

    if (err) {
        YY_LOG(LOG_ERR, "Rdkafka consumer failed to subscribe to topics:%s, errmsg is:%s !!!", \
                          ConfigInfo::Instance()->dumpConfigTopic().c_str(),
                          RdKafka::err2str(err).c_str());
        exit(1);
    }

    // initialize success
    this->is_initialized_ = true;

    return 0;
}

bool KafkaConsumerImp::start_consumer() {

    // begin consumer thread
    thread_ = std::make_unique<std::thread>([this] {
        YY_LOG(LOG_INFO, "Rdkafka Consumer consume msg thread beginning !!!");
        this->start();
    });
    return true;
}

bool KafkaConsumerImp::start() {
    /*Consume messages*/
    if (started_) {
        return false;
    }
    if (!kafka_consumer_) {
        return false;
    }
    this->started_ = true;
    try {
        while (EDU_SP_KAFKA::run) {

            if (kafka_consumer_ == nullptr) {
                YY_LOG(LOG_ERR, "kafka_consumer_ is nullptr !!!");
                return false;
            }

            RdKafka::Message *msg = kafka_consumer_->consume(2000);

            this->consumer_cb(msg, nullptr);
            delete msg;
        }
    } catch (std::exception &e) {
        YY_LOG(LOG_ERR, "Rdkafka Execption: consumer_ start failure: %s !!!", e.what());
        return -4;
    }
    return true;
}

void KafkaConsumerImp::consumer_cb(RdKafka::Message *message, void *opaque) {

    TimeEscaped time;
    switch (message->err()) {

        case RdKafka::ERR__TIMED_OUT: {
            YY_LOG(LOG_INFO, "Rdkafka Consume Msg Timeout, This is Not really an error please ignore, Error code:%d, Error msg:%s !!!",
                             message->err(),
                             message->errstr().c_str());
            break;
        }

        case RdKafka::ERR_NO_ERROR: {
            /* Real message */
            msg_cnt++;
            consume_msg_cnt_++;
            msg_bytes += message->len();
 
            auto CurrentTimeMs = time.GetCurrentTimeMs();
            auto CurrentTimeUs = time.GetCurrentTimeUs();

            YY_LOG(LOG_INFO, "Rdkafka Consume msg success -> CurrentTime is: [%s(ms) : %s(us)],    Msgbody is: %s !!!",
                              std::to_string(CurrentTimeMs).c_str(),
                              std::to_string(CurrentTimeUs).c_str(),
                              static_cast<const char *>(message->payload()));

            // app msg callback
            try {
                // app callback
                ...
            } catch (...) {
                // ...
            }
            break;
        }
    }
}
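
For reference, a minimal sketch (not part of the original issue code) of measuring per-message end-to-end latency on the consumer side, by comparing the create-time timestamp carried in the message with the local clock. YY_LOG is the logging macro from the snippet above, and the helper name log_end_to_end_latency is hypothetical:

#include <chrono>

// Sketch: end-to-end latency from the timestamp attached to the message
// (requires Kafka >= 0.10 message timestamps; producer and consumer clocks
// must be reasonably in sync).
static void log_end_to_end_latency(RdKafka::Message *message) {
    RdKafka::MessageTimestamp ts = message->timestamp();
    if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME ||
        ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
        int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
        // With queue.buffering.max.ms=500 on the producer, most of this
        // difference is expected to be producer-side linger time.
        YY_LOG(LOG_INFO, "End-to-end latency: %lld ms",
               static_cast<long long>(now_ms - ts.timestamp));
    }
}

Calling this at the top of the ERR_NO_ERROR branch in consumer_cb would show where the ~500 ms is spent.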

How to reproduce

<your steps how to reproduce goes here, or remove section if not relevant>

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): <v1.1.0>
  • Apache Kafka version: <2.12-2.3.0>
  • librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
  • Operating system: <Linux ubuntu x86_64>
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

The latency is expected since you set queue.buffering.max.ms (aka linger.ms) to 500.
https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#performance
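
For reference, a minimal sketch (not from the issue) of lowering the producer-side linger with the librdkafka C++ Conf API; the value 50 is only illustrative and should be chosen for your latency/throughput trade-off:

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Sketch: queue.buffering.max.ms (alias linger.ms) controls how long a message
// may wait in the local producer queue before being sent, so 500 ms accounts
// for most of the observed end-to-end delay.
RdKafka::Conf *make_producer_conf() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (conf->set("queue.buffering.max.ms", "50", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "set queue.buffering.max.ms failed: " << errstr << std::endl;
    }
    return conf;
}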

@lvfuchao
Author

OK, I will try to set linger.ms = 50 or 100.

And there is another question with the confluent-kafka-go consumer: I cannot consume messages from the broker. The log is as follows:

Created Consumer &{x:9292,x:9292,x:9292 k12_kafkaGo_group [k12_wb_test k12_wb_switch_test] false 0xc000060180 0xc00008e090 0x4b4610} success !!!
kafka consumer create success, "#&{x:9292,x:9292,x:9292 k12_kafkaGo_group [k12_wb_test k12_wb_switch_test] true 0xc000060180 0xc00008e090 0x4b4610}" !!!


%7|1571914578.673|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1571914578.673|BROKER|rdkafka#consumer-1| [thrd:app]: x:9292/bootstrap: Added new broker with NodeId -1
%7|1571914578.673|BROKER|rdkafka#consumer-1| [thrd:app]: x:9292/bootstrap: Added new broker with NodeId -1
%7|1571914578.673|BROKER|rdkafka#consumer-1| [thrd:app]: x:9292/bootstrap: Added new broker with NodeId -1
%7|1571914578.673|BRKMAIN|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Enter main broker thread
%7|1571914578.673|BRKMAIN|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Enter main broker thread
%7|1571914578.673|BRKMAIN|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Enter main broker thread
%7|1571914578.673|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread

%7|1571914578.673|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.2.1-29-g7aa9b3 (0x10201ff) rdkafka#consumer-1 initialized (builtin.features snappy,sasl,regex,lz4,sasl_plain,plugins, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS HDRHISTOGRAM SNAPPY SOCKEM CRC32C_HW, debug 0x2202)
%7|1571914578.673|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "k12_kafkaGo_group": subscribe to new subscription of 2 topics (join state init)
%7|1571914578.673|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "k12_kafkaGo_group" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1571914578.673|CONNECT|rdkafka#consumer-1| [thrd:main]: x:9292/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1571914578.673|CONNECT|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Received CONNECT op
%7|1571914578.673|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1571914578.673|CONNECT|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: broker in state TRY_CONNECT connecting
%7|1571914578.673|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1571914578.673|CONNECT|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Connecting to ipv4#x:9292 (plaintext) with socket 13
%7|1571914578.673|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1571914578.679|CONNECT|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Connected to ipv4#x:9292
%7|1571914578.679|CONNECTED|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Connected (#1)
%7|1571914578.679|FEATURE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1571914578.679|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1571914578.684|FEATURE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1571914578.684|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1571914578.689|CLUSTERID|rdkafka#consumer-1| [thrd:main]: x:9292/bootstrap: ClusterId update "" -> "Vn7qR45rR8q5Gu_KskPZKA"
%7|1571914578.689|CONTROLLERID|rdkafka#consumer-1| [thrd:main]: x:9292/bootstrap: ControllerId update -1 -> 3
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: NodeId changed from -1 to 2
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/2: Name changed from x:9292/bootstrap to x:9292/2
%7|1571914578.689|LEADER|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/2: Mapped 0 partition(s) to broker
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: NodeId changed from -1 to 1
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: Name changed from x:9292/bootstrap to x:9292/1
%7|1571914578.689|LEADER|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: Mapped 0 partition(s) to broker
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/bootstrap: NodeId changed from -1 to 3
%7|1571914578.689|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/3: Name changed from x:9292/bootstrap to x:9292/3
%7|1571914578.689|LEADER|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/3: Mapped 0 partition(s) to broker
%7|1571914578.689|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/3: Broker changed state UP -> UPDATE
%7|1571914578.689|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/3: Broker changed state UPDATE -> UP
%7|1571914579.678|UPDATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: Nodename changed from  to x:9292
%7|1571914579.679|BROKERFAIL|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: failed: err: Local: Broker node update: (errno: Success)
%7|1571914579.679|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: Broker changed state INIT -> DOWN
%7|1571914579.679|STATE|rdkafka#consumer-1| [thrd:x:9292/bootstrap]: x:9292/1: Broker changed state DOWN -> INIT

% Error: Local: Broker node update: x:9292/1: Broker hostname updated (after 1005ms in state INIT) !!!

%7|1571914579.679|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received CONNECT op
%7|1571914579.679|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1571914579.679|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1571914579.679|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker has no address yet: postponing connect
%7|1571914580.680|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1571914580.680|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker has no address yet: postponing connect
%7|1571914581.680|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1571914581.680|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker has no address yet: postponing connect
%7|1571914582.681|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1571914582.681|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker has no address yet: postponing connect
%7|1571914583.682|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1571914583.682|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker has no address yet: postponing connect

Code as follows; I do not know the reason:

type MsgHandler func([]byte)

type KafkaConsumer struct {
	brokers        string
	group          string
	topics         []string
	running        bool
	sigchan        chan os.Signal
	consumerHandle *kafka.Consumer
	msgHandler     MsgHandler
}

func NewKafkaConsumer(brokers string, group string, topics []string) *KafkaConsumer {

	return &KafkaConsumer{
		brokers: brokers,
		group:   group,
		topics:  topics,
		sigchan: make(chan os.Signal, 1)}
}

func (c *KafkaConsumer) InitConsumer(handle MsgHandler) error {

	signal.Notify(c.sigchan, syscall.SIGINT, syscall.SIGTERM)

	if handle == nil {
		fmt.Printf("Failed to Add MsgHandler !!!\n")
		return fmt.Errorf("msg handler is nil")
	}

	c.msgHandler = handle

	var consumerHandle, err = kafka.NewConsumer(&kafka.ConfigMap {
		"bootstrap.servers":         c.brokers,
		"broker.address.family":     "v4",
		"group.id":                  c.group,
		//"statistics.interval.ms":    5000,
		"debug":                     "consumer,security,broker",
		//"enable.partition.eof":      "true",
		"session.timeout.ms":        6000,
		//"max.partition.fetch.bytes": "10240000",
		"auto.offset.reset":         "earliest"})

	if err != nil {
		fmt.Printf("Failed to create consumer: %s !!!\n", err)
		return err
	}

	c.consumerHandle = consumerHandle
	fmt.Printf("Created Consumer %v success !!!\n", c)

	err = c.consumerHandle.SubscribeTopics(c.topics, nil)
	if err != nil {
		fmt.Printf("Failed to subscribe to topics: %s !!!\n", err)
		return err
	}
	c.running = true
	go func() {

		defer c.stopConsumer()

		for c.running {

			select {

			case sig := <-c.sigchan:
				fmt.Printf("Caught signal %v: terminating !!!\n", sig)
				c.running = false

			default:
				ev := c.consumerHandle.Poll(100)
				if ev == nil {
					continue
				}

				switch e := ev.(type) {

				case *kafka.Message:
					fmt.Printf("%% Message on %s:, %s !!!\n", e.TopicPartition, string(e.Value))
					c.msgHandler(e.Value)
					if e.Headers != nil {
						fmt.Printf("%% Headers: %v !!!\n", e.Headers)
					}

				case kafka.Error:
					/** Errors should generally be considered
					informational, the client will try to
					automatically recover.
					But in this example we choose to terminate
					the application if all brokers are down. */
					fmt.Printf("%% Error: %v: %v !!!\n", e.Code(), e)
					if e.Code() == kafka.ErrAllBrokersDown {
						c.running = false
					}

				default:
					fmt.Printf("Ignored %v !!!\n", e)
				}
			}
		}
	}()
	return nil
}
