diff --git a/Makefile b/Makefile index 01c8dcc..8daf796 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ TGT=libkfk.dll else KAFKA_ROOT=${HOME} KFK_INCLUDE=${KAFKA_ROOT}/include -OPTS=-DKXVER=3 -Wall -Wno-strict-aliasing -Wno-parentheses -shared -fPIC +OPTS=-DKXVER=3 -Wall -Wno-strict-aliasing -Wno-parentheses -shared -fPIC -Wextra -Werror -Wsign-compare -Wwrite-strings LDOPTS_STATIC=${KAFKA_ROOT}/lib/librdkafka.a -lz -lpthread -lssl -g -O2 LDOPTS=-L${KAFKA_ROOT}/lib/ -lrdkafka -lz -lpthread -lssl -g -O2 OSXOPTS=-undefined dynamic_lookup -mmacosx-version-min=10.12 diff --git a/README.md b/README.md index 3be6c19..4636606 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,17 @@ See [code.kx.com/q/interfaces/kafka](http://code.kx.com/q/interfaces/kafka/) for Build and install the latest version of `librdkafka`. The minimum required version is v0.11.0. -#### Requirements +## Step 1 +Build and install latest version of librdkafka. Minimum required version is v0.11.0. +### Requirements +As noted on librdkafka page https://github.com/edenhill/librdkafka#requirements + The GNU toolchain + GNU make + pthreads + zlib (optional, for gzip compression support) + libssl-dev (optional, for SSL and SASL SCRAM support) + libsasl2-dev (optional, for SASL GSSAPI support) + As noted on the [librdkafka page](https://github.com/edenhill/librdkafka#requirements) ``` @@ -33,11 +43,10 @@ sudo yum install glibc-devel.i686 libgcc.i686 libstdc++.i686 zlib-devel.i686 # Ubuntu sudo apt-get install gcc-multilib ``` - -#### Librdkafka - -##### Package installation +### Librdkafka +#### Package installation ``` + #macOS brew install librdkafka #Ubuntu/Debian(unstable) @@ -45,10 +54,9 @@ sudo apt-get install librdkafka-dev #RHEL/CentOS sudo yum install librdkafka-devel ``` +#### Building from source +### macOS and Linux -##### Building from source - -#### macOS and Linux ```bash git clone https://github.com/edenhill/librdkafka.git cd librdkafka @@ -65,9 +73,9 @@ make clean # to make sure nothing left from previous build or if upgrading/rebu make make install ``` +### Windows (to be added) +Using Nuget redistributable(https://www.nuget.org/packages/librdkafka.redist) -#### Windows (to be added) -Using the Nuget redistributable (https://www.nuget.org/packages/librdkafka.redist) ``` nuget install librdkafka.redist ``` diff --git a/k.h b/k.h index eb2f27b..ab85ef9 100644 --- a/k.h +++ b/k.h @@ -69,8 +69,8 @@ extern V m9(); extern V m9(V); #endif extern I khpun(const S,I,const S,I),khpu(const S,I,const S),khp(const S,I),okx(K),ymd(I,I,I),dj(I);extern V r0(K),sd0(I),kclose(I);extern S sn(S,I),ss(S); -extern K ktj(I,J),ka(I),kb(I),kg(I),kh(I),ki(I),kj(J),ke(F),kf(F),kc(I),ks(S),kd(I),kz(F),kt(I),sd1(I,K(*)(I)),dl(V*f,I), - knk(I,...),kp(S),ja(K*,V*),js(K*,S),jk(K*,K),jv(K*k,K),k(I,const S,...)__attribute__((sentinel)),xT(K),xD(K,K),ktd(K),r1(K),krr(const S),orr(const S),dot(K,K),b9(I,K),d9(K); +extern K ktj(I,J),ka(I),kb(I),kg(I),kh(I),ki(I),kj(J),ke(F),kf(F),kc(I),ks(const C*),kd(I),kz(F),kt(I),sd1(I,K(*)(I)),dl(V*f,I), + knk(I,...),kp(S),ja(K*,V*),js(K*,S),jk(K*,K),jv(K*k,K),k(I,const C*,...)__attribute__((sentinel)),xT(K),xD(K,K),ktd(K),r1(K),krr(const C*),orr(const C*),dot(K,K),b9(I,K),d9(K); #ifdef __cplusplus } #endif diff --git a/kfk.c b/kfk.c index 369cea4..ccf1665 100644 --- a/kfk.c +++ b/kfk.c @@ -17,6 +17,11 @@ typedef unsigned int UI; #define KR -128 #define KNL (K) 0 #define KFK_OK RD_KAFKA_RESP_ERR_NO_ERROR +#ifdef __GNUC__ +# define UNUSED(x) x __attribute__((__unused__)) +#else +# define UNUSED(x) x +#endif // create dictionary q dictionary from list of items (s1;v1;s2;v2;...) K xd0(I n, ...) __attribute__((sentinel)); K xd0(I n, ...) { @@ -35,12 +40,12 @@ static I spair[2]; static K S0; // check type // letter as usual, + for table, ! for dict -static I checkType(S tc, ...) { +static I checkType(const C* tc, ...) { va_list args; K x; static C lt[256]= " tvunzdmpscfejihg xb*BX GHIJEFCSPMDZNUVT"; static C b[256]; - S tc0= tc; + const C* tc0= tc; I match=0; lt[20 + 98]= '+'; lt[20 + 99]= '!'; @@ -92,10 +97,10 @@ static I printr0(K x) { r0(x); return 0; } -static I statscb(rd_kafka_t *rk, S json, size_t json_len, V *opaque) { +static I statscb(rd_kafka_t*UNUSED(rk), S json, size_t json_len, V*UNUSED(opaque)) { return printr0(k(0, (S) ".kfk.statcb", kpn(json, json_len), KNL)); } // should return 0 to indicate mem free to kafka -static V logcb(const rd_kafka_t *rk, int level, const char *fac, +static V logcb(const rd_kafka_t *UNUSED(rk), int level, const char *fac, const char *buf) { printr0(k(0, (S) ".kfk.logcb", ki(level), kp((S) fac), kp((S) buf), KNL)); } @@ -293,15 +298,25 @@ K kfkPub(K tid, K partid, K data, K key) { K kfkSub(K cid, K topic, K partitions) { rd_kafka_resp_err_t err; rd_kafka_t *rk; - J i; - if(!checkType("isI", cid, topic, partitions)) + J i,*o=NULL; + I*p; + if(!checkType("is[I!]", cid, topic, partitions)) return KNL; if(!(rk= clientIndex(cid))) return KNL; rd_kafka_topic_partition_list_t *t_partition= rd_kafka_topic_partition_list_new(partitions->n); - for(i= 0; i < partitions->n; ++i) - rd_kafka_topic_partition_list_add(t_partition, topic->s, kI(partitions)[i]); + for(i= 0; i < partitions->n; ++i){ + if(partitions->t==XD){ + p=kI(kK(partitions)[0]); + o=kJ(kK(partitions)[1]); + }else{ + p=kI(partitions); + } + rd_kafka_topic_partition_list_add(t_partition, topic->s, p[i]); + if(o) + rd_kafka_topic_partition_list_set_offset(t_partition, topic->s, p[i],o[i]); + } if(KFK_OK != (err= rd_kafka_subscribe(rk, t_partition))) return krr((S) rd_kafka_err2str(err)); return knk(0); @@ -391,11 +406,10 @@ K kfkOutQLen(K cid) { return KNL; return ki(rd_kafka_outq_len(rk)); } -K kfkVersion(K _) { return ki(rd_kafka_version()); } -K kfkExportErr(K _) { +K kfkVersion(K UNUSED(x)) { return ki(rd_kafka_version()); } +K kfkExportErr(K UNUSED(dummy)) { const struct rd_kafka_err_desc *errdescs; - size_t n; - J i; + size_t i,n; K x= ktn(0, 0), y= ktn(0, 0), z= ktn(0, 0); rd_kafka_get_err_descs(&errdescs, &n); for(i= 0; i < n; ++i)