diff --git a/Dockerfile b/Dockerfile index 529ee3af..c8408cc4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM debian:stretch-slim -ARG librdkafka_version=v0.11.1-RC1 +ARG librdkafka_version=v0.11.1 ARG yajl_version=2.1.0 COPY . /usr/src/kafkacat diff --git a/kafkacat.c b/kafkacat.c index f0771655..80c616a1 100644 --- a/kafkacat.c +++ b/kafkacat.c @@ -57,6 +57,7 @@ struct conf conf = { .partition = RD_KAFKA_PARTITION_UA, .msg_size = 1024*1024, .null_str = "NULL", + .fixed_key = NULL }; static struct stats { @@ -350,6 +351,11 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) { } } + if (!key && conf.fixed_key) { + key = conf.fixed_key; + key_len = conf.fixed_key_len; + } + if (!(msgflags & RD_KAFKA_MSG_F_COPY) && len > 1024 && !(conf.flags & CONF_F_TEE)) { /* If message is larger than this arbitrary @@ -936,6 +942,9 @@ static void RD_NORETURN usage (const char *argv0, int exitcode, " -p -1 Use random partitioner\n" " -D Delimiter to split input into messages\n" " -K Delimiter to split input key and message\n" + " -k Use a fixed key for all messages.\n" + " If combined with -K, per-message keys\n" + " takes precendence.\n" " -l Send messages from a file separated by\n" " delimiter, as with stdin.\n" " (only one file allowed)\n" @@ -1146,7 +1155,7 @@ static void argparse (int argc, char **argv, int do_conf_dump = 0; while ((opt = getopt(argc, argv, - "PCG:LQt:p:b:z:o:eED:K:Od:qvX:c:Tuf:ZlVh" + "PCG:LQt:p:b:z:o:eED:K:k:Od:qvX:c:Tuf:ZlVh" #if ENABLE_JSON "J" #endif @@ -1226,6 +1235,10 @@ static void argparse (int argc, char **argv, key_delim = optarg; conf.flags |= CONF_F_KEY_DELIM; break; + case 'k': + conf.fixed_key = optarg; + conf.fixed_key_len = (size_t)(strlen(conf.fixed_key)); + break; case 'l': conf.flags |= CONF_F_LINE; break; diff --git a/kafkacat.h b/kafkacat.h index a0940f9e..189ed2fd 100644 --- a/kafkacat.h +++ b/kafkacat.h @@ -88,6 +88,8 @@ struct conf { char *topic; int32_t partition; char *group; + char *fixed_key; + int32_t fixed_key_len; int64_t offset; int exit_eof; int64_t msg_cnt;