Skip to content

Commit

Permalink
feat: add oauthbearer configuration and refresh token callback
Browse files Browse the repository at this point in the history
  • Loading branch information
macasado86 committed Jul 23, 2024
1 parent 227d24d commit e90a7cd
Show file tree
Hide file tree
Showing 13 changed files with 366 additions and 11 deletions.
7 changes: 7 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ sasl_username | * | |
sasl_password | * | | | SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism
sasl_oauthbearer_config | * | | | SASL/OAUTHBEARER configuration. The format is implementation-dependent and must be parsed accordingly. The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds. The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600. The scope value is CSV format with the default value being no/empty scope. For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`. For example: `principal=admin extension_traceId=123`.
enable_sasl_oauthbearer_unsecure_jwt | * | true, false | false | Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. This builtin handler should only be used for development or testing, and not in production.
oauthbearer_token_refresh_callback | * | module or fun/2 | undefined | A callback to implement SASL/OAUTHBEARER token refresh.
sasl_oauthbearer_method | * | default, oidc | default | Set to "default" or "oidc" to control which login method to be used. If set to "oidc", the following properties must also be be specified: `sasl_oauthbearer_client_id`, `sasl_oauthbearer_client_secret`, and `sasl_oauthbearer_token_endpoint_url`.
sasl_oauthbearer_client_id | * | | | Public identifier for the application. Must be unique across all clients that the authorization server handles. Only used when `sasl_oauthbearer_method` is set to "oidc".
sasl_oauthbearer_client_secret | * | | | Client secret only known to the application and the authorization server. This should be a sufficiently random string that is not guessable. Only used when `sasl_oauthbearer_method` is set to "oidc".
sasl_oauthbearer_scope | * | | | Client use this to specify the scope of the access request to the broker. Only used when `sasl_oauthbearer_method` is set to "oidc".
sasl.oauthbearer.extensions | * | | | Allow additional information to be provided to the broker. Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".Only used when `sasl_oauthbearer_method` is set to "oidc".
sasl_oauthbearer_token_endpoint_url | * | | | OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. Only used when `sasl_oauthbearer_method` is set to "oidc".
plugin_library_paths | * | | undefined| Path where `librdkafka` plugins are located
group_instance_id | C | | | Enable static group membership. Static group members are able to leave and rejoin a group within the configured `session.timeout.ms` without prompting a group rebalance. This should be used in combination with a larger `session.timeout.ms` to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.
partition_assignment_strategy | C | | range, roundrobin | Name of partition assignment strategy to use when elected group leader assigns partitions to group members
Expand Down
127 changes: 127 additions & 0 deletions c_src/erlkaf_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <future>
#include <string.h>
#include <unistd.h>
#include <sstream>
#include <iostream>

namespace {

Expand Down Expand Up @@ -165,6 +167,26 @@ int stats_callback(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
return 0;
}

void oauthbearer_token_refresh_callback(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
{
UNUSED(rk);

enif_consumer* consumer = static_cast<enif_consumer*>(opaque);
ErlNifEnv* env = enif_alloc_env();

if (oauthbearer_config == NULL)
{
enif_send(NULL, &consumer->owner, env, enif_make_tuple2(env, ATOMS.atomOauthbearerTokenRefresh, ATOMS.atomUndefined));
}
else
{
ERL_NIF_TERM config = make_binary(env, oauthbearer_config, strlen(oauthbearer_config));
enif_send(NULL, &consumer->owner, env, enif_make_tuple2(env, ATOMS.atomOauthbearerTokenRefresh, config));
}

enif_free_env(env);
}

rd_kafka_topic_partition_list_t* topic_subscribe(ErlNifEnv* env, ERL_NIF_TERM list)
{
uint32_t length;
Expand Down Expand Up @@ -270,6 +292,8 @@ ERL_NIF_TERM enif_consumer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
rd_kafka_conf_set_log_cb(client_conf.get(), logger_callback);
rd_kafka_conf_set_rebalance_cb(client_conf.get(), rebalance_cb);
rd_kafka_conf_set_stats_cb(client_conf.get(), stats_callback);
rd_kafka_conf_set_oauthbearer_token_refresh_cb(client_conf.get(), oauthbearer_token_refresh_callback);
rd_kafka_conf_enable_sasl_queue(client_conf.get(), 1);

scoped_ptr(rk, rd_kafka_t, rd_kafka_new(RD_KAFKA_CONSUMER, client_conf.get(), errstr, sizeof(errstr)), rd_kafka_destroy);

Expand Down Expand Up @@ -299,6 +323,8 @@ ERL_NIF_TERM enif_consumer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv

data->notifier_->watch(consumer->kf, true);

rd_kafka_sasl_background_callbacks_enable(consumer->kf);

ERL_NIF_TERM term = enif_make_resource(env, consumer.get());
return enif_make_tuple2(env, ATOMS.atomOk, term);
}
Expand Down Expand Up @@ -441,3 +467,104 @@ ERL_NIF_TERM enif_consumer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM
return ATOMS.atomOk;
}

char** split_consumer_extensions(const std::string extensions_str, size_t* length)
{
std::stringstream extensions_stream(extensions_str);
std::string extension_tmp;
std::string kv_tmp;
std::vector<std::string> extensions_vector;

while (getline(extensions_stream, extension_tmp, ','))
{
std::stringstream kv_stream(extension_tmp);
while (getline(kv_stream, kv_tmp, '='))
extensions_vector.push_back(kv_tmp);
}

*length = extensions_vector.size();
char ** extensions = new char*[*length];

for(size_t i = 0; i < *length; ++i)
{
extensions[i] = new char[extensions_vector[i].size() + 1];
strcpy(extensions[i], extensions_vector[i].c_str());
}

return extensions;
}

void free_consumer_extensions(char** extensions, size_t length)
{
if (extensions != nullptr)
{
for (size_t i = 0; i < length; ++i)
if (extensions[i] != nullptr)
delete[] extensions[i];

delete[] extensions;
}
}

ERL_NIF_TERM enif_consumer_oauthbearer_set_token(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);

std::string token;
long lifetime;
std::string principal;
std::string extensions_str;

erlkaf_data* data = static_cast<erlkaf_data*>(enif_priv_data(env));
enif_consumer* consumer;

if(!enif_get_resource(env, argv[0], data->res_consumer, reinterpret_cast<void**>(&consumer)))
return make_badarg(env);

if(!get_string(env, argv[1], &token))
return make_badarg(env);

if(!enif_get_long(env, argv[2], &lifetime))
return make_badarg(env);

if(!get_string(env, argv[3], &principal))
return make_badarg(env);

if(!get_string(env, argv[4], &extensions_str))
return make_badarg(env);

char set_token_errstr[512];
size_t extension_key_value_cnt = 0;
char **extension_key_value = NULL;

if (extensions_str != "")
extension_key_value = split_consumer_extensions(extensions_str, &extension_key_value_cnt);

if (rd_kafka_oauthbearer_set_token(consumer->kf, token.c_str(), lifetime * 1000, principal.c_str(),
(const char **)extension_key_value, extension_key_value_cnt,
set_token_errstr, sizeof(set_token_errstr)) != RD_KAFKA_RESP_ERR_NO_ERROR)
{
rd_kafka_oauthbearer_set_token_failure(consumer->kf, set_token_errstr);
free_consumer_extensions(extension_key_value, extension_key_value_cnt);
return ATOMS.atomError;
}
else
{
free_consumer_extensions(extension_key_value, extension_key_value_cnt);
return ATOMS.atomOk;
}
}

ERL_NIF_TERM enif_consumer_oauthbearer_set_token_failure(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);

erlkaf_data* data = static_cast<erlkaf_data*>(enif_priv_data(env));
enif_consumer* consumer;

if(!enif_get_resource(env, argv[0], data->res_consumer, reinterpret_cast<void**>(&consumer)))
return make_badarg(env);

char set_token_errstr[512];
rd_kafka_oauthbearer_set_token_failure(consumer->kf, set_token_errstr);
return ATOMS.atomOk;
}
2 changes: 2 additions & 0 deletions c_src/erlkaf_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ ERL_NIF_TERM enif_consumer_queue_poll(ErlNifEnv* env, int argc, const ERL_NIF_TE
ERL_NIF_TERM enif_consumer_queue_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_consumer_offset_store(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_consumer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_consumer_oauthbearer_set_token(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_consumer_oauthbearer_set_token_failure(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

#endif // C_SRC_ERLKAF_CONSUMER_H_
8 changes: 7 additions & 1 deletion c_src/erlkaf_nif.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const char kAtomClientStopped[] = "client_stopped";
const char kAtomNotAvailable[] = "not_available";
const char kAtomCreateTime[] = "create_time";
const char kAtomLogAppendTime[] = "log_append_time";
const char kAtomOauthbearerTokenRefresh[] = "oauthbearer_token_refresh";

atoms ATOMS;

Expand Down Expand Up @@ -75,6 +76,7 @@ int on_nif_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOMS.atomNotAvailable = make_atom(env, kAtomNotAvailable);
ATOMS.atomCreateTime = make_atom(env, kAtomCreateTime);
ATOMS.atomLogAppendTime = make_atom(env, kAtomLogAppendTime);
ATOMS.atomOauthbearerTokenRefresh = make_atom(env, kAtomOauthbearerTokenRefresh);

erlkaf_data* data = static_cast<erlkaf_data*>(enif_alloc(sizeof(erlkaf_data)));
open_resources(env, data);
Expand Down Expand Up @@ -122,13 +124,17 @@ static ErlNifFunc nif_funcs[] =
{"producer_cleanup", 1, enif_producer_cleanup},
{"produce", 7, enif_produce},
{"get_metadata", 1, enif_get_metadata, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"producer_oauthbearer_set_token", 5, enif_producer_oauthbearer_set_token},
{"producer_oauthbearer_set_token_failure", 1, enif_producer_oauthbearer_set_token_failure},

{"consumer_new", 4, enif_consumer_new},
{"consumer_partition_revoke_completed", 1, enif_consumer_partition_revoke_completed},
{"consumer_queue_poll", 2, enif_consumer_queue_poll},
{"consumer_queue_cleanup", 1, enif_consumer_queue_cleanup},
{"consumer_offset_store", 4, enif_consumer_offset_store},
{"consumer_cleanup", 1, enif_consumer_cleanup}
{"consumer_cleanup", 1, enif_consumer_cleanup},
{"consumer_oauthbearer_set_token", 5, enif_consumer_oauthbearer_set_token},
{"consumer_oauthbearer_set_token_failure", 1, enif_consumer_oauthbearer_set_token_failure}
};

ERL_NIF_INIT(erlkaf_nif, nif_funcs, on_nif_load, NULL, on_nif_upgrade, on_nif_unload)
1 change: 1 addition & 0 deletions c_src/erlkaf_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct atoms
ERL_NIF_TERM atomNotAvailable;
ERL_NIF_TERM atomCreateTime;
ERL_NIF_TERM atomLogAppendTime;
ERL_NIF_TERM atomOauthbearerTokenRefresh;
};

struct erlkaf_data
Expand Down
129 changes: 129 additions & 0 deletions c_src/erlkaf_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include <memory>
#include <string>
#include <future>
#include <vector>
#include <sstream>
#include <iostream>

namespace {

Expand Down Expand Up @@ -65,6 +68,26 @@ int stats_callback(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
return 0;
}

void oauthbearer_token_refresh_callback(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
{
UNUSED(rk);

enif_producer* producer = static_cast<enif_producer*>(opaque);
ErlNifEnv* env = enif_alloc_env();

if (oauthbearer_config == NULL)
{
enif_send(NULL, &producer->owner_pid, env, enif_make_tuple2(env, ATOMS.atomOauthbearerTokenRefresh, ATOMS.atomUndefined));
}
else
{
ERL_NIF_TERM config = make_binary(env, oauthbearer_config, strlen(oauthbearer_config));
enif_send(NULL, &producer->owner_pid, env, enif_make_tuple2(env, ATOMS.atomOauthbearerTokenRefresh, config));
}

enif_free_env(env);
}

bool populate_headers(ErlNifEnv* env, ERL_NIF_TERM headers_term, rd_kafka_headers_t* out)
{
ERL_NIF_TERM head;
Expand Down Expand Up @@ -164,6 +187,8 @@ ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv

rd_kafka_conf_set_log_cb(config.get(), logger_callback);
rd_kafka_conf_set_stats_cb(config.get(), stats_callback);
rd_kafka_conf_set_oauthbearer_token_refresh_cb(config.get(), oauthbearer_token_refresh_callback);
rd_kafka_conf_enable_sasl_queue(config.get(), 1);

if(has_dr_callback)
rd_kafka_conf_set_dr_msg_cb(config.get(), delivery_report_callback);
Expand Down Expand Up @@ -214,6 +239,8 @@ ERL_NIF_TERM enif_producer_set_owner(ErlNifEnv* env, int argc, const ERL_NIF_TER
if(!enif_get_local_pid(env, argv[1], &producer->owner_pid))
return make_badarg(env);

rd_kafka_sasl_background_callbacks_enable(producer->kf);

return ATOMS.atomOk;
}

Expand Down Expand Up @@ -417,3 +444,105 @@ ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

return ATOMS.atomOk;
}

char** split_producer_extensions(const std::string extensions_str, size_t* length)
{
std::stringstream extensions_stream(extensions_str);
std::string extension_tmp;
std::string kv_tmp;
std::vector<std::string> extensions_vector;

while (getline(extensions_stream, extension_tmp, ','))
{
std::stringstream kv_stream(extension_tmp);
while (getline(kv_stream, kv_tmp, '='))
extensions_vector.push_back(kv_tmp);
}

*length = extensions_vector.size();
char ** extensions = new char*[*length];

for(size_t i = 0; i < *length; ++i)
{
extensions[i] = new char[extensions_vector[i].size() + 1];
strcpy(extensions[i], extensions_vector[i].c_str());
}

return extensions;
}

void free_producer_extensions(char** extensions, size_t length)
{
if (extensions != nullptr)
{
for (size_t i = 0; i < length; ++i)
if (extensions[i] != nullptr)
delete[] extensions[i];

delete[] extensions;
}
}

ERL_NIF_TERM enif_producer_oauthbearer_set_token(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);

std::string token;
long lifetime;
std::string principal;
std::string extensions_str;

erlkaf_data* data = static_cast<erlkaf_data*>(enif_priv_data(env));
enif_producer* producer;

if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast<void**>(&producer)))
return make_badarg(env);

if(!get_string(env, argv[1], &token))
return make_badarg(env);

if(!enif_get_long(env, argv[2], &lifetime))
return make_badarg(env);

if(!get_string(env, argv[3], &principal))
return make_badarg(env);

if(!get_string(env, argv[4], &extensions_str))
return make_badarg(env);

char set_token_errstr[512];
size_t extension_key_value_cnt = 0;
char **extension_key_value = NULL;

if (extensions_str != "")
extension_key_value = split_producer_extensions(extensions_str, &extension_key_value_cnt);

if (rd_kafka_oauthbearer_set_token(producer->kf, token.c_str(), lifetime * 1000, principal.c_str(),
(const char **)extension_key_value, extension_key_value_cnt,
set_token_errstr, sizeof(set_token_errstr)) != RD_KAFKA_RESP_ERR_NO_ERROR)
{
rd_kafka_oauthbearer_set_token_failure(producer->kf, set_token_errstr);
free_producer_extensions(extension_key_value, extension_key_value_cnt);
return ATOMS.atomError;
}
else
{
free_producer_extensions(extension_key_value, extension_key_value_cnt);
return ATOMS.atomOk;
}
}

ERL_NIF_TERM enif_producer_oauthbearer_set_token_failure(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);

erlkaf_data* data = static_cast<erlkaf_data*>(enif_priv_data(env));
enif_producer* producer;

if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast<void**>(&producer)))
return make_badarg(env);

char set_token_errstr[512];
rd_kafka_oauthbearer_set_token_failure(producer->kf, set_token_errstr);
return ATOMS.atomOk;
}
2 changes: 2 additions & 0 deletions c_src/erlkaf_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ ERL_NIF_TERM enif_producer_set_owner(ErlNifEnv* env, int argc, const ERL_NIF_TER
ERL_NIF_TERM enif_producer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_get_metadata(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_oauthbearer_set_token(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_oauthbearer_set_token_failure(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

#endif // C_SRC_ERLKAF_PRODUCER_H_
Loading

0 comments on commit e90a7cd

Please sign in to comment.