Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin"
# Processors
# ==========
DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_CUMULATIVE_TO_DELTA "Enable cumulative to delta metrics processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON)
Expand Down
2 changes: 1 addition & 1 deletion lib/cmetrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# CMetrics Version
set(CMT_VERSION_MAJOR 2)
set(CMT_VERSION_MINOR 0)
set(CMT_VERSION_PATCH 2)
set(CMT_VERSION_PATCH 3)
set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}")

# Include helpers
Expand Down
143 changes: 136 additions & 7 deletions lib/cmetrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,149 @@

> DISCLAIMER: THIS LIBRARY IS STILL IN ACTIVE DEVELOPMENT

The [CMetrics](https://github.com/calyptia/cmetrics) project is a standalone C library to create and maintain a context of different sets of metrics with labels support such as:
[CMetrics](https://github.com/calyptia/cmetrics) is a standalone C library to
create, mutate, aggregate, encode, and decode metrics contexts.

- Counters
- Gauges
- Histograms
- Summaries
## Supported Metric Types

This project is heavily based on Go Prometheus Client API design:
- Counter
- Gauge
- Untyped
- Histogram
- Exponential Histogram
- Summary

All metric points store a sample `timestamp` in nanoseconds.

## Datapoint Start Timestamp (OTLP)

CMetrics also supports an optional native `start_timestamp` per datapoint.
This is primarily relevant for OTLP cumulative streams.

API (`cmt_metric.h`):

- `cmt_metric_set_start_timestamp(...)`
- `cmt_metric_unset_start_timestamp(...)`
- `cmt_metric_has_start_timestamp(...)`
- `cmt_metric_get_start_timestamp(...)`

Backward compatibility: existing code that only uses `timestamp` is unchanged.

## Supported Encoders

- OpenTelemetry Metrics (OTLP protobuf)
- Prometheus text exposition
- Prometheus Remote Write
- Influx line protocol
- Splunk HEC
- CloudWatch EMF
- CMetrics msgpack (internal format)
- Text (human-readable)

## Supported Decoders

- OpenTelemetry Metrics (OTLP protobuf)
- Prometheus text exposition
- Prometheus Remote Write
- StatsD
- CMetrics msgpack (internal format)

## OTLP and `start_timestamp`

- OTLP decoder populates native `start_timestamp` from
`start_time_unix_nano`.
- OTLP encoder prefers native `start_timestamp` and falls back to OTLP metadata
when needed.
- Internal CMetrics msgpack supports optional `start_ts` to preserve this value
across internal encode/decode flows.

Non-OTLP formats (for example Prometheus text, Influx, Splunk HEC, and
CloudWatch EMF) do not define an OTLP-style start timestamp field, so they
serialize sample timestamps only.

## C Usage Example

```c
#include <stdint.h>
#include <stdio.h>

#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_counter.h>
#include <cmetrics/cmt_map.h>
#include <cmetrics/cmt_metric.h>
#include <cmetrics/cmt_encode_opentelemetry.h>

int main(void)
{
struct cmt *ctx;
struct cmt_counter *requests_total;
struct cmt_metric *sample;
cfl_sds_t otlp_payload;
uint64_t start_ns;
uint64_t sample_ns;

ctx = cmt_create();
if (ctx == NULL) {
return 1;
}

requests_total = cmt_counter_create(ctx,
"demo", /* namespace */
"service", /* subsystem */
"requests_total",
"Total requests",
0, /* label keys */
NULL);
if (requests_total == NULL) {
cmt_destroy(ctx);
return 1;
}

start_ns = 1700000000000000000ULL;
sample_ns = start_ns + 5000000000ULL;

/* Write sample value (cumulative stream example). */
if (cmt_counter_set(requests_total, sample_ns, 42.0, 0, NULL) != 0) {
cmt_destroy(ctx);
return 1;
}

/* Access the same datapoint and attach native start timestamp. */
sample = cmt_map_metric_get(&requests_total->opts,
requests_total->map,
0, NULL,
CMT_FALSE);
if (sample == NULL) {
cmt_destroy(ctx);
return 1;
}
cmt_metric_set_start_timestamp(sample, start_ns);

/* Encode OTLP metrics payload. */
otlp_payload = cmt_encode_opentelemetry_create(ctx);
if (otlp_payload == NULL) {
cmt_destroy(ctx);
return 1;
}

printf("Encoded OTLP payload size: %zu bytes\n", cfl_sds_len(otlp_payload));

cmt_encode_opentelemetry_destroy(otlp_payload);
cmt_destroy(ctx);
return 0;
}
```

## Design Reference

CMetrics is heavily inspired by the Go Prometheus Client API design:

- https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#section-documentation

## License

This program is under the terms of the [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0).
This program is under the terms of the
[Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0).

## Authors

Expand Down
34 changes: 32 additions & 2 deletions lib/cmetrics/include/cmetrics/cmt_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct cmt_metric {
uint64_t hist_sum;

/* exponential histogram */
int exp_hist_sum_set;
uint64_t exp_hist_sum_set;
int32_t exp_hist_scale;
uint64_t exp_hist_zero_count;
double exp_hist_zero_threshold;
Expand All @@ -56,7 +56,7 @@ struct cmt_metric {
uint64_t exp_hist_sum;

/* summary */
int sum_quantiles_set; /* specify if quantive values has been set */
uint64_t sum_quantiles_set; /* specify if quantive values has been set */
uint64_t *sum_quantiles; /* 0, 0.25, 0.5, 0.75 and 1 */
size_t sum_quantiles_count;
uint64_t sum_count;
Expand All @@ -65,10 +65,28 @@ struct cmt_metric {
/* internal */
uint64_t hash;
uint64_t timestamp;
uint64_t start_timestamp;
uint64_t start_timestamp_set;
uint64_t exp_hist_lock;
struct cfl_list labels;
struct cfl_list _head;
};

struct cmt_exp_histogram_snapshot {
int32_t scale;
uint64_t zero_count;
double zero_threshold;
int32_t positive_offset;
uint64_t *positive_buckets;
size_t positive_count;
int32_t negative_offset;
uint64_t *negative_buckets;
size_t negative_count;
uint64_t count;
uint64_t sum_set;
uint64_t sum;
};

void cmt_metric_set(struct cmt_metric *metric, uint64_t timestamp, double val);
void cmt_metric_set_double(struct cmt_metric *metric, uint64_t timestamp, double val);
void cmt_metric_set_int64(struct cmt_metric *metric, uint64_t timestamp, int64_t val);
Expand All @@ -86,6 +104,18 @@ void cmt_metric_get_value_snapshot(struct cmt_metric *metric,
int64_t *out_int64,
uint64_t *out_uint64);
uint64_t cmt_metric_get_timestamp(struct cmt_metric *metric);
void cmt_metric_set_timestamp(struct cmt_metric *metric, uint64_t timestamp);
void cmt_metric_set_start_timestamp(struct cmt_metric *metric, uint64_t start_timestamp);
void cmt_metric_unset_start_timestamp(struct cmt_metric *metric);
int cmt_metric_has_start_timestamp(struct cmt_metric *metric);
uint64_t cmt_metric_get_start_timestamp(struct cmt_metric *metric);
void cmt_metric_set_exp_hist_count(struct cmt_metric *metric, uint64_t count);
void cmt_metric_set_exp_hist_sum(struct cmt_metric *metric, int sum_set, double sum);
void cmt_metric_exp_hist_lock(struct cmt_metric *metric);
void cmt_metric_exp_hist_unlock(struct cmt_metric *metric);
int cmt_metric_exp_hist_get_snapshot(struct cmt_metric *metric,
struct cmt_exp_histogram_snapshot *snapshot);
void cmt_metric_exp_hist_snapshot_destroy(struct cmt_exp_histogram_snapshot *snapshot);

void cmt_metric_hist_inc(struct cmt_metric *metric, uint64_t timestamp,
int bucket_id);
Expand Down
4 changes: 4 additions & 0 deletions lib/cmetrics/include/cmetrics/cmt_summary.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ uint64_t cmt_summary_get_count_value(struct cmt_metric *metric);

void cmt_summary_quantile_set(struct cmt_metric *metric, uint64_t timestamp,
int quantile_id, double val);
void cmt_summary_sum_set(struct cmt_metric *metric, uint64_t timestamp,
double val);
void cmt_summary_count_set(struct cmt_metric *metric, uint64_t timestamp,
uint64_t count);


#endif
44 changes: 24 additions & 20 deletions lib/cmetrics/src/cmt_atomic_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
* limitations under the License.
*/

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <cmetrics/cmt_atomic.h>

pthread_mutex_t atomic_operation_lock;
static int atomic_operation_system_initialized = 0;
static pthread_once_t atomic_operation_system_once = PTHREAD_ONCE_INIT;
static int atomic_operation_system_initialized = 0;
static int atomic_operation_system_status = 0;

/* TODO: Determne if we want to keep this backend as well as how / if we want to handle
* pthread_mutex_unlock errors (investigate and understand what could cause them),
Expand All @@ -32,18 +32,22 @@ static int atomic_operation_system_initialized = 0;
*
*/

inline int cmt_atomic_initialize()
static void cmt_atomic_bootstrap()
{
int result;
atomic_operation_system_status =
pthread_mutex_init(&atomic_operation_lock, NULL);

if (0 == atomic_operation_system_initialized) {
result = pthread_mutex_init(&atomic_operation_lock, NULL);
if (atomic_operation_system_status == 0) {
atomic_operation_system_initialized = 1;
}
}

if (0 != result) {
return 1;
}
inline int cmt_atomic_initialize()
{
pthread_once(&atomic_operation_system_once, cmt_atomic_bootstrap);

atomic_operation_system_initialized = 1;
if (atomic_operation_system_status != 0) {
return 1;
}

return 0;
Expand All @@ -54,9 +58,9 @@ inline int cmt_atomic_compare_exchange(uint64_t *storage,
{
int result;

if (0 == atomic_operation_system_initialized) {
printf("CMT ATOMIC : Atomic operation backend not initalized\n");
exit(1);
if (cmt_atomic_initialize() != 0 ||
atomic_operation_system_initialized == 0) {
return 0;
}

result = pthread_mutex_lock(&atomic_operation_lock);
Expand All @@ -83,9 +87,9 @@ inline void cmt_atomic_store(uint64_t *storage, uint64_t new_value)
{
int result;

if (0 == atomic_operation_system_initialized) {
printf("CMT ATOMIC : Atomic operation backend not initalized\n");
exit(1);
if (cmt_atomic_initialize() != 0 ||
atomic_operation_system_initialized == 0) {
return;
}

result = pthread_mutex_lock(&atomic_operation_lock);
Expand All @@ -104,9 +108,9 @@ inline uint64_t cmt_atomic_load(uint64_t *storage)
int result;
uint64_t retval;

if (0 == atomic_operation_system_initialized) {
printf("CMT ATOMIC : Atomic operation backend not initalized\n");
exit(1);
if (cmt_atomic_initialize() != 0 ||
atomic_operation_system_initialized == 0) {
return 0;
}

result = pthread_mutex_lock(&atomic_operation_lock);
Expand Down
Loading
Loading