Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add real world topology benchmark #6274

Merged
merged 5 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 348 additions & 1 deletion benches/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,357 @@ fn benchmark_complex(c: &mut Criterion) {
group.finish();
}

fn benchmark_real_world_1(c: &mut Criterion) {
let num_lines: usize = 100_000;

let in_addr = next_addr();
let out_addr_company_api = next_addr();
let out_addr_company_admin = next_addr();
let out_addr_company_media_proxy = next_addr();
let out_addr_company_unfurler = next_addr();
let out_addr_audit = next_addr();

let mut group = c.benchmark_group("real_world_1");
group.sampling_mode(SamplingMode::Flat);
group.throughput(Throughput::Elements(num_lines as u64));
group.bench_function("topology", |b| {
b.iter_batched(
|| {
let mut config = config::Config::builder();
config.add_source(
"in",
sources::socket::SocketConfig::make_basic_tcp_config(in_addr),
);

let toml_cfg = r##"
##
## company-api
##

[transforms.company_api]
type = "field_filter"
inputs = ["in"]
field = "appname"
value = "company-api"

[transforms.company_api_json]
type = "json_parser"
inputs = ["company_api"]
drop_invalid = true

[transforms.company_api_timestamp]
type = "split"
inputs = ["company_api_json"]
field = "timestamp"
field_names = ["timestamp"]
separator = "."

[transforms.company_api_timestamp.types]
timestamp = "timestamp|%s"

[transforms.company_api_metadata]
type = "lua"
inputs = ["company_api_timestamp"]
source = """
event["metadata_trace_id"] = event["metadata.trace_id"]
event["metadata_guild_id"] = event["metadata.guild_id"]
event["metadata_channel_id"] = event["metadata.channel_id"]
event["metadata_method"] = event["metadata.method"]
"""

[transforms.company_api_rename]
type = "rename_fields"
inputs = ["company_api_metadata"]

[transforms.company_api_rename.fields]
timestamp = "time"
host = "hostname"
# "metadata.trace_id" = "metadata_trace_id"
# "metadata.guild_id" = "metadata_guild_id"
# "metadata.channel_id" = "metadata_channel_id"
# "metadata.method" = "metadata_method"

##
## company-admin
##

[transforms.company_admin]
type = "field_filter"
inputs = ["in"]
field = "appname"
value = "company-admin"

[transforms.company_admin_json]
type = "json_parser"
inputs = ["company_admin"]
drop_invalid = true

[transforms.company_admin_timestamp]
type = "split"
inputs = ["company_admin_json"]
field = "timestamp"
field_names = ["timestamp"]
separator = "."

[transforms.company_admin_timestamp.types]
timestamp = "timestamp|%s"

[transforms.company_admin_metadata]
type = "lua"
inputs = ["company_admin_timestamp"]
source = """
event["metadata_trace_id"] = event["metadata.trace_id"]
event["metadata_method"] = event["metadata.method"]
"""

[transforms.company_admin_rename]
type = "rename_fields"
inputs = ["company_admin_metadata"]

[transforms.company_admin_rename.fields]
timestamp = "time"
host = "hostname"
# "metadata.trace_id" = "metadata_trace_id"
# "metadata.method" = "metadata_method"

##
## company-media-proxy
##

[transforms.company_media_proxy]
type = "field_filter"
inputs = ["in"]
field = "appname"
value = "company-media-proxy"

[transforms.company_media_proxy_json]
type = "json_parser"
inputs = ["company_media_proxy"]
drop_invalid = true

[transforms.company_media_proxy_timestamp]
type = "split"
inputs = ["company_media_proxy_json"]
field = "ts"
field_names = ["ts"]
separator = "."

[transforms.company_media_proxy_timestamp.types]
ts = "timestamp|%s"

[transforms.company_media_proxy_rename]
type = "rename_fields"
inputs = ["company_media_proxy_timestamp"]

[transforms.company_media_proxy_rename.fields]
ts = "time"
host = "hostname"

##
## company-unfurler
##

[transforms.company_unfurler]
type = "field_filter"
inputs = ["in"]
field = "appname"
value = "company-unfurler"

[transforms.company_unfurler_hostname]
type = "rename_fields"
inputs = ["company_unfurler"]

[transforms.company_unfurler_hostname.fields]
host = "hostname"

[transforms.company_unfurler_json]
type = "json_parser"
inputs = ["company_unfurler_hostname"]
drop_invalid = true

[transforms.company_unfurler_timestamp]
type = "coercer"
inputs = ["company_unfurler_json"]

[transforms.company_unfurler_timestamp.types]
ts = "timestamp"

[transforms.company_unfurler_rename]
type = "rename_fields"
inputs = ["company_unfurler_timestamp"]

[transforms.company_unfurler_rename.fields]
ts = "time"

[transforms.company_unfurler_filter]
type = "field_filter"
inputs = ["company_unfurler_rename"]
field = "msg"
value = "unfurl"

##
## audit
##

[transforms.audit]
type = "field_filter"
inputs = ["in"]
field = "appname"
value = "audit"

[transforms.audit_timestamp]
type = "coercer"
inputs = ["audit"]

[transforms.audit_timestamp.types]
timestamp = "timestamp"

[transforms.audit_rename]
type = "rename_fields"
inputs = ["audit_timestamp"]

[transforms.audit_rename.fields]
appname = "tag"
host = "hostname"
message = "content"
timestamp = "time"
"##;

let parsed =
config::format::deserialize(toml_cfg, Some(config::Format::TOML)).unwrap();
config.append(parsed).unwrap();

config.add_sink(
"company_api_sink",
&["company_api_rename"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(
out_addr_company_api.to_string(),
),
);
config.add_sink(
"company_admin_sink",
&["company_admin_rename"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(
out_addr_company_admin.to_string(),
),
);
config.add_sink(
"company_media_proxy_sink",
&["company_media_proxy_rename"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(
out_addr_company_media_proxy.to_string(),
),
);
config.add_sink(
"company_unfurler_sink",
&["company_unfurler_filter"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(
out_addr_company_unfurler.to_string(),
),
);
config.add_sink(
"audit_sink",
&["audit_rename"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(
out_addr_audit.to_string(),
),
);

let mut rt = runtime();
let (
output_lines_company_api,
output_lines_company_admin,
output_lines_company_media_proxy,
output_lines_company_unfurler,
output_lines_audit,
topology,
) = rt.block_on(async move {
let output_lines_company_api =
CountReceiver::receive_lines(out_addr_company_api);
let output_lines_company_admin =
CountReceiver::receive_lines(out_addr_company_admin);
let output_lines_company_media_proxy =
CountReceiver::receive_lines(out_addr_company_media_proxy);
let output_lines_company_unfurler =
CountReceiver::receive_lines(out_addr_company_unfurler);
let output_lines_audit = CountReceiver::receive_lines(out_addr_audit);

let (topology, _crash) = start_topology(config.build().unwrap(), false).await;
wait_for_tcp(in_addr).await;
(
output_lines_company_api,
output_lines_company_admin,
output_lines_company_media_proxy,
output_lines_company_unfurler,
output_lines_audit,
topology,
)
});
(
rt,
topology,
output_lines_company_api,
output_lines_company_admin,
output_lines_company_media_proxy,
output_lines_company_unfurler,
output_lines_audit,
)
},
|(
mut rt,
topology,
output_lines_company_api,
output_lines_company_admin,
output_lines_company_media_proxy,
output_lines_company_unfurler,
output_lines_audit,
)| {
rt.block_on(async move {
// Generate the inputs.
let lines = [
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
r#"<118>3 2020-03-13T20:45:38.119Z my.host.com company-api 2004 ID960 - {"metadata": {"trace_id": "trace123", "guide_id": "guild123", "channel_id": "channel123", "method": "method"}}"#,
r#"<118>3 2020-03-13T20:45:38.119Z my.host.com company-admin 2004 ID960 - {"metadata": {"trace_id": "trace123", "guide_id": "guild123", "channel_id": "channel123", "method": "method"}}"#,
r#"<118>3 2020-03-13T20:45:38.119Z my.host.com company-media-proxy 2004 ID960 - {"ts": "2020-03-13T20:45:38.119Z"}"#,
r#"<118>3 2020-03-13T20:45:38.119Z my.host.com company-unfurler 2004 ID960 - {"ts": "2020-03-13T20:45:38.119Z", "msg": "unfurl"}"#,
r#"<118>3 2020-03-13T20:45:38.119Z my.host.com company-admin 2004 ID960 - {"metadata": {"trace_id": "trace123", "guide_id": "guild123", "channel_id": "channel123", "method": "method"}}"#,
].iter().cycle().take(num_lines).map(|&s| s.to_owned());
send_lines(in_addr, lines).await.unwrap();

topology.stop().await;

let output_lines_company_api = output_lines_company_api.await.len();
let output_lines_company_admin = output_lines_company_admin.await.len();
let output_lines_company_media_proxy =
output_lines_company_media_proxy.await.len();
let output_lines_company_unfurler = output_lines_company_unfurler.await.len();
let output_lines_audit = output_lines_audit.await.len();

debug_assert!(output_lines_company_api > 0);
debug_assert!(output_lines_company_admin > 0);
debug_assert!(output_lines_company_media_proxy > 0);
debug_assert!(output_lines_company_unfurler > 0);
debug_assert!(output_lines_audit > 0);

(
output_lines_company_api,
output_lines_company_admin,
output_lines_company_media_proxy,
output_lines_company_unfurler,
output_lines_audit,
)
});
},
BatchSize::PerIteration,
);
});

group.finish();
}

criterion_group!(
name = benches;
// encapsulates CI noise we saw in
// https://github.com/timberio/vector/issues/5394
config = Criterion::default().noise_threshold(0.20);
targets = benchmark_simple_pipes, benchmark_interconnected, benchmark_transforms, benchmark_complex
targets = benchmark_simple_pipes, benchmark_interconnected, benchmark_transforms, benchmark_complex, benchmark_real_world_1
);
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod builder;
mod compiler;
pub mod component;
mod diff;
mod format;
pub mod format;
mod loading;
mod log_schema;
mod unit_test;
Expand Down