This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Malformed messages cause message buffer queues to fill up #25

Closed
warmfusion opened this issue Aug 24, 2016 · 9 comments

Comments

@warmfusion
Collaborator

I'm not 100% sure of the sequence of events that results in this scenario, but here's what I think happens:

  1. Fluentd steadily sends messages to AMQ
  2. Something delays the output process long enough that messages are written to the file buffer for a while
  3. The buffered messages include an event that cannot be serialized to JSON, so the flush is rejected and the chunk is kept in the buffer to be retried
  4. Retrying that unserializable chunk over and over blocks every later event, so new events get stuck behind it and the buffer queue fills up
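Steps 3 and 4 can be illustrated with a deliberately simplified model. `ToyBufferQueue` is a hypothetical class, not fluentd's buffer implementation; it assumes only what the stack trace below shows: each record of a chunk goes through `JSON.dump`, and an exception anywhere in the chunk fails the whole flush, leaving the chunk queued for retry.

```ruby
require "json"

# Hypothetical toy model, NOT fluentd's real buffer: a bounded queue of
# chunks (arrays of records) flushed oldest-first.
class ToyBufferQueue
  attr_reader :queue

  def initialize(limit)
    @limit = limit # plays the role of buffer_queue_limit
    @queue = []
  end

  # Enqueue a chunk; refuse once the queue is full.
  def push(chunk)
    raise "queue full" if @queue.size >= @limit
    @queue << chunk
  end

  # Serialize every record in the oldest chunk. One bad record raises,
  # the chunk is never shifted off, and the next attempt retries it.
  def try_flush
    chunk = @queue.first
    return if chunk.nil?

    chunk.each { |record| JSON.dump(record) }
    @queue.shift
  rescue JSON::GeneratorError
    nil # chunk stays at the head of the queue
  end
end

bad = { "msg" => "\xff".dup.force_encoding(Encoding::UTF_8) }
q = ToyBufferQueue.new(3)
q.push([{ "msg" => "ok" }, bad]) # poison chunk
q.push([{ "msg" => "later" }])   # healthy chunk stuck behind it
5.times { q.try_flush }
puts q.queue.size # => 2
```

No matter how many retries run, the healthy chunk never gets flushed, and further pushes eventually hit the queue limit.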

Configuration

2016-08-24 08:47:37 +0000 [info]: starting fluentd-0.12.7
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.1'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-bigquery' version '0.2.12'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-buffer-lightening' version '0.0.2'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-record-reformer' version '0.7.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluentd' version '0.12.7'
2016-08-24 08:47:38 +0000 [info]: using configuration file: <ROOT>
  <filter **>
    type record_transformer
    enable_ruby false
    <record>
      input_tag ${tag}
      last_tag ${tag_parts[-1]}
      hostname sourceserver.priv.example.org
    </record>
  </filter>
  <match **>
    type amqp
    tag_key true
    exchange fluent.fanout
    exchange_type fanout
    hosts ["rmq-tc-vif-01.priv.example.org","rmq-tc-vif-02.priv.example.org"]
    port 5672
    vhost fluent
    user fluent.writer
    pass this_is_a_password_yup
    buffer_type file
    buffer_path /var/log/fluent/matcher-forwarder.*.buffer
    buffer_chunk_limit 8m
    buffer_queue_limit 128
    flush_interval 5s
    retry_wait 10s
  </match>
  <source>
    type tail
    format json
    time_tag timestamp
    tag app_api.access
    path /var/log/nginx/access.log
    pos_file /var/log/fluent/app_api-access.pos
  </source>
  <source>
    type forward
    port 24224
    bind 127.0.0.1
  </source>
  <source>
    type monitor_agent
    bind 127.0.0.1
    port 24220
  </source>
</ROOT>

Message showing error

2016-08-24 08:47:40 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2016-08-24 08:47:48 +0000 error_class="JSON::GeneratorError" error="source sequence is illegal/malformed utf-8" plugin_id="object:14c3514"
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:394:in `dump'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:83:in `block in write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `block in msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/plugin/buf_file.rb:64:in `open'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:114:in `msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:82:in `write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

@warmfusion
Collaborator Author

Looks like I might be able to reproduce this failure mode with a simple test JSON object based on this Stack Overflow question.
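The failure can likely be reproduced without fluentd at all. This sketch assumes only the `json` gem; the 0xFF byte is an arbitrary choice of a byte that never appears in valid UTF-8, and the exact error message text may differ between json gem versions.

```ruby
require "json"

# "caf" followed by 0xFF: tagged as UTF-8, but 0xFF never occurs in valid UTF-8.
record = { "msg" => "caf\xff".dup.force_encoding(Encoding::UTF_8) }

puts record["msg"].valid_encoding? # => false

begin
  JSON.dump(record)
rescue JSON::GeneratorError => e
  # Same error class as in the fluentd log above.
  puts "#{e.class}: #{e.message}"
end
```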

@sawanoboly
Member

I see what the issue is.
I think there are two options: drop the record, or force the encoding and log a warning.
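The two options might look roughly like this. Both helper names are hypothetical. Option 2 uses Ruby's `String#scrub`, which replaces invalid byte sequences (with U+FFFD for UTF-8 strings); for brevity it only cleans top-level string values.

```ruby
require "json"

# Option 1 (drop): return nil so the caller can skip the record and warn.
def serialize_or_drop(record)
  JSON.dump(record)
rescue JSON::GeneratorError
  nil
end

# Option 2 (force encode): scrub invalid bytes out of top-level string
# values before serializing. Note that this alters the message content.
def serialize_scrubbed(record)
  cleaned = record.transform_values { |v| v.is_a?(String) ? v.scrub : v }
  JSON.dump(cleaned)
end

record = { "msg" => "caf\xff".dup.force_encoding(Encoding::UTF_8) }
p serialize_or_drop(record) # => nil
scrubbed = JSON.parse(serialize_scrubbed(record))["msg"]
# scrubbed is "caf\u{FFFD}": the bad byte became a replacement character
```

The replacement character in option 2 is exactly the kind of silent change to message content that the drop option avoids.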

@warmfusion
Collaborator Author

Dropping with a warning might be the safest approach, as forcing the encoding might change the meaning of the messages being sent.

If the messages are dropped, at least they can be reviewed and fixed by an end user; we just need to make sure they're dropped with a clear indication of why.
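One way to give that clear indication, without writing the broken bytes themselves into the log, is to name the offending fields in the warning. This is a minimal sketch that assumes records are plain nested hashes and arrays. `invalid_string_paths` is a hypothetical helper, and hash keys themselves are not checked.

```ruby
# Hypothetical helper: dotted paths of string values whose bytes are not
# valid in their declared encoding (e.g. broken UTF-8).
def invalid_string_paths(value, path = [])
  case value
  when String
    value.valid_encoding? ? [] : [path.join(".")]
  when Hash
    value.flat_map { |k, v| invalid_string_paths(v, path + [k.to_s]) }
  when Array
    value.each_with_index.flat_map { |v, i| invalid_string_paths(v, path + [i.to_s]) }
  else
    []
  end
end

record = {
  "msg" => "ok",
  "req" => { "ua" => "\xff".dup.force_encoding(Encoding::UTF_8) }
}
paths = invalid_string_paths(record)
# Writes to stderr: dropping record: invalid UTF-8 in req.ua
warn "dropping record: invalid UTF-8 in #{paths.join(', ')}" unless paths.empty?
```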

@warmfusion
Collaborator Author

Whereabouts should I be looking to apply the changes?

I'm thinking it might be around here; https://github.com/giraffi/fluent-plugin-amqp/blob/master/lib/fluent/plugin/out_amqp.rb#L84-L90

Is it as simple as wrapping the JSON.dump call in a begin/rescue block and logging a warning for encoding errors?

@sawanoboly
Member

wrapping the JSON.dump in a begin/rescue and throwing a log

I think that might be better.

@warmfusion
Collaborator Author

I'm not sure I understand what you mean...?

@sawanoboly
Member

Ah, sorry.
Could you try wrapping JSON.dump?

@warmfusion
Collaborator Author

I'm going to manually hack this into some of our servers:

        begin
          data = JSON.dump( data ) unless data.is_a?( String )
          log.debug "Sending message #{data}, :key => #{routing_key( tag)} :headers => #{headers(tag,time)}"
          @exch.publish(data, :key => routing_key( tag ), :persistent => @persistent, :headers => headers( tag, time ))
        rescue JSON::GeneratorError => e
          log.error "Failure converting data object to json string: #{e.message}"
          # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop
          log.debug "JSON.dump failure converting [#{data}]"
        end
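The effect of this per-record `rescue` can be checked outside the plugin with stand-ins. `FakeExchange`, `ListLog`, and `publish_records` are hypothetical names replacing the real `@exch`, `log`, and chunk loop. The only assumption about the plugin is the one visible in the stack trace: `write` iterates over the chunk's records and calls `JSON.dump` on each.

```ruby
require "json"

# Hypothetical stand-in for the AMQP exchange: remembers what was published.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, **opts)
    @published << [payload, opts]
  end
end

# Hypothetical stand-in for the plugin logger: collects error messages.
class ListLog
  attr_reader :errors

  def initialize
    @errors = []
  end

  def error(msg)
    @errors << msg
  end
end

# Same shape as the patch: the rescue sits inside the per-record loop, so a
# bad record is logged and skipped instead of failing the whole chunk.
def publish_records(records, exch, log)
  records.each do |tag, _time, data|
    begin
      data = JSON.dump(data) unless data.is_a?(String)
      exch.publish(data, key: tag)
    rescue JSON::GeneratorError => e
      log.error "Failure converting data object to json string: #{e.message}"
    end
  end
end

bad = { "msg" => "\xff".dup.force_encoding(Encoding::UTF_8) }
exch = FakeExchange.new
log = ListLog.new
publish_records([["app_api.access", 0, { "msg" => "a" }],
                 ["app_api.access", 0, bad],
                 ["app_api.access", 0, { "msg" => "b" }]], exch, log)
p exch.published.map(&:first) # => ["{\"msg\":\"a\"}", "{\"msg\":\"b\"}"]
p log.errors.size             # => 1
```

Because the rescue is inside the loop rather than around `write`, the records on either side of the bad one still go out, and the chunk is not re-queued.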

@sawanoboly
Member

It looks good enough to avoid the crash. Could you create a new PR?

warmfusion added a commit to warmfusion/fluent-plugin-amqp that referenced this issue Sep 6, 2016
…rate with rescues to deal with Generate Errors
@warmfusion warmfusion mentioned this issue Sep 6, 2016