Skip to content

Commit

Permalink
update requirement to v0.11.0
Browse files Browse the repository at this point in the history
add offsets on subscribe. cleanup compiler warnings
  • Loading branch information
Sergey Vidyuk committed Mar 7, 2018
1 parent 1f7718b commit 28821dc
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 18 deletions.
12 changes: 12 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
language: c
sudo: required
dist: trusty
os:
- linux

before_install:
- if [ $TRAVIS_OS_NAME = linux ]; then sudo apt-get update;sudo apt-get install gcc-multilib; fi
- if [ $TRAVIS_OS_NAME = linux ]; then sudo apt-get install librdkafka-dev librdkafka-dev:i386; fi

script:
- make all
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ TGT=libkfk.dll
else
KAFKA_ROOT=${HOME}
KFK_INCLUDE=${KAFKA_ROOT}/include
OPTS=-DKXVER=3 -Wall -Wno-strict-aliasing -Wno-parentheses -shared -fPIC
OPTS=-DKXVER=3 -Wall -Wno-strict-aliasing -Wno-parentheses -shared -fPIC -Wextra -Werror -Wsign-compare -Wwrite-strings
LDOPTS_STATIC=${KAFKA_ROOT}/lib/librdkafka.a -lz -lpthread -lssl -g -O2
LDOPTS=-L${KAFKA_ROOT}/lib/ -lrdkafka -lz -lpthread -lssl -g -O2
OSXOPTS=-undefined dynamic_lookup -mmacosx-version-min=10.12
Expand Down
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ For list of options see: https://github.com/edenhill/librdkafka/blob/master/CONF
# Building and installation

## Step 1
Build and install latest version of librdkafka.
Build and install latest version of librdkafka. Minimum required version is v0.11.0.
### Requirements
As noted on librdkafka page https://github.com/edenhill/librdkafka#requirements
The GNU toolchain
Expand All @@ -64,8 +64,18 @@ To build 32-bit versions on 64-bit OS you need to have 32-bit version of librari
sudo yum install glibc-devel.i686 libgcc.i686 libstdc++.i686 zlib-devel.i686
# Ubuntu
sudo apt-get install gcc-multilib
```
### Librdkafka
#### Package installation
```
#macOS
brew install librdkafka
#Ubuntu/Debian(unstable)
sudo apt-get install librdkafka-dev
#RHEL/CentOS
sudo yum install librdkafka-devel
```
#### Building from source
### macOS and Linux
```bash
git clone https://github.com/edenhill/librdkafka.git
Expand All @@ -86,7 +96,7 @@ make install

```
### Windows (to be added)
Using Nuget.
Using Nuget redistributable(https://www.nuget.org/packages/librdkafka.redist)
```
nuget install librdkafka.redist
```
Expand Down
4 changes: 2 additions & 2 deletions k.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ extern V m9();
extern V m9(V);
#endif
extern I khpun(const S,I,const S,I),khpu(const S,I,const S),khp(const S,I),okx(K),ymd(I,I,I),dj(I);extern V r0(K),sd0(I),kclose(I);extern S sn(S,I),ss(S);
extern K ktj(I,J),ka(I),kb(I),kg(I),kh(I),ki(I),kj(J),ke(F),kf(F),kc(I),ks(S),kd(I),kz(F),kt(I),sd1(I,K(*)(I)),dl(V*f,I),
knk(I,...),kp(S),ja(K*,V*),js(K*,S),jk(K*,K),jv(K*k,K),k(I,const S,...)__attribute__((sentinel)),xT(K),xD(K,K),ktd(K),r1(K),krr(const S),orr(const S),dot(K,K),b9(I,K),d9(K);
extern K ktj(I,J),ka(I),kb(I),kg(I),kh(I),ki(I),kj(J),ke(F),kf(F),kc(I),ks(const C*),kd(I),kz(F),kt(I),sd1(I,K(*)(I)),dl(V*f,I),
knk(I,...),kp(S),ja(K*,V*),js(K*,S),jk(K*,K),jv(K*k,K),k(I,const C*,...)__attribute__((sentinel)),xT(K),xD(K,K),ktd(K),r1(K),krr(const C*),orr(const C*),dot(K,K),b9(I,K),d9(K);
#ifdef __cplusplus
}
#endif
Expand Down
38 changes: 26 additions & 12 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ typedef unsigned int UI;
#define KR -128
#define KNL (K) 0
#define KFK_OK RD_KAFKA_RESP_ERR_NO_ERROR
#ifdef __GNUC__
# define UNUSED(x) x __attribute__((__unused__))
#else
# define UNUSED(x) x
#endif
// create dictionary q dictionary from list of items (s1;v1;s2;v2;...)
K xd0(I n, ...) __attribute__((sentinel));
K xd0(I n, ...) {
Expand All @@ -35,12 +40,12 @@ static I spair[2];
static K S0;
// check type
// letter as usual, + for table, ! for dict
static I checkType(S tc, ...) {
static I checkType(const C* tc, ...) {
va_list args;
K x;
static C lt[256]= " tvunzdmpscfejihg xb*BX GHIJEFCSPMDZNUVT";
static C b[256];
S tc0= tc;
const C* tc0= tc;
I match=0;
lt[20 + 98]= '+';
lt[20 + 99]= '!';
Expand Down Expand Up @@ -92,10 +97,10 @@ static I printr0(K x) {
r0(x);
return 0;
}
static I statscb(rd_kafka_t *rk, S json, size_t json_len, V *opaque) {
static I statscb(rd_kafka_t*UNUSED(rk), S json, size_t json_len, V*UNUSED(opaque)) {
return printr0(k(0, (S) ".kfk.statcb", kpn(json, json_len), KNL));
} // should return 0 to indicate mem free to kafka
static V logcb(const rd_kafka_t *rk, int level, const char *fac,
static V logcb(const rd_kafka_t *UNUSED(rk), int level, const char *fac,
const char *buf) {
printr0(k(0, (S) ".kfk.logcb", ki(level), kp((S) fac), kp((S) buf), KNL));
}
Expand Down Expand Up @@ -293,15 +298,25 @@ K kfkPub(K tid, K partid, K data, K key) {
K kfkSub(K cid, K topic, K partitions) {
rd_kafka_resp_err_t err;
rd_kafka_t *rk;
J i;
if(!checkType("isI", cid, topic, partitions))
J i,*o=NULL;
I*p;
if(!checkType("is[I!]", cid, topic, partitions))
return KNL;
if(!(rk= clientIndex(cid)))
return KNL;
rd_kafka_topic_partition_list_t *t_partition=
rd_kafka_topic_partition_list_new(partitions->n);
for(i= 0; i < partitions->n; ++i)
rd_kafka_topic_partition_list_add(t_partition, topic->s, kI(partitions)[i]);
for(i= 0; i < partitions->n; ++i){
if(partitions->t==XD){
p=kI(kK(partitions)[0]);
o=kJ(kK(partitions)[1]);
}else{
p=kI(partitions);
}
rd_kafka_topic_partition_list_add(t_partition, topic->s, p[i]);
if(o)
rd_kafka_topic_partition_list_set_offset(t_partition, topic->s, p[i],o[i]);
}
if(KFK_OK != (err= rd_kafka_subscribe(rk, t_partition)))
return krr((S) rd_kafka_err2str(err));
return knk(0);
Expand Down Expand Up @@ -391,11 +406,10 @@ K kfkOutQLen(K cid) {
return KNL;
return ki(rd_kafka_outq_len(rk));
}
K kfkVersion(K _) { return ki(rd_kafka_version()); }
K kfkExportErr(K _) {
K kfkVersion(K UNUSED(x)) { return ki(rd_kafka_version()); }
K kfkExportErr(K UNUSED(dummy)) {
const struct rd_kafka_err_desc *errdescs;
size_t n;
J i;
size_t i,n;
K x= ktn(0, 0), y= ktn(0, 0), z= ktn(0, 0);
rd_kafka_get_err_descs(&errdescs, &n);
for(i= 0; i < n; ++i)
Expand Down

0 comments on commit 28821dc

Please sign in to comment.