Protect new implementation of BufferedTokenizer against OOM #17275

Open
andsel opened this issue Mar 6, 2025 · 3 comments · May be fixed by #17293

Comments

@andsel
Contributor

andsel commented Mar 6, 2025

#17229 changes the implementation of the BufferedTokenizer. It switches from RubyArray and JRuby classes to pure Java, so the extract method now returns an Iterable instead of a RubyArray.

The implementation is split into two parts:

  • an inner iterator that accumulates the fragments passed in by each invocation of the extract method
  • an outer iterator that decorates the first and simply applies the sizeLimit check, throwing an exception when it receives a token longer than sizeLimit.
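
To make the two parts concrete, here is a minimal sketch of that shape. It is not the code from #17229: the class name, the `pendingLength` helper, and the exception type are all assumptions made for illustration, and complete tokens are collected eagerly into a list to keep the example short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the two-part design described above; it is not the
// code from #17229. Class, method, and message names are assumptions.
final class TwoStageTokenizerSketch {
    private final String separator;
    private final int sizeLimit;
    // Inner stage: accumulates fragments across extract calls, with no bound.
    private final StringBuilder accumulator = new StringBuilder();

    TwoStageTokenizerSketch(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    Iterable<String> extract(String fragment) {
        accumulator.append(fragment);
        // Simplification: complete tokens are collected eagerly into a list.
        List<String> complete = new ArrayList<>();
        int idx;
        while ((idx = accumulator.indexOf(separator)) >= 0) {
            complete.add(accumulator.substring(0, idx));
            accumulator.delete(0, idx + separator.length());
        }
        // Outer stage: decorates the inner tokens and enforces sizeLimit,
        // but only when a complete token is actually consumed.
        return () -> {
            Iterator<String> inner = complete.iterator();
            return new Iterator<String>() {
                @Override
                public boolean hasNext() {
                    return inner.hasNext();
                }

                @Override
                public String next() {
                    String token = inner.next();
                    if (token.length() > sizeLimit) {
                        throw new IllegalStateException("token longer than sizeLimit");
                    }
                    return token;
                }
            };
        };
    }

    // Number of characters currently held without a terminating separator.
    int pendingLength() {
        return accumulator.length();
    }
}
```

In this sketch the size check runs only once a separator has completed a token, so `pendingLength()` can grow well past `sizeLimit` while fragments without a separator keep arriving.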

Although the implementation seems more linear, it reintroduces a problem that surfaced in logstash-plugins/logstash-codec-json_lines#43.

The inner iterator uses a StringBuilder as its accumulator and, on every call to extract, splits the accumulated data into tokens by separator.
If a very big payload is pushed down to a codec that uses this BufferedTokenizer, and the payload requires multiple invocations of extract before it produces a token (which might never happen: consider an offending client that never sends the separator), memory fills up with data that is useless and would be rejected by the outer iterator. That data never reaches the outer iterator, because the process goes OOM first.
In the previous implementation, once the tokenization part detected an overrun of sizeLimit, it dropped every fragment until the next separator arrived, protecting against OOM conditions like this.
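
The drop-until-separator behaviour can be sketched as follows. This is a hedged illustration, not the previous Logstash implementation: the class name, the `bufferedLength` and `droppedTokens` helpers, and the choice to silently drop instead of raising an error are assumptions, and the sketch assumes a separator never spans two fragments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the Logstash BufferedTokenizer: once the pending
// token would exceed sizeLimit, buffering stops and input is discarded until
// the next separator. Assumes a separator never spans two fragments.
final class BoundedTokenizerSketch {
    private final String separator;
    private final int sizeLimit;
    private final StringBuilder buffer = new StringBuilder();
    private boolean dropping = false; // true while skipping an oversized token
    private long droppedTokens = 0;

    BoundedTokenizerSketch(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    List<String> extract(String fragment) {
        List<String> tokens = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = fragment.indexOf(separator, start)) >= 0) {
            if (dropping) {
                // The separator ends the oversized token; resume buffering.
                dropping = false;
                droppedTokens++;
            } else if (buffer.length() + (idx - start) > sizeLimit) {
                // The token completes in this fragment but is too long.
                buffer.setLength(0);
                droppedTokens++;
            } else {
                buffer.append(fragment, start, idx);
                tokens.add(buffer.toString());
                buffer.setLength(0);
            }
            start = idx + separator.length();
        }
        if (!dropping) {
            int tailLength = fragment.length() - start;
            if (buffer.length() + tailLength > sizeLimit) {
                // Overrun detected before any separator: free the buffer and
                // ignore everything up to the next separator.
                buffer.setLength(0);
                dropping = true;
            } else {
                buffer.append(fragment, start, fragment.length());
            }
        }
        return tokens;
    }

    int bufferedLength() {
        return buffer.length();
    }

    long droppedTokens() {
        return droppedTokens;
    }
}
```

With this approach the buffer never holds more than `sizeLimit` characters, no matter how long a client streams data without sending the separator.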

My questions are:

@robbavey
Member

@andsel Did you check whether this behaviour can occur in the inputs (tcp/file) that use the BufferedTokenizer?

e.g. (from @jsvd):

For example, the tcp input gets packets that are limited by the MTU, or aggregated to 16k if they’re TLS, and the file input reads in chunks. Not sure if these smaller (< 100kb) chunks are accumulated elsewhere; otherwise the buffer always gets something smaller.

@andsel
Contributor Author

andsel commented Mar 12, 2025

It also happens with the TCP input, using the following pipeline and test data taken from logstash-plugins/logstash-codec-json_lines#43:

input {
  tcp {
    port => 1234

    codec => json_lines {
      decode_size_limit_bytes => 32768
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

and streaming the big JSON file with netcat:

cat /path/to/big_single_line.json | netcat localhost 1234
TCP test logs

[2025-03-12T09:43:40,293][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2025-03-12T09:45:29,133][INFO ][logstash.codecs.jsonlines][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)




java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3230.hprof ...
Heap dump file created [393883507 bytes in 0.239 secs]
[2025-03-12T09:45:45,209][ERROR][logstash.inputs.tcp      ][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] localhost/127.0.0.1:50744: closing due:
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3541) ~[?:?]
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:242) ~[?:?]
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587) ~[?:?]
	at java.lang.StringBuilder.append(StringBuilder.java:179) ~[?:?]
	at org.logstash.common.BufferedTokenizer$DataSplitter.append(BufferedTokenizer.java:102) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizer.extract(BufferedTokenizer.java:135) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt.extract(BufferedTokenizerExt.java:83) ~[logstash-core.jar:?]
	at java.lang.invoke.LambdaForm$DMH/0x00000008007dcc00.invokeVirtual(LambdaForm$DMH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007f0800.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e4000.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e4000.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.Invokers$Holder.linkToCallSite(Invokers$Holder) ~[?:?]
	at Users.andrea.workspace.logstash_plugins.logstash_minus_codec_minus_json_lines.lib.logstash.codecs.json_lines.RUBY$method$decode$0(/Users/andrea/workspace/logstash_plugins/logstash-codec-json_lines/lib/logstash/codecs/json_lines.rb:69) ~[?:?]
	at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x0000000800bda400.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x0000000800849800.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e7800.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008007e7800.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.Invokers$Holder.linkToCallSite(Invokers$Holder) ~[?:?]
	at Users.andrea.workspace.logstash_andsel.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_input_minus_tcp_minus_7_dot_0_dot_2_minus_java.lib.logstash.inputs.tcp.RUBY$method$decode_buffer$0(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/3.1.0/gems/logstash-input-tcp-7.0.2-java/lib/logstash/inputs/tcp.rb:215) ~[?:?]
	at java.lang.invoke.LambdaForm$DMH/0x00000008010cb400.invokeStatic(LambdaForm$DMH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d3400.invoke(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d4400.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(DelegatingMethodHandle$Holder) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d4400.guard(LambdaForm$MH) ~[?:?]
	at java.lang.invoke.LambdaForm$MH/0x00000008010d0c00.linkToCallSite(LambdaForm$MH) ~[?:?]
	at Users.andrea.workspace.logstash_andsel.vendor.bundle.jruby.$3_dot_1_dot_0.gems.logstash_minus_input_minus_tcp_minus_7_dot_0_dot_2_minus_java.lib.logstash.inputs.tcp.decoder_impl.RUBY$method$decode$0(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/3.1.0/gems/logstash-input-tcp-7.0.2-java/lib/logstash/inputs/tcp/decoder_impl.rb:23) ~[?:?]
[2025-03-12T09:45:45,295][WARN ][io.netty.util.concurrent.DefaultPromise][main][1b68bf0f03797ca47bad81969b1ebe54482840f6f86353b356630f06fecc2da4] An exception was thrown by org.logstash.tcp.InputLoop$InputHandler$FlushOnCloseListener.operationComplete()
java.lang.OutOfMemoryError: Java heap space
	at java.lang.Object.clone(Native Method) ~[?:?]
	at java.lang.String.encode8859_1(String.java:1029) ~[?:?]
	at java.lang.String.encode8859_1(String.java:1024) ~[?:?]
	at java.lang.String.encode(String.java:870) ~[?:?]
	at java.lang.String.getBytes(String.java:1818) ~[?:?]
	at org.logstash.common.BufferedTokenizerExt.toEncodedRubyString(BufferedTokenizerExt.java:96) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt.flush(BufferedTokenizerExt.java:114) ~[logstash-core.jar:?]
	at org.logstash.common.BufferedTokenizerExt$INVOKER$i$0$0$flush.call(BufferedTokenizerExt$INVOKER$i$0$0$flush.gen) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:103) ~[jruby.jar:?]
	at org.jruby.ir.instructions.CallBase.interpret(CallBase.java:545) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92) ~[jruby.jar:?]
	at org.jruby.ir.instructions.CallBase.interpret(CallBase.java:548) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115) ~[jruby.jar:?]
	at org.jruby.gen.LogStash$$Inputs$$Tcp$$DecoderImpl_1354257020.flush(org/jruby/gen/LogStash$$Inputs$$Tcp$$DecoderImpl_1354257020.gen:13) ~[?:?]
	at org.logstash.tcp.InputLoop$InputHandler$FlushOnCloseListener.operationComplete(InputLoop.java:174) ~[logstash-input-tcp-7.0.2.jar:?]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.118.Final.jar:4.1.118.Final]
^C[2025-03-12T09:47:36,392][WARN ][logstash.runner          ] SIGINT received. Shutting down.
[2025-03-12T09:47:40,658][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2025-03-12T09:47:41,404][WARN ][logstash.runner          ] Received shutdown signal, but pipeline is still waiting for in-flight events
to be processed. Sending another ^C will force quit Logstash, but this may cause
data loss.
[2025-03-12T09:47:41,579][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2025-03-12T09:47:41,584][INFO ][logstash.runner          ] Logstash shut down.

With the file input, Logstash fails because the code at https://github.com/logstash-plugins/logstash-input-file/blob/55a4a7099f05f29351672417036c1342850c7adc/lib/filewatch/watched_file.rb#L250 expects an array, so the iterable has to be converted to a Ruby array; the proposed PR's implementation throws a NoMethodError.

@andsel
Contributor Author

andsel commented Mar 12, 2025

Final decision
Given that the OOM can happen in the file input, the plan is to apply #17293 on top of #17229 once it's merged.
