Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
ret = flb_azure_workload_identity_token_get(ctx->o,
ctx->workload_identity_token_file,
ctx->client_id,
ctx->tenant_id);
ctx->tenant_id,
ctx->kusto_scope);
if (ret == -1) {
flb_plg_error(ctx->ins, "error retrieving workload identity token");
return -1;
Expand All @@ -82,7 +83,9 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
return -1;
}

ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
ret = flb_oauth2_payload_append(ctx->o, "scope", 5,
ctx->kusto_scope,
flb_sds_len(ctx->kusto_scope));
if (ret == -1) {
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
Expand Down Expand Up @@ -1532,6 +1535,27 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, auth_type_str),
"Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. "
"For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"},
{FLB_CONFIG_MAP_STR, "cloud_name", "AzureCloud", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, cloud_name),
"Set the Azure cloud environment. Supported values: "
"'AzureCloud' (default), 'AzureChinaCloud', 'AzureUSGovernmentCloud'. "
"For private clouds (USSEC, USNAT, BLEU, etc.), set "
"cloud_login_host, cloud_kusto_scope, and cloud_kusto_resource instead"},
{FLB_CONFIG_MAP_STR, "cloud_login_host", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_login_host),
"Custom OAuth login host for private/sovereign clouds "
"(e.g. login.microsoftonline.eaglex.ic.gov). When set, cloud_kusto_scope "
"and cloud_kusto_resource must also be provided"},
{FLB_CONFIG_MAP_STR, "cloud_kusto_scope", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_kusto_scope),
"Custom Kusto OAuth scope for private/sovereign clouds "
"(e.g. https://help.kusto.core.eaglex.ic.gov/.default). When set, "
"cloud_login_host and cloud_kusto_resource must also be provided"},
{FLB_CONFIG_MAP_STR, "cloud_kusto_resource", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, custom_kusto_resource),
"Custom Kusto IMDS resource URL for private/sovereign clouds "
"(e.g. https://api.kusto.core.eaglex.ic.gov/). When set, cloud_login_host "
"and cloud_kusto_scope must also be provided"},
{FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint),
"Set the Kusto cluster's ingestion endpoint URL (e.g. "
Expand Down
40 changes: 35 additions & 5 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,31 @@ typedef enum {
FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */
} flb_azure_kusto_auth_type;

/* Kusto streaming inserts oauth scope */
#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default"
/* Azure cloud environment types */
typedef enum {
FLB_AZURE_CLOUD_PUBLIC = 0, /* AzureCloud (default) */
FLB_AZURE_CLOUD_CHINA, /* AzureChinaCloud */
FLB_AZURE_CLOUD_US_GOVERNMENT /* AzureUSGovernmentCloud */
} flb_azure_cloud_type;

/* MSAL authorization URL */
/* MSAL authorization URL template: %s = login host, %s = tenant_id */
#define FLB_MSAL_AUTH_URL_TEMPLATE \
"https://login.microsoftonline.com/%s/oauth2/v2.0/token"
"https://%s/%s/oauth2/v2.0/token"

/* Cloud-specific login hosts */
#define FLB_AZURE_LOGIN_HOST_PUBLIC "login.microsoftonline.com"
#define FLB_AZURE_LOGIN_HOST_CHINA "login.chinacloudapi.cn"
#define FLB_AZURE_LOGIN_HOST_US_GOVERNMENT "login.microsoftonline.us"

/* Cloud-specific Kusto scopes */
#define FLB_AZURE_KUSTO_SCOPE_PUBLIC "https://help.kusto.windows.net/.default"
#define FLB_AZURE_KUSTO_SCOPE_CHINA "https://help.kusto.chinacloudapi.cn/.default"
#define FLB_AZURE_KUSTO_SCOPE_US_GOVERNMENT "https://help.kusto.usgovcloudapi.net/.default"

/* Cloud-specific Kusto IMDS resources */
#define FLB_AZURE_KUSTO_RESOURCE_PUBLIC "https://api.kusto.windows.net/"
#define FLB_AZURE_KUSTO_RESOURCE_CHINA "https://api.kusto.chinacloudapi.cn/"
#define FLB_AZURE_KUSTO_RESOURCE_US_GOVERNMENT "https://api.kusto.usgovcloudapi.net/"

#define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt"
#define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}"
Expand All @@ -74,7 +93,6 @@ typedef enum {

#define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token"
#define FLB_AZURE_IMDS_API_VERSION "2018-02-01"
#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/"


struct flb_azure_kusto_resources {
Expand Down Expand Up @@ -105,6 +123,18 @@ struct flb_azure_kusto {
char *auth_type_str;
char *workload_identity_token_file;

/* Cloud environment */
int cloud_type;
char *cloud_name;
flb_sds_t kusto_scope;
flb_sds_t kusto_resource;
flb_sds_t login_host;

/* Custom cloud overrides (for private/sovereign clouds like USSEC, USNAT, BLEU) */
flb_sds_t custom_login_host;
flb_sds_t custom_kusto_scope;
flb_sds_t custom_kusto_resource;

/* compress payload */
int compression_enabled;

Expand Down
148 changes: 143 additions & 5 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,115 @@ static int flb_azure_kusto_resources_destroy(struct flb_azure_kusto_resources *r
return 0;
}

/**
* Resolves cloud-specific endpoints based on the cloud_name configuration.
* Sets kusto_scope, kusto_resource, and login_host in the context.
*
* @param ctx Pointer to the plugin's context
* @return int 0 on success, -1 on failure
*/
static int azure_kusto_resolve_cloud_endpoints(struct flb_azure_kusto *ctx)
{
const char *scope = NULL;
const char *resource = NULL;
const char *login_host = NULL;
int has_custom_login_host;
int has_custom_scope;
int has_custom_resource;

/* Check if custom overrides are provided */
has_custom_login_host = (ctx->custom_login_host && flb_sds_len(ctx->custom_login_host) > 0);
has_custom_scope = (ctx->custom_kusto_scope && flb_sds_len(ctx->custom_kusto_scope) > 0);
has_custom_resource = (ctx->custom_kusto_resource && flb_sds_len(ctx->custom_kusto_resource) > 0);

/* If all three custom properties are provided, use them directly */
if (has_custom_login_host && has_custom_scope && has_custom_resource) {
ctx->login_host = flb_sds_create(ctx->custom_login_host);
if (!ctx->login_host) {
flb_errno();
return -1;
}

ctx->kusto_scope = flb_sds_create(ctx->custom_kusto_scope);
if (!ctx->kusto_scope) {
flb_errno();
return -1;
}

ctx->kusto_resource = flb_sds_create(ctx->custom_kusto_resource);
if (!ctx->kusto_resource) {
flb_errno();
return -1;
}

flb_plg_info(ctx->ins,
"using custom cloud endpoints: login_host='%s', scope='%s', resource='%s'",
ctx->login_host, ctx->kusto_scope, ctx->kusto_resource);
return 0;
}

/* If some but not all custom properties are set, error out */
if (has_custom_login_host || has_custom_scope || has_custom_resource) {
flb_plg_error(ctx->ins,
"When using custom cloud endpoints, all three properties must be set: "
"cloud_login_host, cloud_kusto_scope, cloud_kusto_resource");
return -1;
}

/* Resolve from well-known cloud names */
if (!ctx->cloud_name || strcasecmp(ctx->cloud_name, "AzureCloud") == 0) {
ctx->cloud_type = FLB_AZURE_CLOUD_PUBLIC;
scope = FLB_AZURE_KUSTO_SCOPE_PUBLIC;
resource = FLB_AZURE_KUSTO_RESOURCE_PUBLIC;
login_host = FLB_AZURE_LOGIN_HOST_PUBLIC;
}
else if (strcasecmp(ctx->cloud_name, "AzureChinaCloud") == 0) {
ctx->cloud_type = FLB_AZURE_CLOUD_CHINA;
scope = FLB_AZURE_KUSTO_SCOPE_CHINA;
resource = FLB_AZURE_KUSTO_RESOURCE_CHINA;
login_host = FLB_AZURE_LOGIN_HOST_CHINA;
}
else if (strcasecmp(ctx->cloud_name, "AzureUSGovernmentCloud") == 0) {
ctx->cloud_type = FLB_AZURE_CLOUD_US_GOVERNMENT;
scope = FLB_AZURE_KUSTO_SCOPE_US_GOVERNMENT;
resource = FLB_AZURE_KUSTO_RESOURCE_US_GOVERNMENT;
login_host = FLB_AZURE_LOGIN_HOST_US_GOVERNMENT;
}
else {
flb_plg_error(ctx->ins,
"Unknown cloud_name '%s'. Use a well-known cloud name "
"('AzureCloud', 'AzureChinaCloud', 'AzureUSGovernmentCloud') "
"or specify custom endpoints via "
"cloud_login_host, cloud_kusto_scope, and cloud_kusto_resource",
ctx->cloud_name);
return -1;
}

ctx->kusto_scope = flb_sds_create(scope);
if (!ctx->kusto_scope) {
flb_errno();
return -1;
}

ctx->kusto_resource = flb_sds_create(resource);
if (!ctx->kusto_resource) {
flb_errno();
return -1;
}

ctx->login_host = flb_sds_create(login_host);
if (!ctx->login_host) {
flb_errno();
return -1;
}

flb_plg_info(ctx->ins, "cloud environment='%s', login_host='%s', scope='%s'",
ctx->cloud_name ? ctx->cloud_name : "AzureCloud",
ctx->login_host, ctx->kusto_scope);

return 0;
}

struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins,
struct flb_config *config)
{
Expand All @@ -722,6 +831,14 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* Resolve cloud-specific endpoints */
ret = azure_kusto_resolve_cloud_endpoints(ctx);
if (ret == -1) {
flb_plg_error(ins, "failed to resolve cloud endpoints");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* Auth method validation and setup */
if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) {
ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL;
Expand Down Expand Up @@ -802,38 +919,44 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
/* MSI auth */
/* Construct the URL template with or without client_id for managed identity */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM) {
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1);
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->kusto_resource));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "");
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "",
ctx->kusto_resource);
} else {
/* User-assigned managed identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 +
sizeof("&client_id=") - 1 +
flb_sds_len(ctx->client_id));
flb_sds_len(ctx->client_id) +
flb_sds_len(ctx->kusto_resource));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->client_id);
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=",
ctx->client_id, ctx->kusto_resource);
}
} else {
/* Standard OAuth2 for service principal or workload identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->login_host) +
flb_sds_len(ctx->tenant_id));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->login_host,
ctx->tenant_id);
}

ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources));
Expand All @@ -857,6 +980,21 @@ int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx)

flb_plg_info(ctx->ins, "before exiting the plugin kusto conf destroy called");

if (ctx->kusto_scope) {
flb_sds_destroy(ctx->kusto_scope);
ctx->kusto_scope = NULL;
}

if (ctx->kusto_resource) {
flb_sds_destroy(ctx->kusto_resource);
ctx->kusto_resource = NULL;
}

if (ctx->login_host) {
flb_sds_destroy(ctx->login_host);
ctx->login_host = NULL;
}

if (ctx->oauth_url) {
flb_sds_destroy(ctx->oauth_url);
ctx->oauth_url = NULL;
Expand Down
Loading
Loading