-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(new sink): Add possibility to use nats jetstream in nats sink #20834
feat(new sink): Add possibility to use nats jetstream in nats sink #20834
Conversation
5deb812
to
243c492
Compare
243c492
to
b2d1f5a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good from NATS maintainer perspective.
We can also make async JetStream publishing in the future possible here.
Thanks @whatcouldbepizza ! We'll review this shortly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks. Just a couple of minor nits with the code.
I'm having difficulty testing the code, possibly just down to my misunderstanding.
Running NATS in Docker:
docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 --name nats-server -ti nats:latest
I'm pulling messages from NATS with one instance of Vector configured with:
sources:
nats:
type: nats
url: "nats://localhost:4222"
subject: nork
connection_name: zork
sinks:
console:
type: console
inputs:
- nats
target: stdout
encoding:
codec: json
And I'm pushing messages to NATS with another instance of Vector running this PR branch.
sources:
in:
type: stdin
sinks:
nats:
type: nats
inputs:
- in
subject: nork
url: "nats://localhost:4222"
jetstream: true
encoding:
codec: json
I type a message into stdin
for the NATS sink Vector instance and I get the following:
zork
2024-07-11T12:40:26.908904Z WARN sink{component_kind="sink" component_id=nats component_type=nats}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=NATS Server Error: NATS Publish Error: timed out: didn't receive ack in time internal_log_rate_limit=true
2024-07-11T12:40:32.642235Z WARN sink{component_kind="sink" component_id=nats component_type=nats}:request{request_id=1}: vector::sinks::util::retries: Internal log [Retrying after error.] is being suppressed to avoid flooding.
2024-07-11T12:40:38.539561Z WARN sink{component_kind="sink" component_id=nats component_type=nats}:request{request_id=1}: vector::sinks::util::retries: Internal log [Retrying after error.] has been suppressed 1 times.
2024-07-11T12:40:38.539593Z WARN sink{component_kind="sink" component_id=nats component_type=nats}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=NATS Server Error: NATS Publish Error: timed out: didn't receive ack in time internal_log_rate_limit=true
2024-07-11T12:40:44.042736Z WARN sink{component_kind="sink" component_id=nats component_type=nats}:request{request_id=1}: vector::sinks::util::retries: Internal log [Retrying after error.] is being suppressed to avoid flooding.
The Vector instance running the NATS source receives this message repeatedly since the NATS sink keeps retrying to send:
{"message":"{\"host\":\"stephenwakely\",\"message\":\"zork\",\"source_type\":\"stdin\",\"timestamp\":\"2024-07-11T12:40:21.905962302Z\"}","source_type":"nats","subject":"nork","timestamp":"2024-07-11T12:40:21.908185811Z"}
{"message":"{\"host\":\"stephenwakely\",\"message\":\"zork\",\"source_type\":\"stdin\",\"timestamp\":\"2024-07-11T12:40:21.905962302Z\"}","source_type":"nats","subject":"nork","timestamp":"2024-07-11T12:40:27.641924291Z"}
{"message":"{\"host\":\"stephenwakely\",\"message\":\"zork\",\"source_type\":\"stdin\",\"timestamp\":\"2024-07-11T12:40:21.905962302Z\"}","source_type":"nats","subject":"nork","timestamp":"2024-07-11T12:40:33.539870420Z"}
{"message":"{\"host\":\"stephenwakely\",\"message\":\"zork\",\"source_type\":\"stdin\",\"timestamp\":\"2024-07-11T12:40:21.905962302Z\"}","source_type":"nats","subject":"nork","timestamp":"2024-07-11T12:40:39.042825638Z"}
{"message":"{\"host\":\"stephenwakely\",\"message\":\"zork\",\"source_type\":\"stdin\",\"timestamp\":\"2024-07-11T12:40:21.905962302Z\"}","source_type":"nats","subject":"nork","timestamp":"2024-07-11T12:40:44.557572405Z"}
Do you know where I am going wrong? Or is there an error in this code somewhere?
This may be code problem, I'll test it and come back a bit later |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approved for docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @whatcouldbepizza I approved this but suggested to remove the """"
from the description strings :)
@@ -383,6 +383,15 @@ base: components: sinks: nats: configuration: { | |||
} | |||
} | |||
} | |||
jetstream: { | |||
description: """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
description: """ | |
description: |
actually these should be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file has been generated from the doc comments added in the code, so it won't be possible to remove these quotes. They are a part of cue
and don't make it all the way through to the final docs on the vector.dev website..
Send messages using [Jetstream][jetstream]. | ||
|
||
[jetstream]: https://docs.nats.io/nats-concepts/jetstream | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
""" |
""" | ||
required: false | ||
type: bool: default: false | ||
} | ||
request: { | ||
description: """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
description: """ | |
description: |
@StephenWakely unfortunately, I can't reproduce the problem, for me sinking works just fine both with and without I can see that jetstream should be configured for nats, did you enable it? |
Hi @aliciascott |
Curious. So I hadn't enabled jetstream. So I try again, this time with it enabled: 〉docker run --rm -p 4222:4222 -p 8222:8222 -p 6222:6222 --name nats-server -ti nats:latest --jetstream nats-jetstream-option
[1] 2024/07/12 10:30:51.558624 [INF] Starting nats-server
[1] 2024/07/12 10:30:51.558695 [INF] Version: 2.10.17
[1] 2024/07/12 10:30:51.558697 [INF] Git: [b91de03]
[1] 2024/07/12 10:30:51.558698 [INF] Name: NDUJEGWDTBUA43AXYO7FQELMWSVNUCGBOP4A7NXVD2R6Z7FZ7XIEIQSA
[1] 2024/07/12 10:30:51.558702 [INF] Node: Y0rIweGa
[1] 2024/07/12 10:30:51.558703 [INF] ID: NDUJEGWDTBUA43AXYO7FQELMWSVNUCGBOP4A7NXVD2R6Z7FZ7XIEIQSA
[1] 2024/07/12 10:30:51.558912 [INF] Starting JetStream
[1] 2024/07/12 10:30:51.559112 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[1] 2024/07/12 10:30:51.559116 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[1] 2024/07/12 10:30:51.559117 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[1] 2024/07/12 10:30:51.559118 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[1] 2024/07/12 10:30:51.559120 [INF]
[1] 2024/07/12 10:30:51.559121 [INF] https://docs.nats.io/jetstream
[1] 2024/07/12 10:30:51.559123 [INF]
[1] 2024/07/12 10:30:51.559123 [INF] ---------------- JETSTREAM ----------------
[1] 2024/07/12 10:30:51.559126 [INF] Max Memory: 23.26 GB
[1] 2024/07/12 10:30:51.559128 [INF] Max Storage: 8.00 GB
[1] 2024/07/12 10:30:51.559129 [INF] Store Directory: "/tmp/nats/jetstream"
[1] 2024/07/12 10:30:51.559131 [INF] -------------------------------------------
[1] 2024/07/12 10:30:51.559406 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2024/07/12 10:30:51.559483 [INF] Server is ready But I still get the same error. Running NATS in Docker on Ubuntu 22.04.4 LTS. |
I still get the error when running Nats outside of docker. Tried with Vector in debug and release mode. |
Did you create a stream in NATS? Can I see its |
Ah ok. That was it. It now works great. Would it be possible to add something like this to the doc comments for
It's interesting how it managed to send the message through even though an error was returned. |
I think that's the way NATS works: it something like accepts messages in P. S. Pushed changes to field description and generated docs |
Is there a way we could verify if the stream exists in the healthcheck? |
There is actually a way to filter stream list by subjects. I just need to expose it in Rust client. Will do that and make it part of a next release. That should solve the issue. |
Oh awesome. That would be amazing! @whatcouldbepizza can you remove the healthcheck for now and create an issue to remind us to update it when the next version of |
Removed checking if stream exists in |
@whatcouldbepizza There are some clippy errors, mentioned here. Also can you add a changelog entry to detail the change? Details can be found here. |
Hi @StephenWakely! Sorry, was on a short vacation :) I've added |
### Example | ||
|
||
#### Config | ||
|
||
``` | ||
sinks: | ||
nats: | ||
type: nats | ||
inputs: | ||
- in | ||
subject: nork | ||
url: "nats://localhost:4222" | ||
jetstream: true | ||
encoding: | ||
codec: json | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changelog should be fairly succinct, so we don't need the example here. We do need the authors included for external contributors.
### Example | |
#### Config | |
``` | |
sinks: | |
nats: | |
type: nats | |
inputs: | |
- in | |
subject: nork | |
url: "nats://localhost:4222" | |
jetstream: true | |
encoding: | |
codec: json | |
``` | |
authors: whatcouldbepizza |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also can you make sure it ends with a newline please.
Thanks for the comments, fixed the changelog |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect! Thank you.
Regression Detector ResultsRun ID: 039e15d8-5fdf-40b4-b5f3-096e6b93e3d8 Metrics dashboard Baseline: 3b6a738 Performance changes are noted in the perf column of each table:
No significant changes in experiment optimization goalsConfidence level: 90.00% There were no significant changes in experiment optimization goals at this confidence level and effect size tolerance.
|
perf | experiment | goal | Δ mean % | Δ mean % CI | links |
---|---|---|---|---|---|
➖ | file_to_blackhole | egress throughput | +1.91 | [-5.21, +9.03] |
Fine details of change detection per experiment
perf | experiment | goal | Δ mean % | Δ mean % CI | links |
---|---|---|---|---|---|
➖ | syslog_regex_logs2metric_ddmetrics | ingress throughput | +2.05 | [+1.89, +2.20] | |
➖ | file_to_blackhole | egress throughput | +1.91 | [-5.21, +9.03] | |
➖ | http_elasticsearch | ingress throughput | +1.81 | [+1.63, +1.99] | |
➖ | datadog_agent_remap_datadog_logs_acks | ingress throughput | +1.80 | [+1.63, +1.98] | |
➖ | syslog_log2metric_humio_metrics | ingress throughput | +1.75 | [+1.65, +1.86] | |
➖ | splunk_hec_route_s3 | ingress throughput | +1.70 | [+1.40, +2.01] | |
➖ | socket_to_socket_blackhole | ingress throughput | +1.38 | [+1.30, +1.46] | |
➖ | otlp_http_to_blackhole | ingress throughput | +1.27 | [+1.09, +1.45] | |
➖ | syslog_log2metric_tag_cardinality_limit_blackhole | ingress throughput | +1.23 | [+1.14, +1.33] | |
➖ | datadog_agent_remap_blackhole | ingress throughput | +1.23 | [+1.14, +1.32] | |
➖ | datadog_agent_remap_blackhole_acks | ingress throughput | +0.38 | [+0.28, +0.48] | |
➖ | datadog_agent_remap_datadog_logs | ingress throughput | +0.37 | [+0.16, +0.57] | |
➖ | http_to_http_noack | ingress throughput | +0.03 | [-0.10, +0.16] | |
➖ | splunk_hec_indexer_ack_blackhole | ingress throughput | +0.02 | [-0.06, +0.11] | |
➖ | splunk_hec_to_splunk_hec_logs_noack | ingress throughput | +0.01 | [-0.10, +0.11] | |
➖ | splunk_hec_to_splunk_hec_logs_acks | ingress throughput | +0.00 | [-0.10, +0.11] | |
➖ | http_to_s3 | ingress throughput | -0.06 | [-0.33, +0.20] | |
➖ | http_to_http_json | ingress throughput | -0.08 | [-0.14, -0.01] | |
➖ | syslog_splunk_hec_logs | ingress throughput | -0.19 | [-0.28, -0.11] | |
➖ | otlp_grpc_to_blackhole | ingress throughput | -0.24 | [-0.37, -0.11] | |
➖ | fluent_elasticsearch | ingress throughput | -1.09 | [-1.64, -0.54] | |
➖ | http_to_http_acks | ingress throughput | -1.28 | [-2.61, +0.05] | |
➖ | syslog_humio_logs | ingress throughput | -1.36 | [-1.45, -1.26] | |
➖ | syslog_log2metric_splunk_hec_metrics | ingress throughput | -1.55 | [-1.65, -1.45] | |
➖ | http_text_to_http_json | ingress throughput | -2.17 | [-2.30, -2.04] | |
➖ | syslog_loki | ingress throughput | -2.68 | [-2.75, -2.60] |
Explanation
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
Unfortunately the Provided you have Docker installed, to run the integration tests on your machine you can run
|
I haven't look at the test itself, but as we removed |
Flushing messages after driver |
Ah, apologies, I missed that in my earlier review. We do unfortunately need to flush each message after each call. This is because of our E2E acknowledgement guarantees. If someone turns that on we need to be able to guarantee to the connected source that the message has been successfully delivered. If the message is sent without being flushed we risk passing a success back to the source, but then the flush fails and so the message isn't delivered. Tiny bit of discussion here. Could you put the flush back after each message please? |
Just put back flush after each message |
Regression Detector ResultsRun ID: d020c965-97d9-4591-8a83-a6114183dadc Metrics dashboard Baseline: 787e1ec Performance changes are noted in the perf column of each table:
No significant changes in experiment optimization goalsConfidence level: 90.00% There were no significant changes in experiment optimization goals at this confidence level and effect size tolerance.
|
perf | experiment | goal | Δ mean % | Δ mean % CI | links |
---|---|---|---|---|---|
❌ | file_to_blackhole | egress throughput | -20.84 | [-26.96, -14.72] |
Fine details of change detection per experiment
perf | experiment | goal | Δ mean % | Δ mean % CI | links |
---|---|---|---|---|---|
➖ | datadog_agent_remap_datadog_logs | ingress throughput | +1.74 | [+1.49, +1.99] | |
➖ | datadog_agent_remap_datadog_logs_acks | ingress throughput | +1.54 | [+1.40, +1.67] | |
➖ | splunk_hec_route_s3 | ingress throughput | +1.28 | [+0.99, +1.57] | |
➖ | otlp_grpc_to_blackhole | ingress throughput | +1.05 | [+0.92, +1.18] | |
➖ | syslog_splunk_hec_logs | ingress throughput | +1.00 | [+0.92, +1.07] | |
➖ | http_to_http_acks | ingress throughput | +0.52 | [-0.80, +1.84] | |
➖ | otlp_http_to_blackhole | ingress throughput | +0.31 | [+0.10, +0.52] | |
➖ | fluent_elasticsearch | ingress throughput | +0.11 | [-0.40, +0.61] | |
➖ | splunk_hec_to_splunk_hec_logs_noack | ingress throughput | +0.01 | [-0.09, +0.11] | |
➖ | splunk_hec_indexer_ack_blackhole | ingress throughput | +0.01 | [-0.07, +0.09] | |
➖ | splunk_hec_to_splunk_hec_logs_acks | ingress throughput | +0.00 | [-0.11, +0.12] | |
➖ | http_to_http_noack | ingress throughput | -0.02 | [-0.14, +0.11] | |
➖ | socket_to_socket_blackhole | ingress throughput | -0.02 | [-0.08, +0.04] | |
➖ | http_to_http_json | ingress throughput | -0.13 | [-0.20, -0.06] | |
➖ | syslog_regex_logs2metric_ddmetrics | ingress throughput | -0.26 | [-0.41, -0.12] | |
➖ | http_elasticsearch | ingress throughput | -0.32 | [-0.49, -0.16] | |
➖ | http_to_s3 | ingress throughput | -0.40 | [-0.67, -0.14] | |
➖ | datadog_agent_remap_blackhole_acks | ingress throughput | -0.62 | [-0.72, -0.52] | |
➖ | http_text_to_http_json | ingress throughput | -0.67 | [-0.80, -0.54] | |
➖ | syslog_log2metric_humio_metrics | ingress throughput | -0.99 | [-1.15, -0.82] | |
➖ | syslog_log2metric_splunk_hec_metrics | ingress throughput | -0.99 | [-1.12, -0.87] | |
➖ | syslog_loki | ingress throughput | -1.24 | [-1.32, -1.16] | |
➖ | syslog_log2metric_tag_cardinality_limit_blackhole | ingress throughput | -1.41 | [-1.50, -1.32] | |
➖ | syslog_humio_logs | ingress throughput | -1.83 | [-1.92, -1.74] | |
➖ | datadog_agent_remap_blackhole | ingress throughput | -2.43 | [-2.53, -2.34] | |
❌ | file_to_blackhole | egress throughput | -20.84 | [-26.96, -14.72] |
Explanation
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
…ectordotdev#20834) * use nats jetstream as an option * no need to manually flush messages * fix jetstream option annotation + prettify code * generate component docs * add more precise field description + generate docs * check nats stream existence in healthcheck * do not check stream existance * add new field to struct in tests * add changelog * add author to changelog * remove example config from changelog * flush core messages * flush after each message
…ectordotdev#20834) * use nats jetstream as an option * no need to manually flush messages * fix jetstream option annotation + prettify code * generate component docs * add more precise field description + generate docs * check nats stream existence in healthcheck * do not check stream existance * add new field to struct in tests * add changelog * add author to changelog * remove example config from changelog * flush core messages * flush after each message
Add new option
jetstream
to nats sink configuration that allows using jetstream to sink into nats