Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/out_azure_blob/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(src
azure_blob_appendblob.c
azure_blob_blockblob.c
azure_blob_store.c
azure_blob_msiauth.c
)

FLB_PLUGIN(out_azure_blob "${src}" "")
28 changes: 27 additions & 1 deletion plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -1826,7 +1826,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "auth_type", "key",
0, FLB_TRUE, offsetof(struct flb_azure_blob, auth_type),
"Set the auth type: key or sas"
"Set the auth type: key, sas, managed_identity, or workload_identity"
},

{
Expand All @@ -1835,6 +1835,32 @@ static struct flb_config_map config_map[] = {
"Azure Blob SAS token"
},

{
FLB_CONFIG_MAP_STR, "client_id", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, client_id),
"Azure client ID for managed identity or workload identity auth. "
"For system-assigned managed identity, set to 'system'"
},

{
FLB_CONFIG_MAP_STR, "tenant_id", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, tenant_id),
"Azure tenant ID (required for workload identity auth)"
},

{
FLB_CONFIG_MAP_STR, "client_secret", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, client_secret),
"Azure client secret (optional, for service principal auth)"
},

{
FLB_CONFIG_MAP_STR, "workload_identity_token_file", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, workload_identity_token_file),
"Path to the workload identity token file. "
"Default: /var/run/secrets/azure/tokens/azure-identity-token"
},

{
FLB_CONFIG_MAP_STR, "database_file", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, database_file),
Expand Down
21 changes: 19 additions & 2 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_sqldb.h>

Expand All @@ -48,8 +49,15 @@
#define AZURE_BLOB_APPENDBLOB 0
#define AZURE_BLOB_BLOCKBLOB 1

#define AZURE_BLOB_AUTH_KEY 0
#define AZURE_BLOB_AUTH_SAS 1
#define AZURE_BLOB_AUTH_KEY 0
#define AZURE_BLOB_AUTH_SAS 1
#define AZURE_BLOB_AUTH_MI_SYSTEM 2
#define AZURE_BLOB_AUTH_MI_USER 3
#define AZURE_BLOB_AUTH_WI 4

/* MSAL authorization URL template */
#define FLB_AZURE_BLOB_MSAL_AUTH_URL_TEMPLATE \
"https://login.microsoftonline.com/%s/oauth2/v2.0/token"

struct flb_azure_blob {
int auto_create_container;
Expand All @@ -65,6 +73,10 @@ struct flb_azure_blob {
flb_sds_t date_key;
flb_sds_t auth_type;
flb_sds_t sas_token;
flb_sds_t client_id;
flb_sds_t tenant_id;
flb_sds_t client_secret;
flb_sds_t workload_identity_token_file;
flb_sds_t database_file;
size_t part_size;
time_t upload_parts_timeout;
Expand Down Expand Up @@ -121,6 +133,11 @@ struct flb_azure_blob {
unsigned char *decoded_sk; /* decoded shared key */
size_t decoded_sk_size; /* size of decoded shared key */

/* OAuth2 (managed identity / workload identity) */
flb_sds_t oauth_url;
struct flb_oauth2 *o;
pthread_mutex_t token_mutex;

#ifdef FLB_HAVE_SQLDB
/*
* SQLite by default is not built with multi-threading enabled, and
Expand Down
164 changes: 130 additions & 34 deletions plugins/out_azure_blob/azure_blob_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "azure_blob.h"
#include "azure_blob_conf.h"
#include "azure_blob_db.h"
#include "azure_blob_msiauth.h"

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -562,32 +563,15 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
/* Load config map */
ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);

return NULL;
goto error;
}

if (ctx->account_name == NULL) {
flb_plg_error(ctx->ins, "'account_name' has not been set");
return NULL;
}

if (ctx->configuration_endpoint_url != NULL) {
ret = flb_azure_blob_apply_remote_configuration(ctx);

if (ret != 0) {
flb_free(ctx);

return NULL;
}
}

if (!ctx->container_name) {
flb_plg_error(ctx->ins, "'container_name' has not been set");
return NULL;
goto error;
}

/* Set Auth type */
/* Set Auth type (must happen before remote config, which branches on atype) */
tmp = (char *) flb_output_get_property("auth_type", ins);
if (!tmp) {
ctx->atype = AZURE_BLOB_AUTH_KEY;
Expand All @@ -599,21 +583,58 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
else if (strcasecmp(tmp, "sas") == 0) {
ctx->atype = AZURE_BLOB_AUTH_SAS;
}
else if (strcasecmp(tmp, "managed_identity") == 0) {
if (!ctx->client_id) {
flb_plg_error(ctx->ins,
"managed_identity auth requires 'client_id' "
"(set to 'system' for system-assigned)");
goto error;
}
if (strcasecmp(ctx->client_id, "system") == 0) {
ctx->atype = AZURE_BLOB_AUTH_MI_SYSTEM;
}
else {
ctx->atype = AZURE_BLOB_AUTH_MI_USER;
}
}
else if (strcasecmp(tmp, "workload_identity") == 0) {
ctx->atype = AZURE_BLOB_AUTH_WI;
if (!ctx->tenant_id || !ctx->client_id) {
flb_plg_error(ctx->ins,
"workload_identity auth requires "
"'tenant_id' and 'client_id'");
goto error;
}
}
else {
flb_plg_error(ctx->ins, "invalid auth_type value '%s'", tmp);
return NULL;
goto error;
}
}

if (ctx->configuration_endpoint_url != NULL) {
ret = flb_azure_blob_apply_remote_configuration(ctx);

if (ret != 0) {
goto error;
}
}

if (!ctx->container_name) {
flb_plg_error(ctx->ins, "'container_name' has not been set");
goto error;
}

if (ctx->atype == AZURE_BLOB_AUTH_KEY &&
ctx->shared_key == NULL) {
flb_plg_error(ctx->ins, "'shared_key' has not been set");
return NULL;
goto error;
}

if (ctx->atype == AZURE_BLOB_AUTH_SAS) {
if (ctx->sas_token == NULL) {
flb_plg_error(ctx->ins, "'sas_token' has not been set");
return NULL;
goto error;
}
if (ctx->sas_token[0] == '?') {
ctx->sas_token++;
Expand All @@ -625,7 +646,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
ctx->shared_key != NULL) {
ret = set_shared_key(ctx);
if (ret == -1) {
return NULL;
goto error;
}
}

Expand All @@ -643,7 +664,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
}
else {
flb_plg_error(ctx->ins, "invalid blob_type value '%s'", tmp);
return NULL;
goto error;
}
}

Expand All @@ -652,7 +673,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
flb_plg_error(ctx->ins,
"buffering is not supported with 'appendblob' blob_type. "
"Please use 'blockblob' blob_type or disable buffering.");
return NULL;
goto error;
}

/* Compress (gzip) */
Expand All @@ -669,7 +690,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
flb_plg_error(ctx->ins,
"the option 'compress_blob' is not compatible with 'appendblob' "
"blob_type");
return NULL;
goto error;
}

/*
Expand All @@ -690,15 +711,15 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
io_flags, ins->tls);
if (!ctx->u) {
flb_plg_error(ctx->ins, "invalid endpoint '%s'", ctx->endpoint);
return NULL;
goto error;
}
ctx->real_endpoint = flb_sds_create(ctx->endpoint);
}
else {
ctx->real_endpoint = flb_sds_create_size(256);
if (!ctx->real_endpoint) {
flb_plg_error(ctx->ins, "cannot create endpoint");
return NULL;
goto error;
}
flb_sds_printf(&ctx->real_endpoint, "%s%s",
ctx->account_name,
Expand All @@ -725,7 +746,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
if (!ctx->u) {
flb_plg_error(ctx->ins, "cannot create upstream for endpoint '%s'",
ctx->real_endpoint);
return NULL;
goto error;
}
}
flb_output_upstream_set(ctx->u, ins);
Expand All @@ -735,7 +756,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
if (!ctx->base_uri) {
flb_plg_error(ctx->ins, "cannot create base_uri for endpoint '%s'",
ctx->real_endpoint);
return NULL;
goto error;
}

if (ctx->emulator_mode == FLB_TRUE) {
Expand All @@ -750,11 +771,64 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
ctx->shared_key_prefix = flb_sds_create_size(256);
if (!ctx->shared_key_prefix) {
flb_plg_error(ctx->ins, "cannot create shared key prefix");
return NULL;
goto error;
}
flb_sds_printf(&ctx->shared_key_prefix, "SharedKey %s:", ctx->account_name);
}

/* Create OAuth2 context for managed identity / workload identity */
if (ctx->atype == AZURE_BLOB_AUTH_MI_SYSTEM ||
ctx->atype == AZURE_BLOB_AUTH_MI_USER) {
/* Construct IMDS URL */
if (ctx->atype == AZURE_BLOB_AUTH_MI_SYSTEM) {
ctx->oauth_url = flb_sds_create_size(
sizeof(FLB_AZURE_BLOB_MSIAUTH_URL_TEMPLATE) + 1);
if (!ctx->oauth_url) {
goto error;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_BLOB_MSIAUTH_URL_TEMPLATE, "", "");
}
else {
ctx->oauth_url = flb_sds_create_size(
sizeof(FLB_AZURE_BLOB_MSIAUTH_URL_TEMPLATE) +
sizeof("&client_id=") + flb_sds_len(ctx->client_id));
if (!ctx->oauth_url) {
goto error;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_BLOB_MSIAUTH_URL_TEMPLATE,
"&client_id=", ctx->client_id);
}

ctx->o = flb_oauth2_create(config, ctx->oauth_url, 3000);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create OAuth2 context for IMDS");
goto error;
}
flb_stream_disable_async_mode(&ctx->o->u->base);
pthread_mutex_init(&ctx->token_mutex, NULL);
}
else if (ctx->atype == AZURE_BLOB_AUTH_WI) {
/* Construct Azure AD token endpoint URL */
ctx->oauth_url = flb_sds_create_size(
sizeof(FLB_AZURE_BLOB_MSAL_AUTH_URL_TEMPLATE) +
flb_sds_len(ctx->tenant_id));
if (!ctx->oauth_url) {
goto error;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_BLOB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);

ctx->o = flb_oauth2_create(config, ctx->oauth_url, 3000);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create OAuth2 context for workload identity");
goto error;
}
flb_stream_disable_async_mode(&ctx->o->u->base);
pthread_mutex_init(&ctx->token_mutex, NULL);
}

/* Sanitize path: remove any ending slash */
if (ctx->path) {
if (ctx->path[flb_sds_len(ctx->path) - 1] == '/') {
Expand All @@ -766,7 +840,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
if (ctx->database_file) {
ctx->db = azb_db_open(ctx, ctx->database_file);
if (!ctx->db) {
return NULL;
goto error;
}
}

Expand All @@ -778,8 +852,16 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
ctx->btype == AZURE_BLOB_APPENDBLOB ? "appendblob" : "blockblob",
ctx->emulator_mode ? "yes" : "no",
ctx->real_endpoint ? ctx->real_endpoint : "no",
ctx->atype == AZURE_BLOB_AUTH_KEY ? "key" : "sas");
ctx->atype == AZURE_BLOB_AUTH_KEY ? "key" :
ctx->atype == AZURE_BLOB_AUTH_SAS ? "sas" :
ctx->atype == AZURE_BLOB_AUTH_MI_SYSTEM ? "managed_identity (system)" :
ctx->atype == AZURE_BLOB_AUTH_MI_USER ? "managed_identity (user)" :
"workload_identity");
return ctx;

error:
flb_azure_blob_conf_destroy(ctx);
return NULL;
}

void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx)
Expand Down Expand Up @@ -822,6 +904,20 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx)
flb_sds_destroy(ctx->shared_key_prefix);
}

if (ctx->oauth_url) {
flb_sds_destroy(ctx->oauth_url);
}

if (ctx->o) {
flb_oauth2_destroy(ctx->o);
}

if (ctx->atype == AZURE_BLOB_AUTH_MI_SYSTEM ||
ctx->atype == AZURE_BLOB_AUTH_MI_USER ||
ctx->atype == AZURE_BLOB_AUTH_WI) {
pthread_mutex_destroy(&ctx->token_mutex);
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}
Expand Down
Loading