Skip to content

Commit

Permalink
Merge branch 'master' into sinks-sftp
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Jul 29, 2023
2 parents bd86fbf + a06c711 commit 23426d0
Show file tree
Hide file tree
Showing 113 changed files with 1,686 additions and 489 deletions.
27 changes: 15 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ tower-http = { version = "0.4.3", default-features = false, features = ["decompr
serde = { version = "1.0.175", default-features = false, features = ["derive"] }
serde-toml-merge = { version = "0.3.0", default-features = false }
serde_bytes = { version = "0.11.12", default-features = false, features = ["std"], optional = true }
serde_json = { version = "1.0.103", default-features = false, features = ["raw_value"] }
serde_json = { version = "1.0.104", default-features = false, features = ["raw_value"] }
serde_with = { version = "3.1.0", default-features = false, features = ["macros", "std"] }
serde_yaml = { version = "0.9.25", default-features = false }

Expand Down Expand Up @@ -305,7 +305,7 @@ rand_distr = { version = "0.4.3", default-features = false }
rdkafka = { version = "0.33.2", default-features = false, features = ["tokio", "libz", "ssl", "zstd"], optional = true }
redis = { version = "0.23.0", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true }
regex = { version = "1.9.1", default-features = false, features = ["std", "perf"] }
roaring = { version = "0.10.1", default-features = false, optional = true }
roaring = { version = "0.10.2", default-features = false, optional = true }
seahash = { version = "4.1.0", default-features = false }
semver = { version = "1.0.18", default-features = false, features = ["serde", "std"], optional = true }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
Expand All @@ -321,7 +321,7 @@ tokio-tungstenite = {version = "0.20.0", default-features = false, features = ["
toml = { version = "0.7.6", default-features = false, features = ["parse", "display"] }
tonic = { version = "0.9", optional = true, default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] }
trust-dns-proto = { version = "0.22.0", default-features = false, features = ["dnssec"], optional = true }
typetag = { version = "0.2.11", default-features = false }
typetag = { version = "0.2.12", default-features = false }
url = { version = "2.4.0", default-features = false, features = ["serde"] }
uuid = { version = "1", default-features = false, features = ["serde", "v4"] }
warp = { version = "0.3.5", default-features = false }
Expand Down Expand Up @@ -376,7 +376,7 @@ zstd = { version = "0.12.4", default-features = false }
# https://github.com/chronotope/chrono/pull/578
chrono = { git = "https://github.com/vectordotdev/chrono.git", tag = "v0.4.26-no-default-time-1" }
# The upgrade for `tokio-util` >= 0.6.9 is blocked on https://github.com/vectordotdev/vector/issues/11257.
tokio-util = { git = "https://github.com/vectordotdev/tokio", branch = "tokio-util-0.7.4-framed-read-continue-on-error" }
tokio-util = { git = "https://github.com/vectordotdev/tokio", branch = "tokio-util-0.7.8-framed-read-continue-on-error" }
nix = { git = "https://github.com/vectordotdev/nix.git", branch = "memfd/gnu/musl" }
# The `heim` crates depend on `ntapi` 0.3.7 on Windows, but that version has an
# unaligned access bug fixed in the following revision.
Expand Down
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo')
docker_build(
ref='timberio/vector',
context='.',
build_args={'RUST_VERSION': '1.70.0'},
build_args={'RUST_VERSION': '1.71.0'},
dockerfile='tilt/Dockerfile'
)

Expand Down
10 changes: 2 additions & 8 deletions benches/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
let index = Template::try_from("index-%Y.%m.%d").unwrap();
let mut event = Event::Log(LogEvent::from("hello world"));
event.as_mut_log().insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
log_schema().timestamp_key_target_path().unwrap(),
Utc::now(),
);

Expand All @@ -31,10 +28,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
let index = Template::try_from("index").unwrap();
let mut event = Event::Log(LogEvent::from("hello world"));
event.as_mut_log().insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
log_schema().timestamp_key_target_path().unwrap(),
Utc::now(),
);

Expand Down
1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ memchr = { version = "2", default-features = false }
once_cell = { version = "1.18", default-features = false }
ordered-float = { version = "3.7.0", default-features = false }
prost = { version = "0.11.8", default-features = false, features = ["std"] }
prost-reflect = { version = "0.11", default-features = false, features = ["serde"] }
regex = { version = "1.9.1", default-features = false, features = ["std", "perf"] }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false }
Expand Down
20 changes: 13 additions & 7 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use lookup::OwnedTargetPath;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_core::config::LogNamespace;
use vector_core::schema::meaning;
use vector_core::{
config::{log_schema, DataType},
event::{Event, LogEvent},
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use super::Deserializer;
Expand Down Expand Up @@ -36,11 +36,17 @@ impl BytesDeserializerConfig {
/// The schema produced by the deserializer.
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field(
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
),
LogNamespace::Legacy => {
let definition = schema::Definition::empty_legacy_namespace();
if let Some(message_key) = log_schema().message_key() {
return definition.with_event_field(
message_key,
Kind::bytes(),
Some(meaning::MESSAGE),
);
}
definition
}
LogNamespace::Vector => {
schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
.with_meaning(OwnedTargetPath::event_root(), "message")
Expand All @@ -63,7 +69,7 @@ impl BytesDeserializer {
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
LogNamespace::Legacy => {
let mut log = LogEvent::default();
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
log.maybe_insert(log_schema().message_key_target_path(), bytes);
log
}
}
Expand Down
15 changes: 6 additions & 9 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Utc};
use derivative::Derivative;
use lookup::{event_path, owned_value_path, PathPrefix};
use lookup::{event_path, owned_value_path};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::collections::HashMap;
Expand Down Expand Up @@ -130,20 +130,17 @@ impl GelfDeserializer {
log.insert(FULL_MESSAGE, full_message.to_string());
}

if let Some(timestamp_key) = log_schema().timestamp_key() {
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
if let Some(timestamp) = parsed.timestamp {
let naive = NaiveDateTime::from_timestamp_opt(
f64::trunc(timestamp) as i64,
f64::fract(timestamp) as u32,
)
.expect("invalid timestamp");
log.insert(
(PathPrefix::Event, timestamp_key),
DateTime::<Utc>::from_utc(naive, Utc),
);
log.insert(timestamp_key, DateTime::<Utc>::from_utc(naive, Utc));
// per GELF spec- add timestamp if not provided
} else {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
log.insert(timestamp_key, Utc::now());
}
}

Expand Down Expand Up @@ -293,7 +290,7 @@ mod tests {
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
);
assert_eq!(
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
log.get(log_schema().message_key_target_path().unwrap()),
Some(&Value::Bytes(Bytes::from_static(
b"A short message that helps you identify what is going on"
)))
Expand Down Expand Up @@ -348,7 +345,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
assert!(log.contains(log_schema().message_key_target_path().unwrap()));
}

// filter out id
Expand Down
9 changes: 4 additions & 5 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::convert::TryInto;
use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use lookup::PathPrefix;
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
Expand Down Expand Up @@ -133,11 +132,11 @@ impl Deserializer for JsonDeserializer {
LogNamespace::Legacy => {
let timestamp = Utc::now();

if let Some(timestamp_key) = log_schema().timestamp_key() {
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
for event in &mut events {
let log = event.as_mut_log();
if !log.contains((PathPrefix::Event, timestamp_key)) {
log.insert((PathPrefix::Event, timestamp_key), timestamp);
if !log.contains(timestamp_key) {
log.insert(timestamp_key, timestamp);
}
}
}
Expand Down Expand Up @@ -218,7 +217,7 @@ mod tests {
let log = event.as_log();
assert_eq!(log["bar"], 456.into());
assert_eq!(
log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
log.get(log_schema().timestamp_key_target_path().unwrap())
.is_some(),
namespace == LogNamespace::Legacy
);
Expand Down
2 changes: 2 additions & 0 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod gelf;
mod json;
mod native;
mod native_json;
mod protobuf;
#[cfg(feature = "syslog")]
mod syslog;

Expand All @@ -19,6 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
};
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig};
use smallvec::SmallVec;
#[cfg(feature = "syslog")]
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct NativeDeserializerConfig;
impl NativeDeserializerConfig {
/// Build the `NativeDeserializer` from this configuration.
pub fn build(&self) -> NativeDeserializer {
NativeDeserializer::default()
NativeDeserializer
}

/// Return the type of event build by this deserializer.
Expand Down
Loading

0 comments on commit 23426d0

Please sign in to comment.