Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to react on rebalance #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib_async/kafka_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ let new_producer xs =
ignore (poll' handler));
return { handler; pending_msg; stop_poll }

external new_consumer' : (string * string) list -> Kafka.handler response
external new_consumer' :
?rebalance_callback:(unit -> unit) ->
(string * string) list -> Kafka.handler response
= "ocaml_kafka_async_new_consumer"

external consumer_poll' : Kafka.handler -> Kafka.message option response
Expand All @@ -80,12 +82,12 @@ let handle_incoming_message subscriptions = function
| None -> ()
| Some writer -> Pipe.write_without_pushback writer msg)

let new_consumer xs =
let new_consumer ?rebalance_callback xs =
let open Result.Let_syntax in
let subscriptions = String.Table.create ~size:(8 * 1024) () in
let stop_poll = Ivar.create () in
let start_poll = Ivar.create () in
let%bind handler = new_consumer' xs in
let%bind handler = new_consumer' ?rebalance_callback xs in
every ~start:(Ivar.read start_poll) ~stop:(Ivar.read stop_poll) poll_interval
(fun () ->
match consumer_poll' handler with
Expand Down
2 changes: 1 addition & 1 deletion lib_async/kafka_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ val produce :

val new_producer : (string * string) list -> producer response

val new_consumer : (string * string) list -> consumer response
val new_consumer : ?rebalance_callback:(unit -> unit) -> (string * string) list -> consumer response

val new_topic :
producer -> string -> (string * string) list -> Kafka.topic response
Expand Down
39 changes: 37 additions & 2 deletions lib_async/ocaml_async_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,35 @@ CAMLprim value ocaml_kafka_async_new_producer(value caml_delivery_callback, valu
CAMLreturn(result);
}

CAMLprim value ocaml_kafka_async_new_consumer(value caml_consumer_options) {
CAMLparam1(caml_consumer_options);
static void ocaml_kafka_async_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) {
CAMLparam0();
CAMLlocal1(caml_cb);

caml_cb = ((ocaml_kafka_opaque*)opaque)->caml_callback;

// do the default thing, but also call our OCaml code
switch (err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably handle cooperative assignors too, and call rd_kafka_incremental_assign (see https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a10db731dc1a295bd9884e4f8cb199311)

{
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
rd_kafka_assign(rk, partitions);
caml_callback(caml_cb, Val_unit);
break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
rd_kafka_commit(rk, partitions, 0);
rd_kafka_assign(rk, NULL);
caml_callback(caml_cb, Val_unit);
break;

default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is missing a call to rd_kafka_assign():

In this latter case (arbitrary error), the application must call rd_kafka_assign(rk, NULL) to synchronize state.

break;
}

CAMLreturn0;
}

CAMLprim value ocaml_kafka_async_new_consumer(value caml_rebalance_callback, value caml_consumer_options) {
CAMLparam2(caml_rebalance_callback, caml_consumer_options);
CAMLlocal1(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CAMLlocal1(result);
CAMLlocal2(caml_callback, result);


char error_msg[160];
Expand All @@ -88,8 +115,16 @@ CAMLprim value ocaml_kafka_async_new_consumer(value caml_consumer_options) {
CAMLreturn(result);
}

ocaml_kafka_opaque* opaque = NULL;
if (Is_block(caml_rebalance_callback)) {
opaque = ocaml_kafka_opaque_create(caml_rebalance_callback);
Comment on lines +119 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (Is_block(caml_rebalance_callback)) {
opaque = ocaml_kafka_opaque_create(caml_rebalance_callback);
if (Is_block(caml_rebalance_callback)) {
caml_callback = Field(caml_rebalance_callback, 0);
opaque = ocaml_kafka_opaque_create(caml_callback);

rd_kafka_conf_set_opaque(conf, (void*) opaque);
rd_kafka_conf_set_rebalance_cb(conf, ocaml_kafka_async_rebalance_cb);
}

rd_kafka_t *handler = rd_kafka_new(RD_KAFKA_CONSUMER, conf, error_msg, sizeof(error_msg));
if (handler == NULL) {
ocaml_kafka_opaque_destroy(opaque);
rd_kafka_conf_destroy(conf);
result = ERROR(RD_KAFKA_RESP_ERR__FAIL, "Failed to create new kafka consumer (%s)", error_msg);
CAMLreturn(result);
Expand Down