Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 94 additions & 16 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ pthread_key_t oauth2_type;
pthread_key_t oauth2_token;
pthread_key_t oauth2_token_expires;

static int oauth2_cache_initialized = FLB_FALSE;
static int oauth2_cache_users = 0;
static pthread_mutex_t oauth2_cache_lock = PTHREAD_MUTEX_INITIALIZER;

static void oauth2_cache_exit(void *ptr)
{
if (ptr) {
Expand All @@ -64,12 +68,60 @@ static void oauth2_cache_free_expiration(void *ptr)
}
}

static void oauth2_cache_init()
static int oauth2_cache_init()
{
/* oauth2 pthread key */
pthread_key_create(&oauth2_type, oauth2_cache_exit);
pthread_key_create(&oauth2_token, oauth2_cache_exit);
pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration);
int ret;

ret = 0;

pthread_mutex_lock(&oauth2_cache_lock);

if (oauth2_cache_initialized == FLB_FALSE) {
/* oauth2 pthread key */
ret = pthread_key_create(&oauth2_type, oauth2_cache_exit);
if (ret != 0) {
goto done;
}
ret = pthread_key_create(&oauth2_token, oauth2_cache_exit);
if (ret != 0) {
pthread_key_delete(oauth2_type);
goto done;
}
ret = pthread_key_create(&oauth2_token_expires,
oauth2_cache_free_expiration);
if (ret != 0) {
pthread_key_delete(oauth2_type);
pthread_key_delete(oauth2_token);
goto done;
}
oauth2_cache_initialized = FLB_TRUE;
}

if (ret == 0) {
oauth2_cache_users++;
}

done:
pthread_mutex_unlock(&oauth2_cache_lock);
return ret;
}

static void oauth2_cache_cleanup(void)
{
pthread_mutex_lock(&oauth2_cache_lock);

if (oauth2_cache_users > 0) {
oauth2_cache_users--;
if (oauth2_cache_users == 0 &&
oauth2_cache_initialized == FLB_TRUE) {
pthread_key_delete(oauth2_type);
pthread_key_delete(oauth2_token);
pthread_key_delete(oauth2_token_expires);
oauth2_cache_initialized = FLB_FALSE;
}
}

pthread_mutex_unlock(&oauth2_cache_lock);
}

/* Set oauth2 type and token in pthread keys */
Expand Down Expand Up @@ -1221,7 +1273,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
/* Load config map */
ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
return -1;
goto error;
}

/* Set context */
Expand All @@ -1237,10 +1289,20 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
}

/* Initialize oauth2 cache pthread keys */
oauth2_cache_init();
ret = oauth2_cache_init();
if (ret != 0) {
flb_plg_error(ins, "failed to initialize oauth2 cache");
goto error;
}
ctx->oauth2_cache_acquired = FLB_TRUE;

/* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
pthread_mutex_init(&ctx->token_mutex, NULL);
ret = pthread_mutex_init(&ctx->token_mutex, NULL);
if (ret != 0) {
flb_plg_error(ins, "failed to initialize token mutex");
goto error;
}
ctx->token_mutex_initialized = FLB_TRUE;

/* Create Upstream context for Stackdriver Logging (no oauth2 service) */
ctx->u = flb_upstream_create_url(config, ctx->cloud_logging_write_url,
Expand All @@ -1253,15 +1315,15 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,

if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
goto error;
}
if (!ctx->metadata_u) {
flb_plg_error(ctx->ins, "metadata upstream creation failed");
return -1;
goto error;
}
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
return -1;
goto error;
}
flb_output_upstream_set(ctx->u, ins);

Expand All @@ -1282,27 +1344,27 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
if (ctx->metadata_server_auth) {
ret = gce_metadata_read_project_id(ctx);
if (ret == -1) {
return -1;
goto error;
}

if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
&& ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
ret = gce_metadata_read_zone(ctx);
if (ret == -1) {
return -1;
goto error;
}

ret = gce_metadata_read_instance_id(ctx);
if (ret == -1) {
return -1;
goto error;
}
}
}

/* Validate project_id */
if (!ctx->project_id) {
flb_plg_error(ctx->ins, "property 'project_id' is not set");
return -1;
goto error;
}

if (!ctx->export_to_project_id) {
Expand All @@ -1312,10 +1374,22 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
ret = flb_stackdriver_regex_init(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "failed to init stackdriver custom regex");
return -1;
goto error;
}

return 0;

error:
if (ctx->token_mutex_initialized == FLB_TRUE) {
pthread_mutex_destroy(&ctx->token_mutex);
ctx->token_mutex_initialized = FLB_FALSE;
}
if (ctx->oauth2_cache_acquired == FLB_TRUE) {
oauth2_cache_cleanup();
ctx->oauth2_cache_acquired = FLB_FALSE;
}
flb_stackdriver_conf_destroy(ctx);
return -1;
}

static int validate_severity_level(severity_t * s,
Expand Down Expand Up @@ -3087,6 +3161,10 @@ static int cb_stackdriver_exit(void *data, struct flb_config *config)
return -1;
}

if (ctx->oauth2_cache_acquired == FLB_TRUE) {
oauth2_cache_cleanup();
ctx->oauth2_cache_acquired = FLB_FALSE;
}
flb_stackdriver_conf_destroy(ctx);
return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_stackdriver/stackdriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ struct flb_stackdriver {
/* environment variable settings */
struct flb_stackdriver_env *env;

/* oauth2 cache reference */
int oauth2_cache_acquired;

/* mutex for acquiring oauth tokens */
pthread_mutex_t token_mutex;
int token_mutex_initialized;

/* upstream context for stackdriver write end-point */
struct flb_upstream *u;
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_stackdriver/stackdriver_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,9 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx)

flb_kv_release(&ctx->config_labels);
flb_kv_release(&ctx->resource_labels_kvs);
if (ctx->token_mutex_initialized) {
pthread_mutex_destroy(&ctx->token_mutex);
}
flb_free(ctx);

return 0;
Expand Down
Loading