
feat(new sink): new postgres sink #22481

Open
Ichmed wants to merge 13 commits into master

Conversation


@Ichmed Ichmed commented Feb 20, 2025

Summary

A zero-copy Postgres sink that requires no new dependencies (it only enables one additional feature on tokio-postgres).
The sink uses a prepared statement to insert the data as native SQL values instead of serializing the data to JSON and deserializing it in the database.

For now the sink can only handle Logs and Traces.

Tests are still missing, but it can be E2E tested using this setup:

```yaml
sources:
  stdin:
    type: stdin

transforms:
  foobar:
    type: remap
    inputs:
      - stdin
    source: |-
      .json_field = del(.)
      .array_field = [true, true, true]
      .id = "some_id"
      .ignored_field = 1324

sinks:
  posti:
    type: postgres
    host: localhost
    port: 5432
    table: jsontest
    inputs:
      - foobar
```

```sql
CREATE TABLE IF NOT EXISTS public.jsontest
(
    id character varying(255) COLLATE pg_catalog."default",
    json_field json,
    array_field boolean[]
)
```
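For illustration, with the table above, the prepared statement the sink generates would look roughly like `INSERT INTO jsontest (...) VALUES ($1, $2, $3)`. A minimal sketch of how such a statement can be built (`build_insert` is a hypothetical helper for illustration, not code from this PR):

```rust
/// Build an INSERT statement with positional parameters ($1, $2, ...)
/// from a table name and a fixed list of column names. The result can
/// be prepared once and reused for every event.
fn build_insert(table: &str, columns: &[&str]) -> String {
    let params: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();
    format!(
        "INSERT INTO {table} ({}) VALUES ({})",
        columns.join(", "),
        params.join(", ")
    )
}
```

For example, `build_insert("jsontest", &["id", "json_field", "array_field"])` yields `INSERT INTO jsontest (id, json_field, array_field) VALUES ($1, $2, $3)`.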

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
    • make check-all is a good command to run locally. This check is
      defined here. Some of these
      checks might not be relevant to your PR. For Rust changes, at the very least you should run:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • If this PR introduces changes to Vector's dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@Ichmed Ichmed requested a review from a team as a code owner February 20, 2025 13:18
@bits-bot

bits-bot commented Feb 20, 2025

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label Feb 20, 2025
@pront
Member

pront commented Feb 20, 2025

Hi @Ichmed, thank you for this PR! There is an existing PR that introduces a postgres sink and is almost there: #21248

@isbm

isbm commented Feb 20, 2025

@pront We are fully aware of it and have analysed it. 😉 And yet we don't think it is the right approach. Please take a closer look at the code. Features we can add, no worries. But we have enormous amounts of data that we need to move into Postgres and TimescaleDB, and we specifically need it optimised for cloud usage (mem/CPU matters!).

In the worst case you will have two sinks! 😆 Call it a "lightweight PgSink".

@pront
Member

pront commented Feb 20, 2025

> @pront We are fully aware of it and analysed it. 😉 And yet we don't think it is the right way to do. Please take a closer look at the code. Features we can add, no worries.

Sure will do. It will take some time though so please bear with me.

> But we have a really enormous data and we need it in large amounts into Postgres and TimescaleDB. We specifically need that optimised for cloud usage (mem/CPU matters!).

Did you compare both implementations against some benchmarks?

> In a worst case you will have two sinks! 😆 Call it "lightweight PgSink".

Having two sinks doing the same thing is probably not what we want. I do like that #21248 has support for all telemetry data, Vector features such as ACKs and good UX. And most importantly, a lot of testing.

Again, I didn't dive into the differences and I need some time to do so. I wonder, since you looked at the existing PR, could you work on optimizing that one after it lands?

@isbm

isbm commented Feb 20, 2025

> Sure will do. It will take some time though so please bear with me.

Thanks!

> Having two sinks doing the same thing is probably not what we want. I do like that #21248 has support for all telemetry data, Vector features such as ACKs and good UX. And most importantly, a lot of testing.

In our defence, our day-one Chapter 1 is not a half-year Chapter 128 😛. We specifically focused on making it a zero-copy, no-dependencies, generic micro-sink. Adding features is not a problem; ACKs are coming, as they are a necessity.

> Again, I didn't dive into the differences and I need some time to do so. I wonder, since you looked the existing PR, can you work on optimizing that after it lands?

We would definitely support and maintain ours; that's for sure, because it will go into production straight away. Alternatively, it could land in a "contrib" section: more options to choose from is always better. We are interested in bringing more sinks/transforms in the near future.

@Ichmed
Author

Ichmed commented Feb 21, 2025

Hi @pront, with these changes we should have feature parity with the other PR aside from configuration.

Is there a nice way to do benchmarks? I looked at the benches directory but didn't really understand how to apply that to this use case.
AFAIK this implementation should be faster than the other one, since we are simply doing less work: it should have zero allocations per event, it uses a prepared statement, and no deserialization happens on the DB side. So if we are slower in any use case, I would consider that a bug that can be fixed.

@jorgehermo9
Contributor

jorgehermo9 commented Feb 22, 2025

Hi, I would like to give my opinion on this.

> AFAIK this implementation should be faster than the other, since we are simply doing less work

I'm not really sure about this; claiming performance improvements and optimizations without measuring them is a mistake.

I see that you are not batching events, so every ingested event results in a network round trip. I would be surprised if this approach resulted in higher throughput than batching them.
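To illustrate the batching point, a multi-row INSERT lets a whole batch of events share a single round trip. This is only a sketch; `build_batched_insert` is a hypothetical helper, not code from either PR:

```rust
/// Sketch of a batched INSERT: n_rows events travel in one statement,
/// so the whole batch costs one network round trip instead of one per
/// event. Two columns and two rows yield "... VALUES ($1, $2), ($3, $4)".
fn build_batched_insert(table: &str, columns: &[&str], n_rows: usize) -> String {
    let n_cols = columns.len();
    let rows: Vec<String> = (0..n_rows)
        .map(|row| {
            // Positional parameters continue across rows: row 0 uses
            // $1..$n_cols, row 1 uses $(n_cols+1).., and so on.
            let params: Vec<String> = (1..=n_cols)
                .map(|col| format!("${}", row * n_cols + col))
                .collect();
            format!("({})", params.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {table} ({}) VALUES {}",
        columns.join(", "),
        rows.join(", ")
    )
}
```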

Taking a look at your implementation, I'm not sure it would work in the general case. For example, this prepared statement

```rust
"INSERT INTO {table} ({}) VALUES ({})",
```

formats the columns in a specific order (the order in which the columns are returned by the DB, which is not deterministic since you are not ordering them in the query), and then when inserting the column values

```rust
.map(|k| v.get(k.as_str()).unwrap_or(&Value::Null))
```

you are depending on the BTreeMap ordering (which is alphabetical), but that is a different order (at least right now) from the one in which you formatted the columns in the prepared statement (which is non-deterministic and decided by the DB; it should also come back alphabetically ordered, but you can't guarantee that with the current implementation).
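One way to sidestep this mismatch is to sort the column list once and use that same list both for formatting the statement and for looking up values. The sketch below assumes event fields live in a `BTreeMap`; `ordered_values` is a hypothetical helper, not code from the PR:

```rust
use std::collections::BTreeMap;

/// Sort the column names once and look event values up in that same
/// order. Because the statement formatting and the value lookup now
/// share one explicitly sorted list, the pairing no longer depends on
/// the order in which the DB happened to return the columns.
fn ordered_values<'a>(
    columns: &mut Vec<String>,
    event: &'a BTreeMap<String, String>,
) -> Vec<Option<&'a String>> {
    columns.sort(); // deterministic order, fixed once at sink startup
    columns.iter().map(|c| event.get(c.as_str())).collect()
}
```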

Moreover, as you are loading the table's columns on the sink's startup

```rust
let columns: Vec<_> = client.query("SELECT column_name from INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1 AND table_schema = $2", &[&table, &schema]).await?.into_iter().map(|x| x.get(0)).collect();
```

your implementation does not allow altering tables and inserting new columns while running (which #21248 does); you have to restart the sink so that new columns are taken into account. Also, deleting columns while running would cause all events to fail until Vector is restarted.

Also, I'm not sure your implementation works for composite types (maybe it does, but I'm currently not sure).

The implementations are not feature-equal, so I don't think a performance comparison makes sense in this case anyway (whichever one turned out to be the fastest).

> should have zero allocations per event

Not allocating does not always imply being faster. It generally is faster not to allocate, but it is no guarantee.

> are using a prepared statement

So does #21248. From https://docs.rs/sqlx/latest/sqlx/fn.query.html:

> The connection will transparently prepare and cache the statement, which means it only needs to be parsed once in the connection's lifetime

> more options to choose from is always better

This is also a fallacy. For a new user, not having a single solution is actually worse, as users would struggle to decide which one to use. Moreover, it is a maintenance overhead for maintainers to keep multiple implementations of nearly the same thing.

From my point of view, we should not be talking about what should be faster but actually measuring it. And since I think this is not feature-equal to #21248, I don't know if it makes sense to just choose the fastest.

@jorgehermo9
Contributor

jorgehermo9 commented Feb 22, 2025

Also, you state that

> should have zero allocations per event

but since you use a BytesMut to encode every event's field

```rust
.map(Wrapper);
```

you are doing several allocations per event. Claims of zero allocations should come from validating them with tools like valgrind; stating that no allocations happen based purely on your own code, without taking your dependencies' code into account, is wrong.
