Bug with at-least-once delivery #10146

Closed
valerypetrov opened this issue Nov 23, 2021 · 6 comments
Labels
sink: elasticsearch Anything `elasticsearch` sink related type: bug A code related bug.

Comments

@valerypetrov

valerypetrov commented Nov 23, 2021

Intro

Hi there, while evaluating Vector we ran into an issue with exactly-once delivery.
We have the following pipeline:

logstash producer --> (kafka brokers) <-- logstash consumer --> elasticsearch

We've deployed Vector to consume the same data with the same processing rules (we are comparing it with Logstash):

(kafka brokers) <-- vector consumer --> elasticsearch

We found that Vector sometimes produces duplicates or drops data during normal processing (compared with Logstash).

For testing purposes we created a script that generates data and sends it to the producers. Look at the Elasticsearch index stats:

➜  ~ curl -s elasticsearch:9201/_cat/indices | grep ttp-application | grep 2021.11.22
green open sjc06-c01-logs-ttp-application-2021.11.22                      dtTxQxl1TmOMNT1nVjVLUA  5 1    8262710       0    4.4gb    2.2gb
green open sjc06-c01-vector-logs-ttp-application-2021.11.22               NcKdTEUlR1GbTZ6aT5YpxQ  5 1    8259232       0    4.5gb    2.2gb

Compare the document counts of index sjc06-c01-logs-ttp-application-2021.11.22 and index sjc06-c01-vector-logs-ttp-application-2021.11.22: the difference is 8262710-8259232=3478.
It looks like Vector has dropped 3478 documents.

If we take a look at the 2021.11.23 indices:

➜  ~ curl -s elasticsearch:9201/_cat/indices | grep ttp-application | grep 2021.11.23
green open sjc06-c01-logs-ttp-application-2021.11.23                      FPzK9zmWQ-i1NTh8tUN7Kw  1 1    9652441       0    4.7gb    2.4gb
green open sjc06-c01-vector-logs-ttp-application-2021.11.23               9Uip_bxSSjy8WvAC947Tkg  1 1    9652819       0    4.9gb    2.4gb

Vector generated 9652819-9652441=378 duplicates.

Validation

Furthermore, we've created a Python script that fetches the data from Elasticsearch for both the Logstash and Vector indices and diffs them document by document. The conclusion is that Vector really can produce duplicates in some cases and drop messages in others.

I'm assuming this is due to some issue on the ELK side, and that Vector is not handling bulk exceptions (or something like that) correctly.

Vector Version

vector --version
vector 0.17.0 (x86_64-unknown-linux-gnu 3d34cde 2021-10-08)

Vector Configuration File

log_schema.timestamp_key = "@timestamp"

[sources.internal_metrics_app]
  type = "internal_metrics"

[sinks.monitoring_exporter]

  type = "prometheus_exporter"
  inputs = ["internal_metrics_app"]
  address = "0.0.0.0:9598"

[sources.kafka_source]
  type = "kafka"
  bootstrap_servers = "kafka:9092"
  group_id = "vector-test-consumer-group"
  topics = ["^app.*"]
  auto_offset_reset = "earliest"

[transforms.json_parser_app]
  type = "json_parser"
  inputs = ["kafka_source"]
  drop_field = true
  drop_invalid = true
  field = "message"

[transforms.to_downcase_app]
  type = "remap"
  inputs = ["json_parser_app"]
  source = '''
  .component_type, err = downcase(.component_type)
  .log_type, err = downcase(.log_type)
'''


[transforms.regex_component]

  type = "regex_parser"
  inputs = ["to_downcase_app"]
  drop_failed = true
  drop_field = false
  field = "component_type"
  patterns = ['^[\w]{3,}$']

[transforms.regex_log_type]

  type = "regex_parser"
  inputs = ["regex_component"]
  drop_failed = true
  drop_field = false
  field = "log_type"
  patterns =  ['^[\w.-]{3,}$']

[transforms.remap_app]
  type = "remap"
  inputs = ["regex_log_type"]
  source = '''
  if (.component_type == "zzz" || .component_type == "xxx" || .component_type == "ccc" || .component_type == "vvv" || .component_type == "bbb" || .component_type == "nnn" || .component_type == "mmm" || .component_type == "aaa" ){
      .vector_routing_rule = "rule_c01"
   } else if (.component_type == "qqq" || .component_type == "www" || .component_type == "eee" || .component_type == "rrr" || .component_type == "ttt"){
      .vector_routing_rule = "rule_c02"
   } else  {
      .vector_routing_rule = "rule_c03"
   }

  if (exists(.message) && length(.message) > 30000){
   .long_field = "long_field_warn"
   }

  if (exists(.message) && length(.message) > 80000){
   .long_field = "long_field_crit"
   }

  if (exists(.message) && length(.message) > 100000){
   .long_field = "long_field_truncated"
   .message,err = truncate(.message, limit:  100000)
   }
  if (exists(.stack_trace) && length(.stack_trace) > 30000){
   .long_field =  "long_field_warn"
   }

  if (exists(.stack_trace) && length(.stack_trace) > 80000){
   .long_field = "long_field_crit"
   }

  if ( exists(.stack_trace) && length(.stack_trace) > 100000){
   .long_field = "long_field_truncated"
   .stack_trace, err = truncate(.stack_trace, limit:  100000)
   }

  .index_pattern,err = .component_type + "-" + .log_type
  .consumer = "${HOSTNAME}"
  .consumer_time = now()
'''

[sinks.elasticsearch_app_c01]

  type = "elasticsearch"
  inputs = ["remap_app"]
  compression = "none"
  bulk_action = "index"
  doc_type = "_doc"
  endpoint = "http://elasticsearch:9201"
  index = "sjc06-c01-vector-logs-{{ index_pattern }}-%Y.%m.%d"
  batch.max_bytes = 50490000
  request.concurrency = "adaptive"
  buffer.type = "memory"
  buffer.max_events = 500000

[transforms.vector_component_metrics_app]

  type = "log_to_metric"
  inputs = ["remap_app"]

  [[transforms.vector_component_metrics_app.metrics]]
    field = "component_type"
    name = "component_type_vector"
    namespace = "component"
    type = "counter"
    tags.host = "${HOSTNAME}"
    tags.component_type = "{{ component_type }}"
    tags.log_type = "{{ log_type }}"

[transforms.vector_truncate_metrics_app]

  type = "log_to_metric"
  inputs = ["remap_app"]

  [[transforms.vector_truncate_metrics_app.metrics]]
    field = "long_field"
    name = "long_field_type_vector"
    namespace = "long_field"
    type = "counter"
    tags.host = "${HOSTNAME}"
    tags.long_field = "{{ long_field }}"
    tags.log_type = "{{ log_type }}"


[transforms.aggregate_influx]
  type = "aggregate"
  inputs = ["vector_component_metrics_app", "vector_truncate_metrics_app" ]
  interval_ms = 10_000

[transforms.set_timestamp]
  type = "remap"
  inputs = ["aggregate_influx"]
  source = '''
  .timestamp = now()
 '''


[sinks.influxdb]
  type = "influxdb_metrics"
  inputs = ["set_timestamp"]
  endpoint = "http://influxdb:8086"
  namespace = "vector"
  database = "elk-vector-stats"
  healthcheck = true
  batch.max_events = 2000
  batch.timeout = 5

Expected Behavior

We expect exactly-once delivery with Kafka & Vector.

Actual Behavior

It is not possible to get exactly-once delivery using Vector & Kafka.

Example Data

It doesn't matter which data is being processed; the issue with exactly-once delivery occurs for every component we cover for log shipping.
But here is the example data that was generated to obtain the results above:

{
    "component_type": "ttp",
    "first_name": "East Jeffreyhaven",
    "street_address": "7464 John Summit Suite 346",
    "city": "East Jeffreyhaven",
    "email": "hreilly@white.net",
    "log_type": "application",
    "log_format": "json",
    "ip": "249.20.105.238",
    "index": "39556"
}
@valerypetrov valerypetrov added the type: bug A code related bug. label Nov 23, 2021
@hhromic
Contributor

hhromic commented Nov 23, 2021

As far as I know, Vector currently only provides at-least-once guarantees:
https://vector.dev/docs/about/under-the-hood/guarantees/#delivery-guarantees

Further down on that same page:

Does Vector support exactly-once delivery?
No, Vector does not support exactly once delivery. There are future plans to partially support this for sources and sinks that support it, for example Kafka, but it remains unclear if Vector will ever be able to achieve this. We recommend subscribing to our mailing list, which will keep you in the loop if this ever changes.

But, let's wait for a Vector developer to chime-in as this information might be outdated.

@valerypetrov
Author

@hhromic, yeah, thanks for taking a look, but this bug doesn't even fit the "at-least-once" paradigm, because in our case Vector is still dropping some messages.
I will change the issue title from exactly-once to at-least-once.

@valerypetrov valerypetrov changed the title Bug with exactly-once delivery Bug with at-least-once delivery Nov 23, 2021
@spencergilbert
Contributor

@valerypetrov have you tested this with acknowledgements enabled?
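For context, a minimal sketch of what that could look like with the config above. Option names and placement have shifted between Vector releases (some versions configure acknowledgements on the source, newer ones on the sink), so treat this as an assumption to check against the docs for the version in use, not a verified 0.17.0 config:

[sinks.elasticsearch_app_c01]
  type = "elasticsearch"
  inputs = ["remap_app"]
  endpoint = "http://elasticsearch:9201"
  # Ask the sink to acknowledge delivery back to the kafka source so that
  # consumer offsets are only committed once Elasticsearch has accepted the batch.
  acknowledgements.enabled = true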

@jszwedko jszwedko added the sink: elasticsearch Anything `elasticsearch` sink related label Dec 13, 2021
@jszwedko
Member

I'm assuming this is due to some issue on the ELK side, and that Vector is not handling bulk exceptions (or something like that) correctly.

This is accurate in that partial successes will result in Vector dropping any of the errored events. There is some discussion around this in #140.

@valerypetrov
Author

@valerypetrov have you tested this with acknowledgements enabled?

I've tried the acknowledgements option and still no luck. There are far too many duplicates:

➜  ~ curl -s elasticsearch:9201/_cat/indices | grep "kbm-mrs-2022.01.17"
green open sjc06-vector-kbm-mrs-2022.01.17                             nKo_ulApTrau1v9X27vLAA  5 1      80914       0   60.6mb   29.9mb
green open sjc06-c01-logs-kbm-mrs-2022.01.17                           KSOt29RDQFmdm52VbrZ9HA  1 1      50633       0     27mb   13.3mb

@jszwedko
Member

This does seem related to mishandling of bulk acknowledgement errors. Recently a request_retry_partial option was added to the elasticsearch sink to have Vector retry bulk failures (make sure to set an _id field for deduplication).

I'll close this as the above seems very likely to have been the issue, but feel free to reopen if this behavior is still being observed even when setting a unique _id and using request_retry_partial.
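For reference, a rough sketch of the suggested settings. request_retry_partial is the option named above; id_key and the event_id field are assumptions about how the _id would be populated, so verify both against the elasticsearch sink docs for your version:

[sinks.elasticsearch_app_c01]
  type = "elasticsearch"
  inputs = ["remap_app"]
  endpoint = "http://elasticsearch:9201"
  # Retry the individual events that fail inside a partially successful
  # bulk request instead of dropping them.
  request_retry_partial = true
  # Assumed option/field names: map a unique per-event field to the
  # Elasticsearch _id so retried events are deduplicated rather than
  # indexed twice.
  id_key = "event_id"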
