diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 9c1c3c84402..1375ba2c463 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -50,6 +50,10 @@ pthread_key_t oauth2_type; pthread_key_t oauth2_token; pthread_key_t oauth2_token_expires; +static int oauth2_cache_initialized = FLB_FALSE; +static int oauth2_cache_users = 0; +static pthread_mutex_t oauth2_cache_lock = PTHREAD_MUTEX_INITIALIZER; + static void oauth2_cache_exit(void *ptr) { if (ptr) { @@ -64,12 +68,60 @@ static void oauth2_cache_free_expiration(void *ptr) } } -static void oauth2_cache_init() +static int oauth2_cache_init() { - /* oauth2 pthread key */ - pthread_key_create(&oauth2_type, oauth2_cache_exit); - pthread_key_create(&oauth2_token, oauth2_cache_exit); - pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration); + int ret; + + ret = 0; + + pthread_mutex_lock(&oauth2_cache_lock); + + if (oauth2_cache_initialized == FLB_FALSE) { + /* oauth2 pthread key */ + ret = pthread_key_create(&oauth2_type, oauth2_cache_exit); + if (ret != 0) { + goto done; + } + ret = pthread_key_create(&oauth2_token, oauth2_cache_exit); + if (ret != 0) { + pthread_key_delete(oauth2_type); + goto done; + } + ret = pthread_key_create(&oauth2_token_expires, + oauth2_cache_free_expiration); + if (ret != 0) { + pthread_key_delete(oauth2_type); + pthread_key_delete(oauth2_token); + goto done; + } + oauth2_cache_initialized = FLB_TRUE; + } + + if (ret == 0) { + oauth2_cache_users++; + } + +done: + pthread_mutex_unlock(&oauth2_cache_lock); + return ret; +} + +static void oauth2_cache_cleanup(void) +{ + pthread_mutex_lock(&oauth2_cache_lock); + + if (oauth2_cache_users > 0) { + oauth2_cache_users--; + if (oauth2_cache_users == 0 && + oauth2_cache_initialized == FLB_TRUE) { + pthread_key_delete(oauth2_type); + pthread_key_delete(oauth2_token); + pthread_key_delete(oauth2_token_expires); + oauth2_cache_initialized = FLB_FALSE; + } + } + + pthread_mutex_unlock(&oauth2_cache_lock); } /* Set oauth2 type and token in pthread keys */ @@ -1221,7 +1273,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, /* Load config map */ ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { - return -1; + goto error; } /* Set context */ @@ -1237,10 +1289,20 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, } /* Initialize oauth2 cache pthread keys */ - oauth2_cache_init(); + ret = oauth2_cache_init(); + if (ret != 0) { + flb_plg_error(ins, "failed to initialize oauth2 cache"); + goto error; + } + ctx->oauth2_cache_acquired = FLB_TRUE; /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ - pthread_mutex_init(&ctx->token_mutex, NULL); + ret = pthread_mutex_init(&ctx->token_mutex, NULL); + if (ret != 0) { + flb_plg_error(ins, "failed to initialize token mutex"); + goto error; + } + ctx->token_mutex_initialized = FLB_TRUE; /* Create Upstream context for Stackdriver Logging (no oauth2 service) */ ctx->u = flb_upstream_create_url(config, ctx->cloud_logging_write_url, @@ -1253,15 +1315,15 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, if (!ctx->u) { flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; + goto error; } if (!ctx->metadata_u) { flb_plg_error(ctx->ins, "metadata upstream creation failed"); - return -1; + goto error; } if (!ctx->o) { flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; + goto error; } flb_output_upstream_set(ctx->u, ins); @@ -1282,19 +1344,19 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, if (ctx->metadata_server_auth) { ret = gce_metadata_read_project_id(ctx); if (ret == -1) { - return -1; + goto error; } if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { ret = gce_metadata_read_zone(ctx); if (ret == -1) { - return -1; + goto error; } ret = gce_metadata_read_instance_id(ctx); if (ret == -1) { - return -1; + goto error; } } } @@ -1302,7 +1364,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, /* Validate project_id */ if (!ctx->project_id) { flb_plg_error(ctx->ins, "property 'project_id' is not set"); - return -1; + goto error; } if (!ctx->export_to_project_id) { @@ -1312,10 +1374,22 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, ret = flb_stackdriver_regex_init(ctx); if (ret == -1) { flb_plg_error(ctx->ins, "failed to init stackdriver custom regex"); - return -1; + goto error; } return 0; + +error: + if (ctx->token_mutex_initialized == FLB_TRUE) { + pthread_mutex_destroy(&ctx->token_mutex); + ctx->token_mutex_initialized = FLB_FALSE; + } + if (ctx->oauth2_cache_acquired == FLB_TRUE) { + oauth2_cache_cleanup(); + ctx->oauth2_cache_acquired = FLB_FALSE; + } + flb_stackdriver_conf_destroy(ctx); + return -1; } static int validate_severity_level(severity_t * s, @@ -3087,6 +3161,10 @@ static int cb_stackdriver_exit(void *data, struct flb_config *config) return -1; } + if (ctx->oauth2_cache_acquired == FLB_TRUE) { + oauth2_cache_cleanup(); + ctx->oauth2_cache_acquired = FLB_FALSE; + } flb_stackdriver_conf_destroy(ctx); return 0; } diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index dd1536963b9..f54faa9d339 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -202,8 +202,12 @@ struct flb_stackdriver { /* environment variable settings */ struct flb_stackdriver_env *env; + /* oauth2 cache reference */ + int oauth2_cache_acquired; + /* mutex for acquiring oauth tokens */ pthread_mutex_t token_mutex; + int token_mutex_initialized; /* upstream context for stackdriver write end-point */ struct flb_upstream *u; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index 635752d0c57..7c36b2e02b9 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -706,6 +706,9 @@ int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) flb_kv_release(&ctx->config_labels); flb_kv_release(&ctx->resource_labels_kvs); + if (ctx->token_mutex_initialized) { + pthread_mutex_destroy(&ctx->token_mutex); + } flb_free(ctx); return 0;