Skip to content

Commit

Permalink
Added rd_kafka_producev() using va-args to construct message fields (#…
Browse files Browse the repository at this point in the history
…858, #707, #908, #345)

 - adds support for specifying the message timestamp at produce time (#707)
 - adds support for specifying topic by name rather than topic_t object (#908)
  • Loading branch information
edenhill committed Nov 16, 2016
1 parent f817039 commit 15d3e7e
Show file tree
Hide file tree
Showing 6 changed files with 607 additions and 139 deletions.
117 changes: 115 additions & 2 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,26 @@ typedef SSIZE_T ssize_t;
#define RD_EXPORT
#define RD_DEPRECATED __attribute__((deprecated))
#endif
/* @endcond */


/**
* @brief Type-checking macros
* Compile-time checking that \p ARG is of type \p TYPE.
* @returns \p RET
*/
#define _LRK_TYPECHECK(RET,TYPE,ARG) \
({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; })

#define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
({ \
if (0) { \
TYPE __t RD_UNUSED = (ARG); \
TYPE2 __t2 RD_UNUSED = (ARG2); \
} \
RET; })

/* @endcond */


/**
* @name librdkafka version
Expand Down Expand Up @@ -648,6 +665,89 @@ rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,



/**
* @name Var-arg tag types
* @{
*
*/

/**
* @enum rd_kafka_vtype_t
*
* @brief Var-arg tag types
*
* @sa rd_kafka_producev()
*/
typedef enum rd_kafka_vtype_t {
RD_KAFKA_VTYPE_END, /**< va-arg sentinel */
RD_KAFKA_VTYPE_TOPIC, /**< (const char *) Topic name */
RD_KAFKA_VTYPE_RKT, /**< (rd_kafka_topic_t *) Topic handle */
RD_KAFKA_VTYPE_PARTITION, /**< (int32_t) Partition */
RD_KAFKA_VTYPE_VALUE, /**< (void *, size_t) Message value (payload)*/
RD_KAFKA_VTYPE_KEY, /**< (void *, size_t) Message key */
RD_KAFKA_VTYPE_OPAQUE, /**< (void *) Application opaque */
RD_KAFKA_VTYPE_MSGFLAGS, /**< (int) RD_KAFKA_MSG_F_.. flags */
RD_KAFKA_VTYPE_TIMESTAMP, /**< (int64_t) Milliseconds since epoch UTC */
} rd_kafka_vtype_t;


/**
* @brief Convenience macros for rd_kafka_vtype_t that takes the
* correct arguments for each vtype.
*/

/*!
* va-arg end sentinel used to terminate the variable argument list
*/
#define RD_KAFKA_V_END RD_KAFKA_VTYPE_END

/*!
* Topic name (const char *)
*/
#define RD_KAFKA_V_TOPIC(topic) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), topic
/*!
* Topic object (rd_kafka_topic_t *)
*/
#define RD_KAFKA_V_RKT(rkt) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), rkt
/*!
* Partition (int32_t)
*/
#define RD_KAFKA_V_PARTITION(partition) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), partition
/*!
* Message value/payload pointer and length (void *, size_t)
*/
#define RD_KAFKA_V_VALUE(VALUE,LEN) \
_LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \
VALUE, LEN
/*!
* Message key pointer and length (const void *, size_t)
*/
#define RD_KAFKA_V_KEY(KEY,LEN) \
_LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \
KEY, LEN
/*!
* Opaque pointer (void *)
*/
#define RD_KAFKA_V_OPAQUE(opaque) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), opaque
/*!
* Message flags (int)
* @sa RD_KAFKA_MSG_F_COPY, et.al.
*/
#define RD_KAFKA_V_MSGFLAGS(msgflags) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), msgflags
/*!
* Timestamp (int64_t)
*/
#define RD_KAFKA_V_TIMESTAMP(timestamp) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), timestamp

/**@}*/


/**
* @name Kafka messages
* @{
Expand Down Expand Up @@ -736,7 +836,7 @@ rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
*
* The timestamp is the number of milliseconds since the epoch (UTC).
*
* \p tstype is updated to indicate the type of timestamp.
* \p tstype (if not NULL) is updated to indicate the type of timestamp.
*
* @returns message timestamp, or -1 if not available.
*
Expand Down Expand Up @@ -2328,6 +2428,19 @@ int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
void *msg_opaque);


/**
* @brief Produce and send a single message to broker.
*
* The message is defined by a va-arg list using \c rd_kafka_vtype_t
* tag tuples which must be terminated with a single \c RD_KAFKA_V_END.
*
* @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code.
*
* @sa rd_kafka_produce, RD_KAFKA_V_END
*/
RD_EXPORT
rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);


/**
* @brief Produce multiple messages.
Expand Down
100 changes: 94 additions & 6 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
const void *key, size_t keylen,
void *msg_opaque,
rd_kafka_resp_err_t *errp,
int *errnop,
rd_ts_t utc_now,
int *errnop,
int64_t timestamp,
rd_ts_t now) {
rd_kafka_msg_t *rkm;

Expand Down Expand Up @@ -174,8 +174,10 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
payload, len, key, keylen, msg_opaque);

rkm->rkm_timestamp = utc_now / 1000;
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
if (timestamp) {
rkm->rkm_timestamp = timestamp;
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
}

if (rkt->rkt_conf.message_timeout_ms == 0) {
rkm->rkm_ts_timeout = INT64_MAX;
Expand Down Expand Up @@ -210,7 +212,7 @@ int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
/* Create message */
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
payload, len, key, keylen, msg_opaque,
&err, &errnox, rd_uclock(), rd_clock());
&err, &errnox, rd_uclock()/1000, rd_clock());
if (unlikely(!rkm)) {
/* errno is already set by msg_new() */
rd_kafka_set_last_error(err, errnox);
Expand Down Expand Up @@ -246,7 +248,93 @@ int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
}


rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) {
va_list ap;
rd_kafka_msg_t s_rkm = RD_ZERO_INIT, *rkm = &s_rkm;
rd_kafka_vtype_t vtype;
rd_kafka_itopic_t *rkt = NULL;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

va_start(ap, rk);
while ((vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
switch (vtype)
{
case RD_KAFKA_VTYPE_TOPIC:
rkt = rd_kafka_topic_new0(rk, va_arg(ap, const char *),
NULL, NULL, 1);
break;

case RD_KAFKA_VTYPE_RKT:
rkt = rd_kafka_topic_keep(
rd_kafka_topic_a2i(
va_arg(ap, rd_kafka_topic_t *)));
break;

case RD_KAFKA_VTYPE_PARTITION:
rkm->rkm_partition = va_arg(ap, int32_t);
break;

case RD_KAFKA_VTYPE_VALUE:
rkm->rkm_payload = va_arg(ap, void *);
rkm->rkm_len = va_arg(ap, size_t);
break;

case RD_KAFKA_VTYPE_KEY:
rkm->rkm_key = va_arg(ap, void *);
rkm->rkm_key_len = va_arg(ap, size_t);
break;

case RD_KAFKA_VTYPE_OPAQUE:
rkm->rkm_opaque = va_arg(ap, void *);
break;

case RD_KAFKA_VTYPE_MSGFLAGS:
rkm->rkm_flags = va_arg(ap, int);
break;

case RD_KAFKA_VTYPE_TIMESTAMP:
rkm->rkm_timestamp = va_arg(ap, int64_t);
break;

default:
err = RD_KAFKA_RESP_ERR__INVALID_ARG;
break;
}
}

va_end(ap);

if (unlikely(!rkt))
return RD_KAFKA_RESP_ERR__INVALID_ARG;

if (likely(!err))
rkm = rd_kafka_msg_new0(rkt, rkm->rkm_partition,
rkm->rkm_flags,
rkm->rkm_payload, rkm->rkm_len,
rkm->rkm_key, rkm->rkm_key_len,
rkm->rkm_opaque,
&err, NULL,
rkm->rkm_timestamp, rd_clock());
rd_kafka_topic_destroy0(rkt);

if (unlikely(err))
return err;

/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (unlikely(err)) {
/* Handle partitioner failures: it only fails when
* the application
* attempts to force a destination partition that does not exist
* in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE
* flag since our contract says we don't free the payload on
* failure. */
rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
}

return err;
}

/**
* Produce a batch of messages.
Expand All @@ -258,7 +346,7 @@ int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
rd_kafka_message_t *rkmessages, int message_cnt) {
rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
int i;
rd_ts_t utc_now = rd_uclock();
int64_t utc_now = rd_uclock() / 1000;
rd_ts_t now = rd_clock();
int good = 0;
rd_kafka_resp_err_t all_err = 0;
Expand Down
6 changes: 4 additions & 2 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,15 @@ int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
rd_kafka_msg_t *rkm;

if (rkmessage->err) {
*tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
if (tstype)
*tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
return -1;
}

rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);

*tstype = rkm->rkm_tstype;
if (tstype)
*tstype = rkm->rkm_tstype;

return rkm->rkm_timestamp;
}
Expand Down
Loading

0 comments on commit 15d3e7e

Please sign in to comment.