diff --git a/lib_async/kafka_async.ml b/lib_async/kafka_async.ml index 06df324..8b17d9d 100644 --- a/lib_async/kafka_async.ml +++ b/lib_async/kafka_async.ml @@ -66,7 +66,9 @@ let new_producer xs = ignore (poll' handler)); return { handler; pending_msg; stop_poll } -external new_consumer' : (string * string) list -> Kafka.handler response +external new_consumer' : + ?rebalance_callback:(unit -> unit) -> + (string * string) list -> Kafka.handler response = "ocaml_kafka_async_new_consumer" external consumer_poll' : Kafka.handler -> Kafka.message option response @@ -80,12 +82,12 @@ let handle_incoming_message subscriptions = function | None -> () | Some writer -> Pipe.write_without_pushback writer msg) -let new_consumer xs = +let new_consumer ?rebalance_callback xs = let open Result.Let_syntax in let subscriptions = String.Table.create ~size:(8 * 1024) () in let stop_poll = Ivar.create () in let start_poll = Ivar.create () in - let%bind handler = new_consumer' xs in + let%bind handler = new_consumer' ?rebalance_callback xs in every ~start:(Ivar.read start_poll) ~stop:(Ivar.read stop_poll) poll_interval (fun () -> match consumer_poll' handler with diff --git a/lib_async/kafka_async.mli b/lib_async/kafka_async.mli index edee9fa..54c41fc 100644 --- a/lib_async/kafka_async.mli +++ b/lib_async/kafka_async.mli @@ -16,7 +16,7 @@ val produce : val new_producer : (string * string) list -> producer response -val new_consumer : (string * string) list -> consumer response +val new_consumer : ?rebalance_callback:(unit -> unit) -> (string * string) list -> consumer response val new_topic : producer -> string -> (string * string) list -> Kafka.topic response diff --git a/lib_async/ocaml_async_kafka.c b/lib_async/ocaml_async_kafka.c index c3dc6d2..84a8eb6 100644 --- a/lib_async/ocaml_async_kafka.c +++ b/lib_async/ocaml_async_kafka.c @@ -75,8 +75,35 @@ CAMLprim value ocaml_kafka_async_new_producer(value caml_delivery_callback, valu CAMLreturn(result); } -CAMLprim value ocaml_kafka_async_new_consumer(value caml_consumer_options) { - CAMLparam1(caml_consumer_options); +static void ocaml_kafka_async_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) { + CAMLparam0(); + CAMLlocal1(caml_cb); + + caml_cb = ((ocaml_kafka_opaque*)opaque)->caml_callback; + + // do the default thing, but also call our OCaml code + switch (err) + { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + rd_kafka_assign(rk, partitions); + caml_callback(caml_cb, Val_unit); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + rd_kafka_commit(rk, partitions, 0); + rd_kafka_assign(rk, NULL); + caml_callback(caml_cb, Val_unit); + break; + + default: + break; + } + + CAMLreturn0; +} + +CAMLprim value ocaml_kafka_async_new_consumer(value caml_rebalance_callback, value caml_consumer_options) { + CAMLparam2(caml_rebalance_callback, caml_consumer_options); CAMLlocal1(result); char error_msg[160]; @@ -88,8 +115,16 @@ CAMLprim value ocaml_kafka_async_new_consumer(value caml_consumer_options) { CAMLreturn(result); } + ocaml_kafka_opaque* opaque = NULL; + if (Is_block(caml_rebalance_callback)) { + opaque = ocaml_kafka_opaque_create(caml_rebalance_callback); + rd_kafka_conf_set_opaque(conf, (void*) opaque); + rd_kafka_conf_set_rebalance_cb(conf, ocaml_kafka_async_rebalance_cb); + } + rd_kafka_t *handler = rd_kafka_new(RD_KAFKA_CONSUMER, conf, error_msg, sizeof(error_msg)); if (handler == NULL) { + ocaml_kafka_opaque_destroy(opaque); rd_kafka_conf_destroy(conf); result = ERROR(RD_KAFKA_RESP_ERR__FAIL, "Failed to create new kafka consumer (%s)", error_msg); CAMLreturn(result);