Lumberjack to Lumberjack data loss under congestion #3691

Closed
jsvd opened this issue Aug 4, 2015 · 29 comments

@jsvd
Member

jsvd commented Aug 4, 2015

Sending from Logstash A to Logstash B using the lumberjack input/output plugins results in data loss when Logstash B suffers back pressure from its filters/outputs. (The forwarder shows the same behavior.)

My test uses two Logstash instances (client and server):

Client config:

```
input { generator { count => 1000 } }
filter {
  mutate { replace => ["message", "%{sequence}" ] }  # use sequence codes to track loss
  ruby { code => "sleep 1" }  # slow production to 1 event per second
}
output {
  stdout { }
  lumberjack {
    hosts => "localhost"
    port => 6782
    ssl_certificate => "/tmp/server.crt"
  }
}
```

Server config:

```
input {
  lumberjack {
    port => 6782
    ssl_certificate => "/tmp/server.crt"
    ssl_key => "/tmp/server.key"
  }
}
filter {
  ruby { code => "sleep 5" }  # simulate a slow consumer in outputs
}
output { stdout { } }
```

Starting the server, then the client, and letting both run for a while makes the server print output like the following (note the gaps in the sequence numbers, e.g. from 43 to 58 and from 59 to 93):

```
...
2015-08-04T13:03:52.524Z %{host} 2015-08-04T13:03:34.437Z Joaos-MBP.lan 39
2015-08-04T13:03:53.530Z %{host} 2015-08-04T13:03:35.442Z Joaos-MBP.lan 40
2015-08-04T13:03:54.535Z %{host} 2015-08-04T13:03:36.446Z Joaos-MBP.lan 41
2015-08-04T13:03:55.541Z %{host} 2015-08-04T13:03:36.446Z Joaos-MBP.lan 42
2015-08-04T13:03:56.545Z %{host} 2015-08-04T13:03:38.457Z Joaos-MBP.lan 43
2015-08-04T13:04:38.053Z %{host} 2015-08-04T13:03:53.526Z Joaos-MBP.lan 58
2015-08-04T13:04:38.057Z %{host} 2015-08-04T13:03:54.533Z Joaos-MBP.lan 59
2015-08-04T13:05:18.420Z %{host} 2015-08-04T13:04:28.693Z Joaos-MBP.lan 93
2015-08-04T13:05:18.422Z %{host} 2015-08-04T13:04:29.698Z Joaos-MBP.lan 94
2015-08-04T13:05:18.424Z %{host} 2015-08-04T13:04:29.698Z Joaos-MBP.lan 95
...
```
@ph
Contributor

ph commented Aug 4, 2015

There are a few possible causes of this behavior that I will investigate:

  • The circuit breaker's error handling changes how the server acknowledges packets, so the client thinks the server has correctly received the whole transmission.
  • Or the client doesn't try to resend the whole packet when an error occurs.

@ph
Contributor

ph commented Aug 4, 2015

One thing I see here is that the number of events sent is much lower than the default window size (1000). I believe this is still a bug.

@ph
Contributor

ph commented Aug 4, 2015

Edited to make mention of the forwarder

@ph
Contributor

ph commented Aug 4, 2015

From a discussion on HipChat with @jordansissel and @jsvd.

Summary:

  1. Make CircuitBreaker#execute raise an exception if the breaker trips or is already open.
  2. Make the lumberjack input rescue the exception from CircuitBreaker#execute and terminate the connection from LSF.
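
The two summary items can be sketched in plain Ruby. This is a minimal sketch, not Logstash code: `CircuitBreaker`, `OpenBreaker`, `FakeConnection` and `handle_window` are hypothetical names, the breaker trips after a single failure, and the "pipeline" is just a callable that raises when congested. The point is that an open breaker surfaces as an exception, and the input answers it by closing the connection instead of ACKing.

```ruby
# Hypothetical stand-ins for illustration; not LogStash::CircuitBreaker or
# the lumberjack input's real code.
class CircuitBreaker
  class OpenBreaker < StandardError; end

  def initialize(threshold: 1)
    @threshold = threshold
    @failures = 0
  end

  def open?
    @failures >= @threshold
  end

  # Runs the block. Raises OpenBreaker if the breaker is already open or if
  # this failure trips it, instead of silently swallowing the error.
  def execute
    raise OpenBreaker, "breaker is open" if open?
    begin
      result = yield
    rescue StandardError => e
      @failures += 1
      raise OpenBreaker, "breaker tripped: #{e.message}" if open?
      raise
    end
    @failures = 0
    result
  end
end

# Hypothetical connection that records whether it was closed or ACKed.
FakeConnection = Struct.new(:closed, :acked_seq) do
  def close
    self.closed = true
  end

  def ack(seq)
    self.acked_seq = seq
  end
end

# events: array of [sequence, event]; sink: callable pushing downstream.
def handle_window(events, breaker, sink, connection)
  events.each { |_seq, event| breaker.execute { sink.call(event) } }
  connection.ack(events.last.first) # ACK only once the whole window got through
rescue CircuitBreaker::OpenBreaker
  connection.close # no ACK, so the client has to resend the window
end

delivered = []
congested = ->(event) { raise "pipeline full" if event == "c"; delivered << event }
conn = FakeConnection.new(false, nil)
handle_window([[1, "a"], [2, "b"], [3, "c"]], CircuitBreaker.new, congested, conn)
p conn.closed, conn.acked_seq, delivered # prints true, nil and ["a", "b"]
```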

@jordansissel
Contributor

@jsvd and @ph and I met to discuss this issue. We think the problem is easily solved and is a bug in the interaction between the lumberjack input and the circuit breaker.

@jordansissel
Contributor

@ph haha you are too fast ;)

@ph
Contributor

ph commented Aug 4, 2015

@jordansissel It's because of Zoom for iPhone.

@suyograo suyograo added the v1.5.4 label Aug 4, 2015
@ph
Contributor

ph commented Aug 4, 2015

I've tried a few of the things we discussed:

  • Throw exceptions and explicitly close the connection.
  • Reduce the size of the internal buffer to 1: since we acknowledge events in bulk using the last sequence number, a buffer larger than one could cause us to skip events.
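```ruby
begin
  # Push the event through the breaker; if the breaker is open or half-open,
  # close the connection so no ACK is sent for this window.
  @circuit_breaker.execute { @buffered_queue << event }
rescue LogStash::CircuitBreaker::OpenBreaker,
       LogStash::CircuitBreaker::HalfOpenBreaker => e
  logger.error("!!!!Forcing a close on the connection")
  connection.close
end
```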

I am revisiting my conditions to see if I can narrow down the problem.

@ph
Contributor

ph commented Aug 4, 2015

Also, looking at the code, I don't need to rescue it explicitly, since #run will close the connection on any exception.

https://github.com/elastic/ruby-lumberjack/blob/master/lib/lumberjack/server.rb#L238-L240

@jordansissel
Contributor

> since we acknowledge the events in bulk using the last sequence number

@ph yeah, the idea is that we trend towards "at least once" delivery. A failure in a batch that causes the connection to be terminated will make LSF retransmit the whole batch (likely the full window size). This causes duplicates, but it is better than loss. Long term (Filebeat), we'll likely do partial ACKs (like log-courier) or something similar to reduce duplicates while still achieving at-least-once delivery.
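
A minimal sketch of why this trades loss for duplicates, assuming a toy model: the server is a callable that returns the window's last sequence number when it ACKs, or `nil` when it closes the connection. `send_with_retry` and the lambda server are hypothetical, not ruby-lumberjack or LSF code.

```ruby
# Hypothetical client loop: a window is only forgotten once an ACK arrives;
# if the server closes the connection instead (nil), the whole window is
# resent, which gives duplicates but no loss ("at least once").
def send_with_retry(events, window_size, server, max_attempts: 10)
  events.each_slice(window_size) do |window|
    attempts = 0
    until server.call(window)
      attempts += 1
      raise "window starting at #{window.first[0]} never ACKed" if attempts >= max_attempts
    end
  end
end

received = []
failed_once = false
# Toy server: congestion on the first attempt at sequence 3 closes the
# connection after events 1 and 2 were already pushed downstream.
server = lambda do |window|
  window.each do |seq, message|
    if seq == 3 && !failed_once
      failed_once = true
      return nil
    end
    received << message
  end
  window.last[0] # ACK with the last sequence number of the window
end

events = (1..4).map { |i| [i, "m#{i}"] }
send_with_retry(events, 4, server)
p received # => ["m1", "m2", "m1", "m2", "m3", "m4"]
```

Events 1 and 2 arrive twice, but nothing is skipped, which is the behavior this comment describes as acceptable.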

@ph
Contributor

ph commented Aug 4, 2015

I was expecting duplicates; I actually saw some in the log, as expected.
I am still checking my code: I am still seeing events being skipped, so I think it's a logic problem on my side.

@jordansissel
Contributor

hmm, ok. Let me know what I can help test.

@ph
Contributor

ph commented Aug 4, 2015

Some missing and some duplicates:

```
{"message":"20","@version":"1","@timestamp":"2015-08-04T19:09:01.333Z","file":"/tmp/log","host":"sashimi","offset":"50"}
{"message":"21","@version":"1","@timestamp":"2015-08-04T19:09:01.335Z","file":"/tmp/log","host":"sashimi","offset":"53"}
{"message":"22","@version":"1","@timestamp":"2015-08-04T19:09:01.335Z","file":"/tmp/log","host":"sashimi","offset":"56"}
{"message":"23","@version":"1","@timestamp":"2015-08-04T19:09:01.335Z","file":"/tmp/log","host":"sashimi","offset":"59"}
{"message":"20","@version":"1","@timestamp":"2015-08-04T19:09:41.830Z","file":"/tmp/log","host":"sashimi","offset":"50"}
{"message":"22","@version":"1","@timestamp":"2015-08-04T19:09:43.838Z","file":"/tmp/log","host":"sashimi","offset":"56"}
{"message":"25","@version":"1","@timestamp":"2015-08-04T19:10:22.233Z","file":"/tmp/log","host":"sashimi","offset":"65"}
{"message":"29","@version":"1","@timestamp":"2015-08-04T19:10:28.251Z","file":"/tmp/log","host":"sashimi","offset":"77"}
{"message":"30","@version":"1","@timestamp":"2015-08-04T19:10:26.297Z","file":"/tmp/log","host":"sashimi","offset":"80"}
{"message":"30","@version":"1","@timestamp":"2015-08-04T19:10:28.282Z","file":"/tmp/log","host":"sashimi","offset":"80"}
{"message":"31","@version":"1","@timestamp":"2015-08-04T19:10:28.939Z","file":"/tmp/log","host":"sashimi","offset":"83"}
```

@ph
Contributor

ph commented Aug 4, 2015

I am using this https://gist.github.com/jsvd/cbb2371f9da1e3a733cc to run my test.

@ph
Contributor

ph commented Aug 5, 2015

I took a step back.

Okay, I have fixed some exception-handling issues (blocks and threads are sometimes no fun).
This is what I have changed and tested on my side with LSF:

  • I've changed the size of the SizeQueueWithTimeout to 1. This helps debug the issue because the queue blocks sooner and we lose fewer events.
  • I've moved the exception handling inside the #invoke method, since exceptions raised in the thread will not bubble up.
  • The CircuitBreaker will raise an exception if it encounters a timeout. This forces connection.run in ruby-lumberjack to close the client connection, which prevents any ACK from being sent to the client.
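
The idea behind the size-1 queue and the timeout is that a full queue turns into an exception instead of an indefinite block. A minimal sketch of such a queue, assuming a hypothetical plain-Ruby `SizedQueueWithTimeout` (it borrows the idea, not the plugin's actual class) built on `Mutex` and `ConditionVariable#wait` with a timeout; the deadline loop covers early wake-ups.

```ruby
# Hypothetical bounded queue whose push gives up after `timeout` seconds,
# so a congested pipeline raises instead of blocking the connection forever.
class SizedQueueWithTimeout
  class TimeoutError < StandardError; end

  def initialize(max_size)
    @max_size = max_size
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def push(item, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.size >= @max_size
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "queue full for #{timeout}s" if remaining <= 0
        @not_full.wait(@mutex, remaining) # may wake early; the loop re-checks
      end
      @items << item
      @not_empty.signal
    end
  end

  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

queue = SizedQueueWithTimeout.new(1)
queue.push(:first, 0.1)
begin
  queue.push(:second, 0.05) # nobody is consuming, so this gives up
rescue SizedQueueWithTimeout::TimeoutError => e
  puts "would close the connection: #{e.message}"
end
```

With a size of 1, the second event already times out, which makes the failing sequence number easy to spot in the logs.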

This is what I have found so far:

We still lose events.

I have added the following logs at different points of the event lifecycle in the plugins:

  • Log every event ID that we received.
  • Log the event ID that raised an exception and closed the connection.
  • Log what Logstash actually received.

Event that raised an exception: https://gist.github.com/322b0e9a1fee3ce4061f
Events actually logged by logstash: https://gist.github.com/701a9475cd52b6b01c74
All event ID received: https://gist.github.com/70e8b63b813c100c9e42

Look at document 25:

  • It timed out.
  • It raised an exception.
  • It closed the connection.
  • I would have expected it not to be ACKed.
  • We never received it again.
  • Also, every ID near 25 looks a bit weird.

So something is fishy on the ACK/retry side.

@ph
Contributor

ph commented Aug 5, 2015

@ph
Contributor

ph commented Aug 5, 2015

Added the thread number to the debug files.

@ph
Contributor

ph commented Aug 5, 2015

Event that raised an exception: https://gist.github.com/d955b79b547ab45ec300
Events actually logged by logstash: https://gist.github.com/71764dd823004f7fc9ff
All event ID received: https://gist.github.com/7b483b41de27cc480dc9

@ph
Contributor

ph commented Aug 5, 2015

Added another log to know which thread successfully sent an event down the pipeline.
https://gist.github.com/ac5429c6ab5bcb616224

@ph
Contributor

ph commented Aug 5, 2015

I've removed the thread-reuse logic, and the event that raised an exception is now easier to follow.
https://gist.github.com/f8303c7f2ceacdca38a3

@ph
Contributor

ph commented Aug 5, 2015

If we look at the events, we can see LSF retrying to resend some events.
Each thread ID represents a new connection from one LSF client.

We fail at event 22, so LSF tries to resend from 17, its last ACK point, and fails at 18.
Then it sends 24, so an ACK happened without the data being sent to the pipeline.

See:

th 165154, event id 18 -> was send to the pipeline successfully
th 165154, event id 19 ->  was send to the pipeline successfully
th 165154, event id 20 -> was send to the pipeline successfully
th 165154, event id 21 -> was send to the pipeline successfully
th 165154, event id 22 -> exception raise
th 165156, event id 17 -> was send to the pipeline successfully
th 165156, event id 18 -> exception raise
th 165158, event id 24 -> exception raise
th 165160, event id 24 -> exception raise
th 165162, event id 24 -> exception raise
th 165164, event id 24 -> exception raise
th 165166, event id 24 -> was send to the pipeline successfully from 
th 165166, event id 25 -> was send to the pipeline successfully from

Note: I didn't see any connection timeouts on the LSF side, so all the reconnections originated from the server.

@ph
Contributor

ph commented Aug 6, 2015

Events received and ACKed; look around 24:

th 168514, 0 -> send to the pipeline successfully.
th 168514, 1 -> send to the pipeline successfully.
th 168514, 2 -> send to the pipeline successfully.
th 168514, 3 -> send to the pipeline successfully.
th 168514, ack sequence 4 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"6", "line"=>"3"}
th 168514, 4 -> send to the pipeline successfully.
th 168514, 5 -> send to the pipeline successfully.
th 168514, 6 -> send to the pipeline successfully.
th 168514, 7 -> send to the pipeline successfully.
th 168514, 8 -> send to the pipeline successfully.
th 168514, 9 -> send to the pipeline successfully.
th 168514, 10 -> send to the pipeline successfully.
th 168514, ack sequence 11 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"20", "line"=>"10"}
th 168514, 11 -> send to the pipeline successfully.
th 168514, 12 -> send to the pipeline successfully.
th 168514, 13 -> send to the pipeline successfully.
th 168514, 14 -> send to the pipeline successfully.
th 168514, 15 -> send to the pipeline successfully.
th 168514, 16 -> send to the pipeline successfully.
th 168514, 17 -> send to the pipeline successfully.
th 168514, 18 -> send to the pipeline successfully.
th 168514, ack sequence 19 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"44", "line"=>"18"}
th 168514, 19 -> send to the pipeline successfully.
th 168514, 20 -> send to the pipeline successfully.
th 168514, 21 -> send to the pipeline successfully.
th 168514, 22 -> send to the pipeline successfully.
th 168514, 23 -> send to the pipeline successfully.
th 168514, ack sequence 24 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"59", "line"=>"23"}
th 168514, 24 -> send to the pipeline successfully.
th 168514 25 -> exception raise.
th 168514 -> I've closed the connection.
th 168516 24 -> exception raise.
th 168516 -> I've closed the connection.
th 168518 24 -> exception raise.
th 168518 -> I've closed the connection.
th 168520 24 -> exception raise.
th 168520 -> I've closed the connection.
th 168522 24 -> exception raise.
th 168522 -> I've closed the connection.
th 168524 24 -> exception raise.
th 168524 -> I've closed the connection.
th 168526, 24 -> send to the pipeline successfully.
th 168526, ack sequence 25 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"62", "line"=>"24"}
th 168526 25 -> exception raise.
th 168526 -> I've closed the connection.
th 168528 29 -> exception raise.
th 168528 -> I've closed the connection.
th 168530 29 -> exception raise.
th 168530 -> I've closed the connection.
th 168532 29 -> exception raise.
th 168532 -> I've closed the connection.
th 168534 29 -> exception raise.
th 168534 -> I've closed the connection.
th 168536 29 -> exception raise.
th 168536 -> I've closed the connection.
th 168538, 29 -> send to the pipeline successfully.
th 168538, ack sequence 30 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"77", "line"=>"29"}
th 168538 30 -> exception raise.
th 168538 -> I've closed the connection.
th 168540 34 -> exception raise.
th 168540 -> I've closed the connection.
th 168542 34 -> exception raise.
th 168542 -> I've closed the connection.
th 168544 34 -> exception raise.
th 168544 -> I've closed the connection.
th 168546, 34 -> send to the pipeline successfully.
th 168546, ack sequence 35 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"92", "line"=>"34"}
th 168546, 35 -> send to the pipeline successfully.
th 168546, 36 -> send to the pipeline successfully.
th 168546, 37 -> send to the pipeline successfully.
th 168546, 38 -> send to the pipeline successfully.
th 168546, 39 -> send to the pipeline successfully.
th 168546, ack sequence 40 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"107", "line"=>"39"}
th 168546, 40 -> send to the pipeline successfully.
th 168546 41 -> exception raise.
th 168546 -> I've closed the connection.
th 168548 40 -> exception raise.
th 168548 -> I've closed the connection.
th 168550, 40 -> send to the pipeline successfully.
th 168550, ack sequence 41 event: {"file"=>"/tmp/log", "host"=>"sashimi", "offset"=>"110", "line"=>"40"}
th 168550, 41 -> send to the pipeline successfully.
th 168550, 42 -> send to the pipeline successfully.
th 168550, 43 -> send to the pipeline successfully.
th 168550, 44 -> send to the pipeline successfully.
th 168550, 45 -> send to the pipeline successfully.
th 168550, 46 -> send to the pipeline successfully.
th 168550, 47 -> send to the pipeline successfully.
th 168550, 48 -> send to the pipeline successfully.
th 168550, 49 -> send to the pipeline successfully.
th 168550, 50 -> send to the pipeline successfully.
th 168550 51 -> exception raise.
th 168550 -> I've closed the connection.
th 168552, 42 -> send to the pipeline successfully.
th 168552 43 -> exception raise.
th 168552 -> I've closed the connection.
th 168554 42 -> exception raise.
th 168554 -> I've closed the connection.
th 168556 42 -> exception raise.
th 168556 -> I've closed the connection.
th 168558 42 -> exception raise.
th 168558 -> I've closed the connection.
th 168560 42 -> exception raise.
th 168560 -> I've closed the connection.

@ph
Contributor

ph commented Aug 7, 2015

After a lot of logging in different places in the application and pairing with @jordansissel on this issue, we have found the problem. First, let's take a few notes:

  • LSF sends a payload. At a high level its format is the window size (number of events), followed by the events, each with its own sequence number.
  • When the server is done processing the payload, it sends back the last sequence number of the payload as an ACK that it received the whole payload.
  • LSF checks for the ACK after sending a payload to the lumberjack server.
  • LSF doesn't actually verify the ACK payload (it doesn't check whether the sequence number sent back by the server matches its internal sequence number).
  • If something goes wrong with the connection, LSF will resend the payload.
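
The payload in these notes can be sketched as byte frames. This assumes the lumberjack protocol version "1" framing used by logstash-forwarder: a `1W` window frame with a 32-bit count, one `1D` data frame per event with a 32-bit sequence, a 32-bit pair count and length-prefixed key/value strings, and a `1A` ACK frame carrying a 32-bit sequence. The helper names are hypothetical, and compression and TLS are left out. `window_acked?` shows the check that, per the notes, LSF does not perform.

```ruby
# Hypothetical helpers, assuming lumberjack v1 framing ("N" = 32-bit
# unsigned, network byte order).
def window_frame(count)
  ["1", "W", count].pack("AAN")
end

def data_frame(sequence, fields)
  frame = ["1", "D", sequence, fields.size].pack("AANN")
  fields.each do |key, value|
    k = key.to_s
    v = value.to_s
    frame << [k.bytesize, k, v.bytesize, v].pack("NA*NA*")
  end
  frame
end

def ack_frame(sequence)
  ["1", "A", sequence].pack("AAN")
end

# The missing check: a window only counts as delivered if the ACKed
# sequence equals the last sequence the client sent.
def window_acked?(ack_bytes, last_sent_sequence)
  version, type, sequence = ack_bytes.unpack("AAN")
  version == "1" && type == "A" && sequence == last_sent_sequence
end

events = [{ "line" => "23" }, { "line" => "24" }]
payload = window_frame(events.size) +
          events.each_with_index.map { |fields, i| data_frame(25 + i, fields) }.join
p payload.bytesize                 # => 54
p window_acked?(ack_frame(26), 26) # => true
p window_acked?(ack_frame(25), 26) # => false
```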

There was an issue on the server side in deciding when to actually send the ACK: on retry, instead of sending the sequence number of the last event, we were sending the sequence number of the first event received. Since LSF does no verification of the sequence number, it thinks the whole payload was correctly received by the server, and from then on the sequence numbers on the LSF and server sides are out of sync.
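
A toy model makes this failure mode concrete. It is a simplification, not the server's code: the hypothetical buggy server always ACKs with the first sequence it pushed (the real bug happened on retry), the client accepts any ACK as LSF does, and the first connection becomes congested after two events.

```ruby
# Hypothetical toy server: on each connection it can push at most
# `capacity` events before congestion closes the connection.
def make_server(pipeline, capacities, buggy:)
  remaining = capacities.dup
  lambda do |window|
    capacity = remaining.shift || window.size # later connections are not congested
    accepted = window.first(capacity)
    pipeline.concat(accepted)
    if buggy
      accepted.first # BUG: ACK sent with the first event's sequence
    elsif accepted.size == window.size
      window.last # fixed: ACK the last sequence, only once the whole window is in
    end           # otherwise nil: connection closed without an ACK
  end
end

# Like LSF: any ACK counts; the sequence number is not compared.
def run_client(sequences, window_size, server, max_attempts: 10)
  sequences.each_slice(window_size) do |window|
    acked = false
    max_attempts.times do
      acked = !server.call(window).nil?
      break if acked
    end
    raise "window #{window.first}..#{window.last} never ACKed" unless acked
  end
end

buggy_pipeline = []
run_client((1..6).to_a, 3, make_server(buggy_pipeline, [2], buggy: true))
p buggy_pipeline # => [1, 2, 4, 5, 6] (event 3 is lost)

fixed_pipeline = []
run_client((1..6).to_a, 3, make_server(fixed_pipeline, [2], buggy: false))
p fixed_pipeline # => [1, 2, 1, 2, 3, 4, 5, 6] (duplicates, no loss)
```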

  • We have a patch that changes the ACK-sending logic on the server side, which should fix this issue.
  • We need to do more testing on the Ruby client to see if its logic is the same.

@ph
Contributor

ph commented Aug 7, 2015

Since this issue is also about the lumberjack-to-lumberjack problem: I have found that the Ruby client doesn't follow the same protocol contract as LSF.

The Ruby client defines the window size statically at the beginning of the connection, assumes it to be 5000, and doesn't allow you to send events in bulk. So the Ruby client only checks for the ACK every 5000 events. This explains the difference in behavior between LSF and the lumberjack output in the congestion scenario.

see: elastic/ruby-lumberjack#7

@ph
Contributor

ph commented Aug 7, 2015

Setting the lumberjack window_size to 1 could be a temporary solution.

@ph
Contributor

ph commented Aug 11, 2015

I've reverted the window_size == 1 workaround, implemented bulk sending in the Ruby client, and added a buffer in the lumberjack output using Stud::Buffer.
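
The buffered bulk send can be sketched as follows. This is a hypothetical plain-Ruby `WindowedSender`, not Stud::Buffer's API and not the lumberjack output's code: events are collected and flushed as one window, so each window is sent and ACKed as a batch instead of relying on a fixed 5000-event window. Time-based flushing is left out.

```ruby
# Hypothetical output-side buffer: collects events and flushes them as one
# window; the transport block is expected to write the window and wait for
# the ACK of its last sequence.
class WindowedSender
  def initialize(max_window, &send_window)
    @max_window = max_window
    @send_window = send_window
    @buffer = []
    @sequence = 0
  end

  def receive(event)
    @buffer << event
    flush if @buffer.size >= @max_window
  end

  # Assigns sequence numbers, then hands the window to the transport. The
  # buffer is only cleared after the transport returns, so a send that
  # raises keeps the events for another attempt.
  def flush
    return if @buffer.empty?
    window = @buffer.map { |event| [@sequence += 1, event] }
    @send_window.call(window)
    @buffer.clear
  end
end

sent = []
sender = WindowedSender.new(2) { |window| sent << window }
%w[a b c].each { |event| sender.receive(event) }
sender.flush # e.g. on shutdown, flush the partial window
p sent # => [[[1, "a"], [2, "b"]], [[3, "c"]]]
```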

@ph
Contributor

ph commented Aug 11, 2015

I have also used this PR elastic/logstash-forwarder#508 to debug the issue.

@ph
Contributor

ph commented Aug 21, 2015

Fixed in Logstash 1.5.4.

@ph ph closed this as completed Aug 21, 2015
4 participants