Skip to content

Commit

Permalink
fix(codecs): native JSON serialization/deserialization for special f6…
Browse files Browse the repository at this point in the history
…4 values (vectordotdev#18650)

* fix: native JSON serialization/deserialization for histogram buckets

* regenerated fixtures

* ignore prev24 tests - will revisit

* remove debugging stuff

* update tests

* first attempt at a backwards compatible solution

* cleanup, remove unused functions, clippy shenanigans

* Update lib/vector-core/src/event/metric/value.rs

Co-authored-by: Bruce Guenter <bruce.guenter@datadoghq.com>

* implemented visitor pattern deserializer optimization

---------

Co-authored-by: Bruce Guenter <bruce.guenter@datadoghq.com>
  • Loading branch information
pront and bruceg authored Oct 4, 2023
1 parent 1452d54 commit 39b9298
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 4 deletions.
24 changes: 23 additions & 1 deletion lib/codecs/src/encoding/format/native_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl Encoder<Event> for NativeJsonSerializer {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use vector_core::event::{LogEvent, Value};
use vector_core::buckets;
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, Value};
use vrl::btreemap;

use super::*;
Expand Down Expand Up @@ -84,4 +85,25 @@ mod tests {

assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
}

#[test]
fn serialize_aggregated_histogram() {
let histogram_event = Event::from(Metric::new(
"histogram",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
count: 1,
sum: 1.0,
buckets: buckets!(f64::NEG_INFINITY => 0 ,2.0 => 1, f64::INFINITY => 0),
},
));

let mut serializer = NativeJsonSerializer::new();
let mut bytes = BytesMut::new();
serializer
.encode(histogram_event.clone(), &mut bytes)
.unwrap();
let json = serializer.to_json_value(histogram_event).unwrap();
assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
}
}
86 changes: 86 additions & 0 deletions lib/codecs/tests/native_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use bytes::BytesMut;
use codecs::decoding::format::Deserializer;
use codecs::encoding::format::Serializer;
use codecs::{NativeJsonDeserializerConfig, NativeJsonSerializerConfig};
use vector_core::buckets;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric};
use vector_core::event::{MetricKind, MetricValue};

fn assert_roundtrip(
input_event: Event,
serializer: &mut dyn Serializer<Error = vector_common::Error>,
deserializer: &dyn Deserializer,
expected_json_value: serde_json::Value,
) {
let mut bytes_mut = BytesMut::new();
serializer
.encode(input_event.clone(), &mut bytes_mut)
.unwrap();
let bytes = bytes_mut.freeze();
let events = deserializer.parse(bytes, LogNamespace::Vector).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0], input_event);

let json_value = serde_json::to_value(input_event.as_metric()).unwrap();
assert_eq!(json_value, expected_json_value);
}

#[test]
fn histogram_metric_roundtrip() {
let histogram_event = Event::from(Metric::new(
"histogram",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
count: 1,
sum: 1.0,
buckets: buckets!(
f64::NEG_INFINITY => 10 ,
f64::MIN => 10, 1.5 => 10,
f64::MAX => 10,
f64::INFINITY => 10),
},
));

let expected_json_value = serde_json::from_str(
r#"
{
"aggregated_histogram": {
"buckets": [
{
"count": 10,
"upper_limit": "-inf"
},
{
"count": 10,
"upper_limit": -1.7976931348623157e308
},
{
"count": 10,
"upper_limit": 1.5
},
{
"count": 10,
"upper_limit": 1.7976931348623157e308
},
{
"count": 10,
"upper_limit": "inf"
}
],
"count": 1,
"sum": 1.0
},
"kind": "absolute",
"name": "histogram"
}"#,
)
.unwrap();

assert_roundtrip(
histogram_event,
&mut NativeJsonSerializerConfig.build(),
&NativeJsonDeserializerConfig::default().build(),
expected_json_value,
)
}
61 changes: 58 additions & 3 deletions lib/vector-core/src/event/metric/value.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use core::fmt;
use std::collections::BTreeSet;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

use vector_common::byte_size_of::ByteSizeOf;
use vector_config::configurable_component;

use super::{samples_to_buckets, write_list, write_word};
use crate::{float_eq, metrics::AgentDDSketch};

use super::{samples_to_buckets, write_list, write_word};

const INFINITY: &str = "inf";
const NEG_INFINITY: &str = "-inf";
const NAN: &str = "NaN";

/// Metric value.
#[configurable_component]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -597,14 +604,62 @@ impl ByteSizeOf for Sample {
}
}

/// Custom serialization function which converts special `f64` values to strings.
/// Non-special values are serialized as numbers.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn serialize_f64<S>(value: &f64, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if value.is_infinite() {
serializer.serialize_str(if *value > 0.0 { INFINITY } else { NEG_INFINITY })
} else if value.is_nan() {
serializer.serialize_str(NAN)
} else {
serializer.serialize_f64(*value)
}
}

/// Custom deserialization function for handling special f64 values.
fn deserialize_f64<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
D: Deserializer<'de>,
{
struct UpperLimitVisitor;

impl<'de> de::Visitor<'de> for UpperLimitVisitor {
type Value = f64;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a number or a special string value")
}

fn visit_f64<E: de::Error>(self, value: f64) -> Result<Self::Value, E> {
Ok(value)
}

fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
match value {
NAN => Ok(f64::NAN),
INFINITY => Ok(f64::INFINITY),
NEG_INFINITY => Ok(f64::NEG_INFINITY),
_ => Err(E::custom("unsupported string value")),
}
}
}

deserializer.deserialize_any(UpperLimitVisitor)
}

/// A histogram bucket.
///
/// Histogram buckets represent the `count` of observations where the value of the observations does
/// not exceed the specified `upper_limit`.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[configurable_component(no_deser, no_ser)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// The upper limit of values in the bucket.
#[serde(serialize_with = "serialize_f64", deserialize_with = "deserialize_f64")]
pub upper_limit: f64,

/// The number of values tracked in this bucket.
Expand Down

0 comments on commit 39b9298

Please sign in to comment.