Skip to content

Commit

Permalink
Merge pull request #12 from ZhuYouzhi/master
Browse files Browse the repository at this point in the history
Add return code for Kafka::produce and free message payload memory in rd_kafka_example when queue is full
  • Loading branch information
edenhill committed Aug 9, 2013
2 parents 52440ea + b6fff10 commit bac7e27
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
6 changes: 5 additions & 1 deletion examples/rdkafka_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ int main (int argc, char **argv) {
strncpy(opbuf, buf, len + 1);

/* Send/Produce message. */
rd_kafka_produce(rk, topic, partition, RD_KAFKA_OP_F_FREE, opbuf, len);
if(rd_kafka_produce(rk, topic, partition,
RD_KAFKA_OP_F_FREE, opbuf, len) == -1){
free(opbuf);
continue;
}
fprintf(stderr, "%% Sent %i bytes to topic "
"%s partition %i\n", len, topic, partition);
sendcnt++;
Expand Down
6 changes: 5 additions & 1 deletion examples/rdkafka_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ int main (int argc, char **argv) {
strncpy(opbuf, buf, len + 1);

/* Send/Produce message. */
kafka.produce(topic, partition, RD_KAFKA_OP_F_FREE, opbuf, len);
if (kafka.produce(topic, partition,
RD_KAFKA_OP_F_FREE, opbuf, len) == -1){
free(opbuf);
continue;
}
fprintf(stderr, "%% Sent %i bytes to topic "
"%s partition %i\n", len, topic, partition);
sendcnt++;
Expand Down
6 changes: 3 additions & 3 deletions rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Kafka{
const char * getTopic()const{return topic;}
*/

void produce(char *topic, uint32_t partition,int msgflags, char *payload, size_t len);
int produce(char *topic, uint32_t partition,int msgflags, char *payload, size_t len);

private:
rd_kafka_t *rk;
Expand All @@ -56,8 +56,8 @@ bool Kafka::setHandle(rd_kafka_type_t type,const char * broker,const rd_kafka_co
return (rk = rd_kafka_new(type, broker, conf))!=NULL;
}

void Kafka::produce(char *topic, uint32_t partition,int msgflags, char *payload, size_t len){
rd_kafka_produce(rk, topic, partition, msgflags, payload, len);
int Kafka::produce(char *topic, uint32_t partition,int msgflags, char *payload, size_t len){
return rd_kafka_produce(rk, topic, partition, msgflags, payload, len);
}

}
Expand Down

0 comments on commit bac7e27

Please sign in to comment.