Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(elasticsearch sink): add support for data streams #5126

Merged
merged 17 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
234e4f3
enhancement(elasticsearch sink): add support for data streams
spencergilbert Nov 19, 2020
ba3905f
enhancement(elasticsearch sink): rename to as_json_pointer
spencergilbert Nov 20, 2020
d82cc00
enhancement(elasticsearch sink): add serde default to BulkAction
spencergilbert Nov 20, 2020
e8fa4aa
enhancement(elasticsearch sink): move serde default to ES config struct
spencergilbert Nov 20, 2020
dc508c0
enhancement(elasticsearch sink): format code
spencergilbert Nov 20, 2020
d2c8e9b
enhancement(elasticsearch sink): improve default for BulkAction
spencergilbert Dec 5, 2020
a4251e8
enhancement(elasticsearch sink): include bulk_action in documentation
spencergilbert Dec 5, 2020
e91eb3c
enhancement(elasticsearch sink): working, but not optimal unit test
spencergilbert Dec 5, 2020
2172a4b
enhancement(elasticsearch sink): remove accidentally re-added Elastic…
spencergilbert Dec 5, 2020
6a27e40
enhancement(elasticsearch sink): fix docs for es sink
spencergilbert Dec 5, 2020
2c8015a
enhancement(elasticsearch sink): more cue fixes
spencergilbert Dec 5, 2020
1e3409d
enhancement(elasticsearch sink): update unit test to include timestam…
spencergilbert Dec 11, 2020
38dd9f6
enhancement(elasticsearch sink): fmt, add data_streams how_it_works
spencergilbert Dec 12, 2020
a76318a
enhancement(elasticsearch sink): add requirements for data streams in…
spencergilbert Dec 12, 2020
0627628
enhancement(elasticsearch sink): fix typo in es docs
spencergilbert Dec 12, 2020
8892547
enhancement(elasticsearch sink): updates to documentation
spencergilbert Dec 18, 2020
d543d71
Remove trailing whitespace
jszwedko Dec 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions docs/reference/components/sinks/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ components: sinks: elasticsearch: {
"x86_64-unknown-linux-musl": true
}

requirements: []
requirements: [
#"""
Elasticsearch's Data streams feature requires Vector to be configured with the `create` `bulk_action`. *This is not enabled by default.*
"""#,
]
warnings: []
notices: []
}
Expand Down Expand Up @@ -151,6 +155,16 @@ components: sinks: elasticsearch: {
}
}
}
bulk_action: {
common: false
description: "Action to use when making requests to the [Elasticsearch Bulk API](elasticsearch_bulk). Supports `index` and `create`."
required: false
warnings: []
type: string: {
default: "index"
examples: ["index", "create"]
}
}
doc_type: {
common: false
description: "The `doc_type` for your index data. This is only relevant for Elasticsearch <= 6.X. If you are using >= 7.0 you do not need to set this option since Elasticsearch has removed it."
Expand Down Expand Up @@ -236,9 +250,19 @@ components: sinks: elasticsearch: {
title: "Conflicts"
body: """
Vector [batches](#buffers--batches) data and flushes it to Elasticsearch's
[`_bulk` API endpoint][urls.elasticsearch_bulk]. All events are inserted
via the `index` action. In the case of an conflict, such as a document with the
same `id`, Vector will add or _replace_ the document as necessary.
[`_bulk` API endpoint][urls.elasticsearch_bulk]. By default, all events are
inserted via the `index` action which will update documents if an existing
one has the same `id`. If `bulk_action` is configured with `create`, Elasticsearch
will _not_ replace an existing document and instead return a conflict error.
"""
}

data_streams: {
title: "Data streams"
body: """
By default, Vector will use the `index` action with Elasticsearch's Bulk API.
To use [Data streams][urls.elasticsearch_data_streams], `bulk_action` must be configured
with the `create` option.
"""
}

Expand Down
1 change: 1 addition & 0 deletions docs/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ urls: {
cidr: "https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing"
elasticsearch: "https://www.elastic.co/products/elasticsearch"
elasticsearch_bulk: "https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html"
elasticsearch_data_streams: "https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html"
elasticsearch_id_field: "https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html"
elasticsearch_id_performance: "https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html#_use_auto_generated_ids"
elasticsearch_ignore_malformed: "https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html"
Expand Down
61 changes: 59 additions & 2 deletions src/sinks/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct ElasticSearchConfig {

pub aws: Option<RegionOrEndpoint>,
pub tls: Option<TlsOptions>,
#[serde(default)]
pub bulk_action: BulkAction,
}

lazy_static! {
Expand All @@ -84,6 +86,31 @@ pub enum ElasticSearchAuth {
Aws { assume_role: Option<String> },
}

/// The Elasticsearch Bulk API action used for every event this sink emits.
///
/// `index` (the default) adds the document, replacing any existing document
/// with the same `_id`; `create` only inserts, so Elasticsearch returns a
/// conflict error for duplicates. Data streams require `create`.
// Fieldless enum: derive `Copy` plus `PartialEq`/`Eq` so callers can copy
// and compare it cheaply; `Clone` is kept for backward compatibility.
#[derive(Derivative, Deserialize, Serialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[derivative(Default)]
pub enum BulkAction {
    /// Add or replace a document (the default action).
    #[derivative(Default)]
    Index,
    /// Insert only; conflicts if a document with the same `_id` exists.
    Create,
}

impl BulkAction {
pub fn as_str(&self) -> &'static str {
match *self {
BulkAction::Index => "index",
BulkAction::Create => "create",
}
}

pub fn as_json_pointer(&self) -> &'static str {
match *self {
BulkAction::Index => "/index",
BulkAction::Create => "/create",
}
}
}

// Register this sink in the global component inventory under the
// configuration name "elasticsearch".
inventory::submit! {
    SinkDescription::new::<ElasticSearchConfig>("elasticsearch")
}
Expand Down Expand Up @@ -146,6 +173,7 @@ pub struct ElasticSearchCommon {
compression: Compression,
region: Region,
query_params: HashMap<String, String>,
bulk_action: BulkAction,
}

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -177,14 +205,16 @@ impl HttpSink for ElasticSearchCommon {
.ok()?;

let mut action = json!({
"index": {
self.bulk_action.as_str(): {
"_index": index,
"_type": self.doc_type,
}
});
maybe_set_id(
self.config.id_key.as_ref(),
action.pointer_mut("/index").unwrap(),
action
.pointer_mut(self.bulk_action.as_json_pointer())
.unwrap(),
&mut event,
);

Expand Down Expand Up @@ -372,6 +402,7 @@ impl ElasticSearchCommon {
let index = Template::try_from(index).context(IndexTemplate)?;

let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into());
let bulk_action = config.bulk_action.clone();

let request = config.request.unwrap_with(&REQUEST_DEFAULTS);

Expand Down Expand Up @@ -404,6 +435,7 @@ impl ElasticSearchCommon {
compression,
region,
query_params,
bulk_action,
})
}

Expand Down Expand Up @@ -527,6 +559,31 @@ mod tests {
assert_eq!(json!({}), action);
}

#[test]
fn sets_create_action_when_configured() {
    use crate::config::log_schema;
    use chrono::{TimeZone, Utc};

    // Opt into the `create` bulk action instead of the default `index`.
    let config = ElasticSearchConfig {
        bulk_action: BulkAction::Create,
        index: Some(String::from("vector")),
        endpoint: String::from("https://example.com"),
        ..Default::default()
    };
    let common = ElasticSearchCommon::parse_config(&config).unwrap();

    // Use a fixed timestamp so the encoded output is deterministic.
    let mut event = Event::from("hello there");
    event.as_mut_log().insert(
        log_schema().timestamp_key(),
        Utc.ymd(2020, 12, 1).and_hms(1, 2, 3),
    );
    let bytes = common.encode_event(event).unwrap();

    // The action line must carry `create` rather than `index`.
    let expected = r#"{"create":{"_index":"vector","_type":"_doc"}}
{"message":"hello there","timestamp":"2020-12-01T01:02:03Z"}
"#;
    assert_eq!(std::str::from_utf8(&bytes).unwrap(), expected);
}

#[test]
fn handles_error_response() {
let json = "{\"took\":185,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"log_lines\",\"_id\":\"3GhQLXEBE62DvOOUKdFH\",\"status\":400,\"error\":{\"type\":\"illegal_argument_exception\",\"reason\":\"mapper [message] of different type, current_type [long], merged_type [text]\"}}}]}";
Expand Down