-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(new sink): Add sinks sftp support #18076
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Xuanwo <github@xuanwo.io>
✅ Deploy Preview for vector-project canceled.
|
✅ Deploy Preview for vrl-playground canceled.
|
Are you still seeing the compilation errors with Thanks for opening this to get the conversation started! We'll get a more thorough review soon. |
Thanks for the remind from @gaby, all build issues have been fixed.
Let's confirm the correct direction before pushing any works to docs and configs. |
@Xuanwo, just to confirm, if we merge this will you be willing to continue supporting this sink in future? |
This PR is far away from been merged. So I'm guessing we are talking the future: when this PR is ready and sftp is able to use, will I keep supporting sftp sink? My answer is YES. As you may know, both webhdfs and sftp are OpenDAL based sinks. Most logic are vector internal and not related to sinks. And sink related things breaking means OpenDAL is broken. In this case, I will make sure it fixed so that all opendal users will benefit from the fix too. |
There is an ongoing discussion on adding a doc about how to implement and maintain sinks that based opendal. |
Yes. Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, this looks good to me once the comments are addressed.
I would like to see us add an integration test for this sink, similar to the webhdfs
sink.
Also, can you run make generate-component-docs
to update the generated cue docs to add the sftp to the user-facing documentation? We'll also want to add a note that this only works on unix systems, as mentioned in the crate's docs.
//! `sftp` sink. | ||
//! | ||
//! `sftp` SFTP, or Secure File Transfer Protocol, is a network protocol used for | ||
//! securely transferring files over the internet. It operates over the Secure | ||
//! Shell (SSH) data stream, providing secure file transfer by both encrypting | ||
//! the data and maintaining the integrity of the transfer. SFTP also supports | ||
//! file management operations like moving and deleting files on the server, unlike FTP. | ||
//! | ||
//! For more information, please refer to: | ||
//! | ||
//! - [sftp(1) — Linux manual page](https://man7.org/linux/man-pages/man1/sftp.1.html) | ||
//! | ||
//! `sftp` is an OpenDal based services. This mod itself only provide config to build an | ||
//! [`crate::sinks::opendal_common::OpenDalSink`]. All real implement are powered by | ||
//! [`crate::sinks::opendal_common::OpenDalSink`]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding a module doc! I have a few small suggestions.
//! `sftp` sink. | |
//! | |
//! `sftp` SFTP, or Secure File Transfer Protocol, is a network protocol used for | |
//! securely transferring files over the internet. It operates over the Secure | |
//! Shell (SSH) data stream, providing secure file transfer by both encrypting | |
//! the data and maintaining the integrity of the transfer. SFTP also supports | |
//! file management operations like moving and deleting files on the server, unlike FTP. | |
//! | |
//! For more information, please refer to: | |
//! | |
//! - [sftp(1) — Linux manual page](https://man7.org/linux/man-pages/man1/sftp.1.html) | |
//! | |
//! `sftp` is an OpenDal based services. This mod itself only provide config to build an | |
//! [`crate::sinks::opendal_common::OpenDalSink`]. All real implement are powered by | |
//! [`crate::sinks::opendal_common::OpenDalSink`]. | |
//! `sftp` sink. | |
//! | |
//! SFTP, or Secure File Transfer Protocol, is a network protocol used for | |
//! securely transferring files over the internet. It operates over the Secure | |
//! Shell (SSH) data stream, providing secure file transfer by both encrypting | |
//! the data and maintaining the integrity of the transfer. SFTP also supports | |
//! file management operations like moving and deleting files on the server, unlike FTP. | |
//! | |
//! For more information, please refer to: | |
//! | |
//! - [sftp(1) — Linux manual page](https://man7.org/linux/man-pages/man1/sftp.1.html) | |
//! | |
//! `sftp` is an OpenDal-based sink. This module itself only provide config to build an | |
//! [`crate::sinks::opendal_common::OpenDalSink`]. |
use vector_config::configurable_component; | ||
use vector_core::{ | ||
config::{AcknowledgementsConfig, DataType, Input}, | ||
sink::VectorSink, | ||
}; | ||
|
||
use crate::{ | ||
codecs::{Encoder, EncodingConfigWithFraming, SinkType}, | ||
config::{GenerateConfig, SinkConfig, SinkContext}, | ||
sinks::{ | ||
opendal_common::*, | ||
util::{ | ||
partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings, | ||
Compression, | ||
}, | ||
Healthcheck, | ||
}, | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small nit - we can remove some of the common imports by including crate::sinks::prelude::*
.
use vector_config::configurable_component; | |
use vector_core::{ | |
config::{AcknowledgementsConfig, DataType, Input}, | |
sink::VectorSink, | |
}; | |
use crate::{ | |
codecs::{Encoder, EncodingConfigWithFraming, SinkType}, | |
config::{GenerateConfig, SinkConfig, SinkContext}, | |
sinks::{ | |
opendal_common::*, | |
util::{ | |
partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings, | |
Compression, | |
}, | |
Healthcheck, | |
}, | |
}; | |
use vector_core::config::DataType; | |
use crate::{ | |
codecs::{EncodingConfigWithFraming, SinkType}, | |
sinks::{ | |
prelude::*, | |
opendal_common::*, | |
util::{ | |
partitioner::KeyPartitioner, BulkSizeBasedDefaultBatchSettings, | |
}, | |
}, | |
}; |
}; | ||
|
||
/// Configuration for the `sftp` sink. | ||
#[configurable_component(sink("sftp", "Sftp."))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a more detailed description to this component (the second string)? This description ends up getting populated in the docs at vector.dev.
#[configurable_component(sink("sftp", "Sftp."))] | |
#[configurable_component(sink("sftp", "Secure File Transfer Protocol (SFTP)."))] |
#[derive(Clone, Debug)] | ||
#[serde(deny_unknown_fields)] | ||
pub struct SftpConfig { | ||
/// The root path for Sftp. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest adopting the language from the opendal sftp docs here to make these easier for Vector users to configure.
/// The root path for Sftp. | |
/// The working directory for the backend. Defaults to the default directory | |
/// set by the remote SFTP server. |
#[configurable(metadata(docs::templateable))] | ||
pub prefix: String, | ||
|
||
/// The endpoint to connect to sftp. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// The endpoint to connect to sftp. | |
/// The endpoint of the SFTP backend. |
} | ||
|
||
#[test] | ||
fn Sftp_build_request() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn Sftp_build_request() { | |
fn sftp_build_request() { |
fn request_builder(sink_config: &SftpConfig) -> OpenDalRequestBuilder { | ||
let transformer = sink_config.encoding.transformer(); | ||
let (framer, serializer) = sink_config | ||
.encoding | ||
.build(SinkType::MessageBased) | ||
.expect("encoding must build with success"); | ||
let encoder = Encoder::<Framer>::new(framer, serializer); | ||
|
||
OpenDalRequestBuilder { | ||
encoder: (transformer, encoder), | ||
compression: sink_config.compression, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than re-implement this logic here, we should extract the corresponding logic out of SftpConfig::build_processor
into a separate method so that we can test it in isolation.
} | ||
|
||
#[test] | ||
fn Sftp_generate_config() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn Sftp_generate_config() { | |
fn sftp_generate_config() { |
let req = build_request(Compression::None); | ||
assert!(req.metadata.partition_key.ends_with(".log")); | ||
|
||
let req = build_request(Compression::None); | ||
assert!(req.metadata.partition_key.ends_with(".log")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These lines seem to be redundant.
let req = build_request(Compression::None); | |
assert!(req.metadata.partition_key.ends_with(".log")); | |
let req = build_request(Compression::None); | |
assert!(req.metadata.partition_key.ends_with(".log")); | |
let req = build_request(Compression::None); | |
assert!(req.metadata.partition_key.ends_with(".log")); |
@@ -87,6 +87,8 @@ pub mod redis; | |||
pub mod s3_common; | |||
#[cfg(feature = "sinks-sematext")] | |||
pub mod sematext; | |||
#[cfg(feature = "sinks-sftp")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also gate this behind unix
as the sftp opendal service states that it only works for unix.
#[cfg(feature = "sinks-sftp")] | |
#[cfg(all(unix, feature = "sinks-sftp"))] |
Thanks for the review! I will come back next week. |
@Xuanwo Any updates on this? |
@Xuanwo Do you have plans for updating this PR? |
@dsmith3197 This can probably be closed. The author hasnt responded in months. |
I think we can leave it open until someone else picks up the torch so that they have a base to start from. Otherwise someone might start from scratch, not realizing some efforts have been made already. I'll mark it as draft though. |
This PR is part of #3382.