Crash on rd_kafka_topic_metadata_update #155

Closed
laxpio opened this issue Oct 9, 2014 · 27 comments

Comments

@laxpio

laxpio commented Oct 9, 2014

Using the C API, the server core-dumps on startup. Backtrace from the core:

(gdb) bt
#0  0x00007f1000000000 in ?? ()
#1  0x00000000004918ce in rd_kafka_msg_partitioner (rkt=0x7f10cc004e10, rkm=0x7f10cc0663f0, do_lock=0) at rdkafka_msg.c:358
#2  0x0000000000493702 in rd_kafka_topic_assign_uas (rkt=0x7f10cc004e10) at rdkafka_topic.c:900
#3  0x0000000000494299 in rd_kafka_topic_metadata_update (rkb=0x7f10cc0013b0, mdt=<value optimized out>) at rdkafka_topic.c:1027
#4  0x0000000000491087 in rd_kafka_metadata_handle (rkb=0x7f10cc0013b0, err=0, reply=0x7f10b4000a40, request=0x7f10b4004140, opaque=0x7f10cc003260)
    at rdkafka_broker.c:966
#5  rd_kafka_broker_metadata_reply (rkb=0x7f10cc0013b0, err=0, reply=0x7f10b4000a40, request=0x7f10b4004140, opaque=0x7f10cc003260)
    at rdkafka_broker.c:1017
#6  0x000000000048cb27 in rd_kafka_req_response (rkb=0x7f10cc0013b0) at rdkafka_broker.c:1303
#7  rd_kafka_recv (rkb=0x7f10cc0013b0) at rdkafka_broker.c:1495
#8  0x000000000048d4e0 in rd_kafka_broker_io_serve (rkb=0x7f10cc0013b0) at rdkafka_broker.c:2428
#9  0x000000000048f5db in rd_kafka_broker_ua_idle (arg=0x7f10cc0013b0) at rdkafka_broker.c:2451
#10 rd_kafka_broker_thread_main (arg=0x7f10cc0013b0) at rdkafka_broker.c:4040
#11 0x00007f10fb7387f1 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f10fa8daccd in clone () from /lib64/libc.so.6
(gdb) frame 3
#3  0x0000000000494299 in rd_kafka_topic_metadata_update (rkb=0x7f10cc0013b0, mdt=<value optimized out>) at rdkafka_topic.c:1027
1027    rdkafka_topic.c: No such file or directory.
        in rdkafka_topic.c
(gdb) info args
rkb = 0x7f10cc0013b0
mdt = <value optimized out>
(gdb) 

Kafka version is 0.8.0.
librdkafka is built from the master branch.

This problem was seen before; see issue #132.

@laxpio
Author

laxpio commented Oct 9, 2014

 ./rdkafka_example -b  
Metadata for all topics (from broker -1: 10.135.4.232:9092/bootstrap):
 12 brokers:
  broker 0 at 10.135.3.34:9092
  broker 5 at 10.153.133.204:9092
  broker 1 at 10.135.35.227:9092
  broker 6 at 10.153.133.205:9092
  broker 33 at 10.224.138.33:9092
  broker 2 at 10.135.4.232:9092
  broker 34 at 10.224.138.34:9092
  broker 7 at 10.153.133.206:9092
  broker 3 at 10.135.4.83:9092
  broker 35 at 10.224.138.35:9092
  broker 36 at 10.224.138.36:9092
  broker 4 at 10.153.133.203:9092
 6 topics:
  topic "kafka_client" with 8 partitions:
    partition 0, leader 3, replicas: 0,3, isrs: 3
    partition 1, leader 1, replicas: 1,33, isrs: 1
    partition 2, leader 2, replicas: 2,34, isrs: 2
    partition 3, leader 3, replicas: 3,35, isrs: 3
    partition 4, leader 36, replicas: 33,36, isrs: 36
    partition 5, leader 0, replicas: 34,0, isrs: 0
    partition 6, leader 1, replicas: 35,1, isrs: 1
    partition 7, leader 2, replicas: 36,2, isrs: 2
  topic "qqpim_test" with 24 partitions:
    partition 0, leader 2, replicas: 2,34, isrs: 2
    partition 1, leader 3, replicas: 3,35, isrs: 3
    partition 2, leader 36, replicas: 33,36, isrs: 36
    partition 3, leader 0, replicas: 34,0, isrs: 0
    partition 4, leader 2, replicas: 1,6, isrs: 2
    partition 5, leader 2, replicas: 36,2, isrs: 2
    partition 6, leader 3, replicas: 0,3, isrs: 3
    partition 7, leader 1, replicas: 1,33, isrs: 1
    partition 8, leader 2, replicas: 2,35, isrs: 2
    partition 9, leader 3, replicas: 3,36, isrs: 3
    partition 10, leader 0, replicas: 33,0, isrs: 0
    partition 11, leader 1, replicas: 34,1, isrs: 1
    partition 12, leader 3, replicas: 1,7, isrs: 3
    partition 13, leader 3, replicas: 36,3, isrs: 3
    partition 14, leader 0, replicas: 0,33, isrs: 0
    partition 15, leader 1, replicas: 1,34, isrs: 1
    partition 16, leader 2, replicas: 2,36, isrs: 2
    partition 17, leader 3, replicas: 3,0, isrs: 3
    partition 18, leader 1, replicas: 33,1, isrs: 1
    partition 19, leader 2, replicas: 34,2, isrs: 2
    partition 20, leader 1, replicas: 1,0, isrs: 1
    partition 21, leader 36, replicas: 36,33, isrs: 36
    partition 22, leader 0, replicas: 0,34, isrs: 0
    partition 23, leader 1, replicas: 1,35, isrs: 1
  topic "RealStatis_test" with 16 partitions:
    partition 0, leader 33, replicas: 33,34,35, isrs: 35,33
    partition 1, leader 1, replicas: 0,5,6, isrs: 1
    partition 2, leader 0, replicas: 35,36,0, isrs: 0
    partition 3, leader 2, replicas: 2,7,0, isrs: 2
    partition 4, leader 3, replicas: 3,0,1, isrs: 3
    partition 5, leader 1, replicas: 1,2,3, isrs: 2,1,3
    partition 6, leader 2, replicas: 5,2,3, isrs: 2
    partition 7, leader 3, replicas: 6,3,4, isrs: 3
    partition 8, leader 35, replicas: 33,35,36, isrs: 35,36
    partition 9, leader 3, replicas: 0,2,3, isrs: 3
    partition 10, leader 1, replicas: 35,0,1, isrs: 1
    partition 11, leader 1, replicas: 36,1,2, isrs: 1,2
    partition 12, leader 2, replicas: 0,2,3, isrs: 2,3
    partition 13, leader 1, replicas: 1,3,33, isrs: 1,3
    partition 14, leader 2, replicas: 2,33,34, isrs: 2,33
    partition 15, leader 1, replicas: 6,0,1, isrs: 1
  topic "kafka_client_1" with 8 partitions:
    partition 0, leader 36, replicas: 36,34, isrs: 36
    partition 1, leader 0, replicas: 0,35, isrs: 0
    partition 2, leader 1, replicas: 1,36, isrs: 1
    partition 3, leader 2, replicas: 2,0, isrs: 2
    partition 4, leader 3, replicas: 3,1, isrs: 3,1
    partition 5, leader 2, replicas: 33,2, isrs: 2
    partition 6, leader 3, replicas: 34,3, isrs: 3
    partition 7, leader 35, replicas: 35,33, isrs: 35
  topic "RealStatis_2" with 16 partitions:
    partition 0, leader 3, replicas: 33,3, isrs: 3
    partition 1, leader 33, replicas: 34,33, isrs: 33
    partition 2, leader 35, replicas: 35,34, isrs: 35
    partition 3, leader 36, replicas: 36,35, isrs: 36
    partition 4, leader 0, replicas: 0,36, isrs: 0
    partition 5, leader 1, replicas: 1,0, isrs: 1
    partition 6, leader 2, replicas: 2,1, isrs: 2,1
    partition 7, leader 3, replicas: 3,2, isrs: 3,2
    partition 8, leader 33, replicas: 33,34, isrs: 33
    partition 9, leader 35, replicas: 34,35, isrs: 35
    partition 10, leader 35, replicas: 35,36, isrs: 35
    partition 11, leader 0, replicas: 36,0, isrs: 0
    partition 12, leader 1, replicas: 0,1, isrs: 1
    partition 13, leader 1, replicas: 1,2, isrs: 1,2
    partition 14, leader 2, replicas: 2,3, isrs: 2,3
    partition 15, leader 3, replicas: 3,33, isrs: 3
  topic "mark_test_topic_v3" with 2 partitions:
    partition 0, leader 2, replicas: 5,1,2, isrs: 2,1
    partition 1, leader 2, replicas: 6,2,3, isrs: 2,3

./kafka-list-topic.sh --zookeeper
topic: RealStatis_2     partition: 0    leader: 3       replicas: 33,3  isr: 3
topic: RealStatis_2     partition: 1    leader: 33      replicas: 34,33 isr: 33
topic: RealStatis_2     partition: 2    leader: 35      replicas: 35,34 isr: 35
topic: RealStatis_2     partition: 3    leader: 36      replicas: 36,35 isr: 36,35
topic: RealStatis_2     partition: 4    leader: 0       replicas: 0,36  isr: 0
topic: RealStatis_2     partition: 5    leader: 1       replicas: 1,0   isr: 1
topic: RealStatis_2     partition: 6    leader: 2       replicas: 2,1   isr: 2,1
topic: RealStatis_2     partition: 7    leader: 3       replicas: 3,2   isr: 3,2
topic: RealStatis_2     partition: 8    leader: 33      replicas: 33,34 isr: 33
topic: RealStatis_2     partition: 9    leader: 35      replicas: 34,35 isr: 35
topic: RealStatis_2     partition: 10   leader: 35      replicas: 35,36 isr: 35,36
topic: RealStatis_2     partition: 11   leader: 0       replicas: 36,0  isr: 0
topic: RealStatis_2     partition: 12   leader: 1       replicas: 0,1   isr: 1
topic: RealStatis_2     partition: 13   leader: 1       replicas: 1,2   isr: 1,2
topic: RealStatis_2     partition: 14   leader: 2       replicas: 2,3   isr: 2,3
topic: RealStatis_2     partition: 15   leader: 3       replicas: 3,33  isr: 3,33
topic: RealStatis_test  partition: 0    leader: 33      replicas: 33,34,35      isr: 35,33,34
topic: RealStatis_test  partition: 1    leader: 1       replicas: 0,5,6 isr: 1
topic: RealStatis_test  partition: 2    leader: 0       replicas: 35,36,0       isr: 0
topic: RealStatis_test  partition: 3    leader: 2       replicas: 2,7,0 isr: 2
topic: RealStatis_test  partition: 4    leader: 3       replicas: 3,0,1 isr: 3,0
topic: RealStatis_test  partition: 5    leader: 1       replicas: 1,2,3 isr: 2,1,3
topic: RealStatis_test  partition: 6    leader: 2       replicas: 5,2,3 isr: 2
topic: RealStatis_test  partition: 7    leader: 3       replicas: 6,3,4 isr: 3
topic: RealStatis_test  partition: 8    leader: 35      replicas: 33,35,36      isr: 35,36
topic: RealStatis_test  partition: 9    leader: 3       replicas: 0,2,3 isr: 3,2
topic: RealStatis_test  partition: 10   leader: 1       replicas: 35,0,1        isr: 1,0
topic: RealStatis_test  partition: 11   leader: 1       replicas: 36,1,2        isr: 1,2
topic: RealStatis_test  partition: 12   leader: 2       replicas: 0,2,3 isr: 2,3
topic: RealStatis_test  partition: 13   leader: 1       replicas: 1,3,33        isr: 1,3,33
topic: RealStatis_test  partition: 14   leader: 2       replicas: 2,33,34       isr: 2,33,34
topic: RealStatis_test  partition: 15   leader: 1       replicas: 6,0,1 isr: 1
topic: kafka_client     partition: 0    leader: 3       replicas: 0,3   isr: 3
topic: kafka_client     partition: 1    leader: 1       replicas: 1,33  isr: 1,33
topic: kafka_client     partition: 2    leader: 2       replicas: 2,34  isr: 2,34
topic: kafka_client     partition: 3    leader: 3       replicas: 3,35  isr: 3,35
topic: kafka_client     partition: 4    leader: 36      replicas: 33,36 isr: 36
topic: kafka_client     partition: 5    leader: 0       replicas: 34,0  isr: 0
topic: kafka_client     partition: 6    leader: 1       replicas: 35,1  isr: 1
topic: kafka_client     partition: 7    leader: 2       replicas: 36,2  isr: 2
topic: kafka_client_1   partition: 0    leader: 36      replicas: 36,34 isr: 36
topic: kafka_client_1   partition: 1    leader: 0       replicas: 0,35  isr: 0
topic: kafka_client_1   partition: 2    leader: 1       replicas: 1,36  isr: 1,36
topic: kafka_client_1   partition: 3    leader: 2       replicas: 2,0   isr: 2
topic: kafka_client_1   partition: 4    leader: 3       replicas: 3,1   isr: 3,1
topic: kafka_client_1   partition: 5    leader: 2       replicas: 33,2  isr: 2
topic: kafka_client_1   partition: 6    leader: 3       replicas: 34,3  isr: 3
topic: kafka_client_1   partition: 7    leader: 35      replicas: 35,33 isr: 35
topic: mark_test_topic_v3       partition: 0    leader: 2       replicas: 5,1,2 isr: 2,1
topic: mark_test_topic_v3       partition: 1    leader: 2       replicas: 6,2,3 isr: 2,3
topic: qqpim_test       partition: 0    leader: 2       replicas: 2,34  isr: 2,34
topic: qqpim_test       partition: 1    leader: 3       replicas: 3,35  isr: 3,35
topic: qqpim_test       partition: 2    leader: 36      replicas: 33,36 isr: 36
topic: qqpim_test       partition: 3    leader: 0       replicas: 34,0  isr: 0
topic: qqpim_test       partition: 4    leader: 2       replicas: 1,6   isr: 2
topic: qqpim_test       partition: 5    leader: 2       replicas: 36,2  isr: 2
topic: qqpim_test       partition: 6    leader: 3       replicas: 0,3   isr: 3
topic: qqpim_test       partition: 7    leader: 1       replicas: 1,33  isr: 1,33
topic: qqpim_test       partition: 8    leader: 2       replicas: 2,35  isr: 2,35
topic: qqpim_test       partition: 9    leader: 3       replicas: 3,36  isr: 3,36
topic: qqpim_test       partition: 10   leader: 0       replicas: 33,0  isr: 0
topic: qqpim_test       partition: 11   leader: 1       replicas: 34,1  isr: 1
topic: qqpim_test       partition: 12   leader: 3       replicas: 1,7   isr: 3
topic: qqpim_test       partition: 13   leader: 3       replicas: 36,3  isr: 3
topic: qqpim_test       partition: 14   leader: 0       replicas: 0,33  isr: 0
topic: qqpim_test       partition: 15   leader: 1       replicas: 1,34  isr: 1,34
topic: qqpim_test       partition: 16   leader: 2       replicas: 2,36  isr: 2,36
topic: qqpim_test       partition: 17   leader: 3       replicas: 3,0   isr: 3
topic: qqpim_test       partition: 18   leader: 1       replicas: 33,1  isr: 1
topic: qqpim_test       partition: 19   leader: 2       replicas: 34,2  isr: 2
topic: qqpim_test       partition: 20   leader: 1       replicas: 1,0   isr: 1,0
topic: qqpim_test       partition: 21   leader: 36      replicas: 36,33 isr: 36
topic: qqpim_test       partition: 22   leader: 0       replicas: 0,34  isr: 0
topic: qqpim_test       partition: 23   leader: 1       replicas: 1,35  isr: 1,35

@edenhill
Contributor

edenhill commented Oct 9, 2014

Did you set a partitioner with rd_kafka_topic_conf_set_partitioner_cb(), and if so, to what?

@laxpio
Author

laxpio commented Oct 10, 2014

No, I did not set a partitioner callback.

@laxpio
Author

laxpio commented Oct 10, 2014

After adding a partitioner callback, I get this failure message: "Local: Broker handle destroyed"

@edenhill
Contributor

Can you elaborate on what happened after setting the callback?

@laxpio
Author

laxpio commented Oct 10, 2014

After setting the callback I still get the same core as above, and "Local: Broker handle destroyed" appears in the server log.
I also get a different kind of core:
#0 0x00007f83ea337885 in raise () from /lib64/libc.so.6
#1 0x00007f83ea339065 in abort () from /lib64/libc.so.6
#2 0x00007f83ea374977 in __libc_message () from /lib64/libc.so.6
#3 0x00007f83ea37a296 in malloc_printerr () from /lib64/libc.so.6
#4 0x000000000049475e in rd_kafka_topic_new (rk=0x7f8388001080, topic=0x1e20e88 "qqpim_test", conf=0x7f8388003530) at rdkafka_topic.c:503
(gdb) frame 4
#4 0x000000000049475e in rd_kafka_topic_new (rk=0x7f8388001080, topic=0x1e20e88 "qqpim_test", conf=0x7f8388003530) at rdkafka_topic.c:503
503 rdkafka_topic.c: No such file or directory.
in rdkafka_topic.c
(gdb) info args
rk = 0x7f8388001080
topic = 0x1e20e88 "qqpim_test"
conf = 0x7f8388003530
(gdb) p *conf
$1 = {required_acks = -1, enforce_isr_cnt = 32643, request_timeout_ms = 775171637, message_timeout_ms = 825306672, partitioner = 0x90,
produce_offset_report = 52, auto_commit = 0, auto_commit_interval_ms = -2013193104, auto_offset_reset = 32643,
offset_store_path = 0x7 <Address 0x7 out of bounds>, offset_store_sync_interval_ms = -1, offset_store_method = 775303738,
opaque = 0x6e776f6e6b6e75}
(gdb)

@edenhill
Contributor

The partitioner callback is set to 0x90, which obviously isn't a valid function pointer address,
and the other values look weird too. This looks like memory corruption.
Can you run your program with valgrind to find out what's happening?
valgrind ./your-program ...
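
One way to see why the dump looks like corruption rather than a bad setting: several fields in the `p *conf` output above decode to readable ASCII when their bytes are read in memory order. The sketch below assumes a little-endian host (the /lib64 paths suggest x86-64, but that is an assumption). `bytes_as_text` is a hypothetical helper written for this illustration; it is not part of librdkafka or gdb.

```cpp
#include <cassert>
#include <cctype>
#include <cstdint>
#include <string>

// Render the low `width` bytes of `value` as text, lowest byte first.
// On a little-endian machine this is the order the bytes sit in memory.
// Stops at the first zero byte; non-printable bytes are shown as '?'.
// (Hypothetical helper for illustration only.)
std::string bytes_as_text(std::uint64_t value, int width) {
    std::string out;
    for (int i = 0; i < width; ++i) {
        unsigned char byte =
            static_cast<unsigned char>((value >> (8 * i)) & 0xFF);
        if (byte == 0) break;
        out += std::isprint(byte) ? static_cast<char>(byte) : '?';
    }
    return out;
}
```

Fed the values from the dump, `opaque = 0x6e776f6e6b6e75` decodes to "unknown", `request_timeout_ms = 775171637` to "5.4." and `message_timeout_ms = 825306672` to "0.11". That is consistent with the memory behind `conf` holding string data at the time of the crash, though it does not by itself say who overwrote it.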

@edenhill
Contributor

Can you show me your code for setting up the topic configuration and creating the topic, etc?

@laxpio
Author

laxpio commented Oct 10, 2014

int initKafka() {
...........
...........
if (_sendflags & RD_KAFKA_MSG_F_COPY) {
_sendflags &= ~RD_KAFKA_MSG_F_FREE;
}
DL << "Use Send Flag="<<_sendflags << endl;

/* Kafka configuration */
conf = rd_kafka_conf_new();
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();

rd_kafka_conf_set_dr_cb(conf, msg_delivered);

/*
* Create configuration objects
*/
rd_kafka_conf_res_t rd_res;

rd_res = rd_kafka_conf_set(conf,"client.id","test_rdkafka",errstr,sizeof(errstr));
DL <<"set client id,client.id(test_rdkafka)|" << errstr << endl;
if(rd_res != RD_KAFKA_CONF_OK)
{
    EL << "set client.id Error : " << errstr << endl;
}


rd_res = rd_kafka_conf_set(conf,"queue.buffering.max.messages",s_max_messages.c_str(),errstr,sizeof(errstr));
DL <<"set max buffer,maxBuffer("<<s_max_messages<<")|" << errstr << endl;
if(rd_res != RD_KAFKA_CONF_OK)
{
    EL << "set queue.buffering.max.messages Error : " << errstr << endl;
}


rd_res = rd_kafka_conf_set(conf,"batch.num.messages",s_batch_num_messages.c_str(),errstr,sizeof(errstr));
DL <<"set batch message num,nums("<<s_batch_num_messages<<")|" << errstr << endl;
if(rd_res != RD_KAFKA_CONF_OK)
{
    EL << "set batch.num.messages Error : " << errstr << endl;
}


rd_res = rd_kafka_conf_set(conf,"message.send.max.retries",s_max_retries.c_str(),errstr,sizeof(errstr));
DL <<"set retry time,nums("<<s_max_retries<<")|" << errstr << endl;
if(rd_res != RD_KAFKA_CONF_OK)
{
    EL << "set message.send.max.retries Error : " << errstr << endl;
}

if( pCode > 0 )
{
    string codeType = RD_COMPRESSION_NONE;
    //compression.codec 
    if( pCode == 1 )
    {
        codeType = RD_COMPRESSION_GZIP;
    }
    else if( pCode == 2 )
    {
        codeType = RD_COMPRESSION_SNAPPY;
    }
    rd_res = rd_kafka_conf_set(conf,"compression.codec",codeType.c_str(),errstr,sizeof(errstr));
    DL <<"set compression type,codeType("<< codeType << ")|" << errstr << endl;
    if(rd_res != RD_KAFKA_CONF_OK)
    {
        EL << "set compression.codec Error : " << errstr << endl;
    }
}   

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,errstr, sizeof(errstr)))) {
    EL << "Failed to create new producer: " << errstr << endl;
}

/* Add brokers */
if (rd_kafka_brokers_add(rk, brokerList.c_str()) == 0) {
    EL << "No valid brokers specified: " << brokerList << endl;
}
max_msg = i_max_messages;
return 0;

}
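
A side note on the logging in initKafka(): the `DL << ... << errstr` lines print errstr after every rd_kafka_conf_set() call, including successful ones. The valgrind output further down the thread reports "Conditional jump or move depends on uninitialised value(s)" while streaming a `char [512]` from initKafka(), which fits errstr being written only on failure. The sketch below shows one hedged way to avoid that: zero-initialise the buffer and read it only when the call fails. `set_option` and `describe_set` are hypothetical stand-ins for this illustration, not librdkafka functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical stand-in for a C-style setter such as rd_kafka_conf_set():
// it returns false and fills errstr only when the option is rejected.
bool set_option(const std::string& name, const std::string& value,
                char* errstr, std::size_t errstr_size) {
    if (value.empty()) {
        std::snprintf(errstr, errstr_size, "empty value for \"%s\"",
                      name.c_str());
        return false;
    }
    return true;  // success: errstr is left untouched
}

// Returns the text that would be logged for one option.
std::string describe_set(const std::string& name, const std::string& value) {
    char errstr[512] = "";  // zero-initialised, so always a valid C string
    if (!set_option(name, value, errstr, sizeof(errstr)))
        return "set " + name + " failed: " + errstr;
    return "set " + name + " ok";  // errstr is not read on success
}
```

These uninitialised reads are a logging problem in the application and are probably unrelated to the crash itself, but removing them makes the valgrind report easier to read.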

@laxpio
Author

laxpio commented Oct 10, 2014

void logger(const std::string & app, const std::string & server, const std::string & file, const std::string & format, const vector<std::string> & buffer, taf::JceCurrentPtr current) {
    .........
    .........
    const string topicName = fileConfig.topic_name;
    int partition_num = fileConfig.partition_num;

    /* Create topic */
    rkt = rd_kafka_topic_new(rk, topicName.c_str(), topic_conf);
    if(!rkt)
    {
        EL << "create topic(" << topicName << "),rd_kafka_t (" << rk
            <<"),rd_kafka_topic_conf_t("<<topic_conf<<") failed."
            <<"|errno="<<errno<<"|"<<strerror(errno)<< endl;
        return ;
    }

    DL<<"CplusCmd|"<<iSize<<"|"<<topicName<<endl;
    if ( fileConfig.format_type != FORMAT_SCRIPT ) 
    { 
        string formatStr;
        for (int i = 0; i < iSize; i++) 
        { 
            formatStr = fileConfig.baseFormat->toFormat(fileConfig.version,fileConfig.file_id,_now,buffer[i]);
            if (formatStr.empty()) {
                delSize--;
                continue;
            }
            int partition = RD_KAFKA_PARTITION_UA;
            if( partition_num>0 ) 
            {
                partition=i%partition_num;
            }
            int iRet = write2kafka(formatStr,partition);
            if(iRet == -1)
            {
                delSize--;
            }
        }
    } 
    else 
    { .......
    ........
    }
    rd_kafka_poll(rk,0);
    //Message Buffer check and notify
    int ret = rd_kafka_outq_len(rk);
    DL<<"ret|"<<ret << "|BufSize|" << iSize <<endl;
    if( ret>=(max_msg * 80/100) ) 
    { 
        ostringstream oss;
        oss<<":KAFKAMSGQUEUEWILLFULL,Size:"<<ret;
        string msg = oss.str();
        DAYLOG("Ddebug") << msg << "|QUEUE MAX SIZE : " <<max_msg << endl;
        sendMessage(msg);
        rd_kafka_poll(rk,10);
    }
}

int write2kafka(const std::string & msg,int partition)
{
    int iRet = rd_kafka_produce(rkt, partition,RD_KAFKA_MSG_F_COPY,(char*)msg.c_str(),msg.size(),NULL, 0,NULL);
    if ( iRet == -1 ) 
    {
        //send fail,trigger rewrite
        EL << "Failed to produce to topic " << rd_kafka_topic_name(rkt)
            << " partition " << partition << " " << rd_kafka_err2str(rd_kafka_errno2err(errno)) << endl;
        DAYLOG("buf_full")<<"errno="<<errno<<"|"<<strerror(errno)<<"|iret="<<iRet<<endl;
        //the buffer full send msg first
        rd_kafka_poll(rk, 10);

        //check is need rewrite
        if( g_kafka_rewrite == 1 )
        {
            if( g_nums_rewrite > 0 )
            {
                //rewrite Xnums
                int t_count = 0;
                while( t_count < g_nums_rewrite )
                {
                    t_count++;
                    iRet = rd_kafka_produce(rkt, partition,RD_KAFKA_MSG_F_COPY,(char*)msg.c_str(),msg.size(),NULL, 0,NULL);
                    if( iRet == 0 )
                    {
                        iRet = 0;//rewrite succ
                        break;
                    }
                    rd_kafka_poll(rk,10);
                }
                if( iRet == -1 )
                {
                    DAYLOG("BUF_ERROR")<<"ReWrite Failed,errno="<<errno<<"|"<<strerror(errno)<<"|iret="<<iRet<<endl;
                    DAYLOG("FAIL_LOG")<<msg<<endl;
                }
            }
            else if( g_nums_rewrite < 0 )
            {
                //rewrite until succ
                //send buff log
                int outq_len = rd_kafka_outq_len(rk);
                while( max_msg > outq_len && outq_len >=(max_msg * 90/100) )
                {
                    rd_kafka_poll(rk, 10);
                    outq_len = rd_kafka_outq_len(rk);
                }
                //resend message
                iRet = rd_kafka_produce(rkt, partition,RD_KAFKA_MSG_F_COPY,(char*)msg.c_str(),msg.size(),NULL, 0,NULL);
                if( iRet == -1  )
                {
                    DAYLOG("BUF_ERROR")<<"Rewrite until Succ,errno="<<errno<<"|"<<strerror(errno)<<"|iret="<<iRet<<endl;
                    DAYLOG("FAIL_LOG")<<msg<<endl;
                }
            }
        }
        else
        {
            //curr log write fail
            DAYLOG("FAIL_LOG")<<msg<<endl;
        }
    }
    rd_kafka_poll(rk,0);
    return iRet;
}

@laxpio
Author

laxpio commented Oct 10, 2014

The code above covers initializing the Kafka configuration, handling messages (logger), and write2kafka.
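
One thing worth checking in the code above: logger() hands the same topic_conf pointer to rd_kafka_topic_new() every time it runs. The valgrind report further down the thread shows rd_kafka_topic_new() freeing a 64-byte block at rdkafka_topic.c:503 and a later rd_kafka_topic_new() call reading that same block, which fits rd_kafka_topic_new() taking ownership of the configuration it is given. The sketch below models that ownership rule with stand-in types and shows the usual remedy of passing a fresh copy on each call. TopicConf, Topic, topic_new and conf_dup are hypothetical names for this illustration, not librdkafka symbols; in librdkafka itself the copy would, to my knowledge, come from rd_kafka_topic_conf_dup().

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Stand-in for rd_kafka_topic_conf_t (hypothetical, illustration only).
struct TopicConf {
    int required_acks = 1;
};

// Stand-in for rd_kafka_topic_t.
struct Topic {
    std::string name;
    std::unique_ptr<TopicConf> conf;  // the topic owns its configuration
};

// Stand-in for rd_kafka_topic_new(): takes ownership of `conf`.
// Passing a unique_ptr by value makes the ownership transfer explicit,
// so the caller cannot accidentally reuse the object afterwards.
std::unique_ptr<Topic> topic_new(const std::string& name,
                                 std::unique_ptr<TopicConf> conf) {
    std::unique_ptr<Topic> topic(new Topic());
    topic->name = name;
    topic->conf = std::move(conf);
    return topic;
}

// Stand-in for a "duplicate configuration" call: returns an independent copy.
std::unique_ptr<TopicConf> conf_dup(const TopicConf& src) {
    return std::unique_ptr<TopicConf>(new TopicConf(src));
}
```

The application keeps one template configuration for its whole lifetime and gives each topic its own copy, for example `topic_new("qqpim_test", conf_dup(template_conf))`. Creating the topic handle once and reusing it across logger() calls would avoid the repeated creation altogether.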

@edenhill
Contributor

Thanks, I can't see anything that is obviously wrong.
Can you run the program in valgrind?

@laxpio
Author

laxpio commented Oct 10, 2014

Yes. When I create a new topic and use that instead, the server runs fine.

@laxpio
Author

laxpio commented Oct 10, 2014

I ran the program under valgrind; it reports a small memory leak.

@edenhill
Contributor

Can you paste or email me the output from valgrind?

@edenhill
Contributor

When it crashes, that is

@laxpio
Author

laxpio commented Oct 10, 2014

==26482== LEAK SUMMARY:
==26482== definitely lost: 1,424 bytes in 1 blocks
==26482== indirectly lost: 225 bytes in 3 blocks
==26482== possibly lost: 1,216 bytes in 4 blocks
==26482== still reachable: 23,206 bytes in 63 blocks
==26482== suppressed: 0 bytes in 0 blocks

@edenhill
Contributor

I need more output from valgrind; that is just the summary. I need the detailed output showing where it leaks.

I'd also like you to trigger the crash while running under valgrind and send me that output as well.
Thanks

@laxpio
Author

laxpio commented Oct 10, 2014

I cannot reproduce the core dump, but creating the topic fails with errno=22 (Invalid argument).

@laxpio
Author

laxpio commented Oct 10, 2014

==737== Memcheck, a memory error detector
==737== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==737== Using Valgrind-3.10.0 and LibVEX; rerun with -h for copyright info
==737== Command: ./test_realStatis -T qqpim_test -B 10.135.4.232:9092,10.135.4.83:9092,10.135.35.227:9092 -P -1
==737==
./test_realStatis: /lib64/libz.so.1: no version information available (required by ./test_realStatis)
./test_realStatis: /lib64/libz.so.1: no version information available (required by ./test_realStatis)
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29929: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x416F78: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:67)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29938: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x416F78: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:67)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Syscall param write(buf) points to uninitialised byte(s)
==737== at 0x5EE7A2D: ??? (in /lib64/libc-2.12.so)
==737== by 0x5E7FF12: _IO_file_write@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==737== by 0x5E814C4: _IO_do_write@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==737== by 0x5E806C7: _IO_file_sync@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)
==737== by 0x5E74EE9: fflush (in /lib64/libc-2.12.so)
==737== by 0x57004F1: std::ostream::flush() (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x4429E9: taf::RollWriteT::operator()(std::ostream&, std::deque<std::pair<int, std::string>, std::allocator<std::pair<int, std::string> > > const&) (taf_logger.cpp:57)
==737== by 0x41BA25: taf::TC_RollBySize<taf::RollWriteT>::roll(std::deque<std::pair<int, std::string>, std::allocator<std::pair<int, std::string> > > const&) (tc_logger.h:1095)
==737== by 0x58E57B: taf::TC_LoggerRoll::write(std::pair<int, std::string> const&) (tc_logger.cpp:73)
==737== by 0x58E695: taf::LoggerBuffer::sync() (tc_logger.cpp:281)
==737== by 0x57004F1: std::ostream::flush() (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418A74: taf::LoggerStream::operator<<(std::ostream& (*)(std::ostream&)) (tc_logger.h:428)
==737== Address 0x4021059 is not stack'd, malloc'd or (recently) free'd
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29929: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x41727C: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:75)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29938: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x41727C: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:75)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29929: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417580: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:83)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29938: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417580: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:83)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29929: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417884: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:91)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29938: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417884: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:91)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29929: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417BED: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:110)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Conditional jump or move depends on uninitialised value(s)
==737== at 0x4C29938: __strlen_sse42 (vg_replace_strmem.c:416)
==737== by 0x5701BA0: std::basic_ostream<char, std::char_traits >& std::operator<< std::char_traits(std::basic_ostream<char, std::char_traits >&, char const*) (in /usr/lib64/libstdc++.so.6.0.13)
==737== by 0x418E1B: taf::LoggerStream& taf::LoggerStream::operator<< <char [512]>(char const (&) [512]) (tc_logger.h:422)
==737== by 0x417BED: kafkaClient_c::initKafka(std::string const&) (kafkaClient_c.cpp:110)
==737== by 0x41E5B0: main (optsClient_c.cpp:80)
==737==
==737== Invalid read of size 4
==737== at 0x4313B9: rd_kafka_topic_new (rdkafka_topic.c:484)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96cc is 12 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 4
==737== at 0x4313C4: rd_kafka_topic_new (rdkafka_topic.c:484)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96c8 is 8 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x431476: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96c0 is 0 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x431485: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96c8 is 8 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x431491: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96d0 is 16 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x43149D: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96d8 is 24 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x4314A9: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96e0 is 32 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x4314B5: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96e8 is 40 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x4314C1: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96f0 is 48 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid read of size 8
==737== at 0x4314CD: rd_kafka_topic_new (rdkafka_topic.c:502)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96f8 is 56 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==
==737== Invalid free() / delete / delete[] / realloc()
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737== Address 0x61a96c0 is 0 bytes inside a block of size 64 free'd
==737== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==737== by 0x4314DD: rd_kafka_topic_new (rdkafka_topic.c:503)
==737== by 0x416C1D: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:146)
==737== by 0x41E797: main (optsClient_c.cpp:109)
==737==

@laxpio
Author

laxpio commented Oct 10, 2014

I removed some extraneous code.

==4786== Memcheck, a memory error detector
==4786== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==4786== Using Valgrind-3.10.0 and LibVEX; rerun with -h for copyright info
==4786== Command: ./test_realStatis -T qqpim_test -B 10.135.4.232:9092,10.135.4.83:9092,10.135.35.227:9092 -P -1
==4786==
./test_realStatis: /lib64/libz.so.1: no version information available (required by ./test_realStatis)
./test_realStatis: /lib64/libz.so.1: no version information available (required by ./test_realStatis)
==4786== Invalid read of size 4
==4786== at 0x41A029: rd_kafka_topic_new (rdkafka_topic.c:484)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6ec is 12 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 4
==4786== at 0x41A034: rd_kafka_topic_new (rdkafka_topic.c:484)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6e8 is 8 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A0E6: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6e0 is 0 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A0F5: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6e8 is 8 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A101: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6f0 is 16 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A10D: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6f8 is 24 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A119: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f700 is 32 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A125: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f708 is 40 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A131: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f710 is 48 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid read of size 8
==4786== at 0x41A13D: rd_kafka_topic_new (rdkafka_topic.c:502)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f718 is 56 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==
==4786== Invalid free() / delete / delete[] / realloc()
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786== Address 0x619f6e0 is 0 bytes inside a block of size 64 free'd
==4786== at 0x4C27BE4: free (vg_replace_malloc.c:473)
==4786== by 0x41A14D: rd_kafka_topic_new (rdkafka_topic.c:503)
==4786== by 0x405E3F: kafkaClient_c::dealMessgae(std::vector<std::string, std::allocator<std::string> > const&, std::string const&, int) (kafkaClient_c.cpp:136)
==4786== by 0x407504: main (optsClient_c.cpp:109)
==4786==

@edenhill
Contributor

Are you reusing the topic_conf for multiple topics?
If so, you need to ..._dup() it first, since topic_new() takes ownership of
the provided topic conf and frees it.

@laxpio
Author

laxpio commented Oct 10, 2014

Thanks edenhill, I now pass NULL as the topic conf.
Is the crash in rd_kafka_topic_metadata_update also caused by using the same topic_conf for multiple topics? I cannot reproduce it.

@edenhill
Contributor

If you want to reuse the topic_conf object, you need to duplicate it, so change your topic_new call to:

rd_kafka_topic_new(....,  rd_kafka_topic_conf_dup(topic_conf));
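
The ownership rule can be sketched with a small self-contained model. Everything below (`conf_t`, `topic_t`, `conf_new`, `conf_dup`, `topic_new`, `make_two_topics`) is a hypothetical stand-in written for illustration, not the librdkafka API. It only mimics the behavior described in this thread: the topic constructor frees the conf it is given, so a conf that is meant to be reused has to be duplicated for each call.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for illustration; these are NOT librdkafka types. */
typedef struct { char acks[16]; } conf_t;
typedef struct { char name[32]; char acks[16]; } topic_t;

/* Allocate a conf holding one setting; the caller owns the result. */
static conf_t *conf_new(const char *acks) {
    conf_t *c = calloc(1, sizeof(*c));
    if (c) strncpy(c->acks, acks, sizeof(c->acks) - 1);
    return c;
}

/* Copy a conf; the caller owns the copy, the source is left untouched. */
static conf_t *conf_dup(const conf_t *src) {
    conf_t *c = malloc(sizeof(*c));
    if (c) *c = *src;
    return c;
}

/* Takes ownership of conf: copies what it needs, then frees it.
 * A NULL conf means "use defaults" in this model. */
static topic_t *topic_new(const char *name, conf_t *conf) {
    assert(name != NULL);
    topic_t *t = calloc(1, sizeof(*t));
    if (t) {
        strncpy(t->name, name, sizeof(t->name) - 1);
        if (conf) memcpy(t->acks, conf->acks, sizeof(t->acks));
    }
    free(conf); /* the caller must not touch conf after this point */
    return t;
}

/* Build two topics from one template conf; returns 0 on success. */
static int make_two_topics(void) {
    conf_t *tmpl = conf_new("all");
    if (!tmpl) return -1;

    /* Each call receives its own copy, so tmpl stays valid. */
    topic_t *a = topic_new("topic_a", conf_dup(tmpl));
    topic_t *b = topic_new("topic_b", conf_dup(tmpl));

    int ok = a && b && strcmp(a->acks, "all") == 0
                    && strcmp(b->acks, "all") == 0;
    free(a);
    free(b);
    free(tmpl); /* the template is still owned by the caller */
    return ok ? 0 : -1;
}
```

In this model, passing `tmpl` itself to both `topic_new()` calls would make the second call read and free a block that the first call already released, which is the same pattern of invalid reads followed by an invalid free that the Valgrind reports in this thread show.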

@laxpio
Author

laxpio commented Oct 10, 2014

Does RD_KAFKA_RESP_ERR__MSG_TIMED_OUT mean that the response to a sent message timed out, or that the message timed out while waiting in the produce buffer?

@edenhill
Contributor

That means the message timed out in the local queue. This is usually caused by the broker not being available, or by limited bandwidth to the broker.
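
A rough way to picture a local-queue timeout is the sketch below. It is a simplified model with hypothetical names (`queued_msg_t`, `expire_stale`, `demo_local_timeout`), not librdkafka's actual implementation: messages wait in a client-side queue, and any message that is still waiting once its time limit has passed is marked as timed out, whether or not it was ever sent to a broker.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model for illustration; these are NOT librdkafka types. */
typedef enum { MSG_QUEUED, MSG_DELIVERED, MSG_TIMED_OUT } msg_state_t;

typedef struct {
    long enqueued_ms;   /* when the message entered the local queue */
    msg_state_t state;
} queued_msg_t;

/* Mark every still-queued message that has waited at least timeout_ms
 * as timed out; returns how many messages this scan expired. */
static size_t expire_stale(queued_msg_t *q, size_t n,
                           long now_ms, long timeout_ms) {
    size_t expired = 0;
    for (size_t i = 0; i < n; i++) {
        if (q[i].state == MSG_QUEUED &&
            now_ms - q[i].enqueued_ms >= timeout_ms) {
            q[i].state = MSG_TIMED_OUT;
            expired++;
        }
    }
    return expired;
}

/* One old queued message, one recent queued message, one delivered one. */
static int demo_local_timeout(void) {
    queued_msg_t q[3] = {
        { 0,    MSG_QUEUED },    /* has waited 5000 ms: expires */
        { 4000, MSG_QUEUED },    /* has waited 1000 ms: keeps waiting */
        { 1000, MSG_DELIVERED }  /* already delivered: never expires */
    };
    size_t expired = expire_stale(q, 3, 5000, 3000);
    assert(q[0].state == MSG_TIMED_OUT);
    assert(q[1].state == MSG_QUEUED);
    assert(q[2].state == MSG_DELIVERED);
    return (int)expired;
}
```

In librdkafka itself the corresponding limit is the `message.timeout.ms` configuration property. If the broker stays unreachable, nothing leaves the queue, so messages eventually hit that limit and are reported with this error.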

@edenhill
Contributor

Can we close this issue?
