AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression #2439
Conversation
{
CopyTo(Decompress, outStream);
using (MemoryStream outStream = new MemoryStream(inStream.Capacity))
Is inStream.Capacity worthwhile here? It's the compressed size, and the uncompressed data that is written to outStream could be much larger. I guess it won't hurt.
Personally I like to stack the usings at the beginning in cases like this, e.g. https://github.com/apache/avro/blob/master/lang/csharp/src/apache/codec/Avro.File.BZip2/BZip2.cs#L78-L79. Additionally, use the MemoryStream constructor to limit inStream access to length only, as mentioned by @KalleOlaviNiemitalo.
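A minimal sketch of the suggested shape, with the usings stacked and inStream bounded to the compressed length. Names such as compressedData and length are assumptions taken from the surrounding diff, not the actual PR code:

```csharp
using System.IO;
using System.IO.Compression;

// Sketch only: stacks all three usings up front and bounds inStream to the
// compressed length, as suggested above. Identifiers are illustrative.
public static byte[] Decompress(byte[] compressedData, int length)
{
    using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
    using (MemoryStream outStream = new MemoryStream(compressedData.Length))
    using (DeflateStream decompress =
        new DeflateStream(inStream, CompressionMode.Decompress))
    {
        decompress.CopyTo(outStream);
        return outStream.ToArray();
    }
}
```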
new DeflateStream(inStream,
CompressionMode.Decompress))
{
decompress.CopyTo(outStream, length);
length seems to be the compressed size, not the uncompressed size, so it should not be used in this CopyTo.
Instead, I think you could use the MemoryStream(byte[] buffer, int index, int count) constructor for inStream, to prevent the DeflateStream from reading more than length bytes of compressed data.
new DeflateStream(inStream,
CompressionMode.Decompress))
{
decompress.CopyTo(outStream, length);
With this change, private static void CopyTo(Stream from, Stream to) is no longer used; please delete it then.
long totalMemoryUsedBytes = currentProcess.WorkingSet64 - startMemoryUsedBytes;

Assert.IsTrue(totalMemoryUsedBytes == 0, "Total memory usage in working set");
This test looks unreliable:
- WorkingSet64 is the amount of physical memory reserved for the process. If the process allocates some memory and doesn't use it afterwards, and the operating system pages it out, then it won't be included in WorkingSet64.
- Process.WorkingSet64 updates only when you call Process.Refresh().
- Can other tests be run in parallel with this one? If they can, the memory allocated by them could cause this test to fail.
- The test code between the currentProcess.WorkingSet64 evaluations allocates new objects whose memory might not be garbage-collected and released to the OS soon enough. The runtime can also allocate memory for JIT-compiled methods.
The test might become more reliable if it were changed to:
- Run the decompression a few times before the measurement, to get to a stable state.
- Measure AppDomain.MonitoringTotalAllocatedMemorySize or GC.GetAllocatedBytesForCurrentThread(), rather than Process.WorkingSet64.
- Allow some amount of allocations but not too much. This might require a larger data file, in order to distinguish the important allocations from noise.
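A sketch of the suggested measurement approach. RunDecompression and the allocation threshold are hypothetical placeholders for the test's actual reader loop and noise budget:

```csharp
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;

// Sketch only: measure managed allocations on the current thread instead of
// WorkingSet64. Warm-up iterations bring the JIT and caches to a stable state.
const int warmupRuns = 3;
for (int i = 0; i < warmupRuns; i++)
{
    RunDecompression(); // hypothetical helper wrapping the reader loop
}

long before = GC.GetAllocatedBytesForCurrentThread();
RunDecompression();
long allocated = GC.GetAllocatedBytesForCurrentThread() - before;

// Allow some allocations, but fail if the run leaks large buffers.
// The 1 MB budget is an illustrative assumption, not a measured value.
Assert.IsTrue(allocated < 1_000_000, $"Allocated {allocated} bytes");
```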
Agreed with @KalleOlaviNiemitalo. This test is unreliable and IMO not needed.
@@ -296,5 +296,8 @@ private void Skip(long p)
{
stream.Seek(p, SeekOrigin.Current);
}

/// <inheritdoc />
public void Dispose() => stream?.Dispose();
This is a breaking change. It assumes that the BinaryDecoder object takes ownership of the stream, and once the BinaryDecoder is disposed, the stream is disposed as well and cannot be used any more. In most normal use cases the stream can be disposed here without any harm, but the calling code might need to keep the stream alive in special cases, e.g. when there is still additional data in the stream for other purposes.
If the stream should be disposed when the BinaryDecoder is disposed, a new constructor with a bool ownStream parameter can be added, which marks the stream for disposal in the Decoder's Dispose, similarly to what https://github.com/apache/avro/blob/master/lang/csharp/src/apache/main/File/DataFileReader.cs#L125-L137 does. Of course that feature is not really in the scope of this PR.
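A skeleton of that ownership pattern, using disposeStream as one possible parameter name (the real BinaryDecoder has many more members; this is only a sketch of the ownership mechanics):

```csharp
using System;
using System.IO;

// Sketch only: an opt-in ownership flag, similar to StreamReader's leaveOpen.
public class BinaryDecoder : IDisposable
{
    private readonly Stream stream;
    private readonly bool disposeStream;

    // Existing behavior preserved: by default the stream is left open.
    public BinaryDecoder(Stream stream) : this(stream, disposeStream: false) { }

    public BinaryDecoder(Stream stream, bool disposeStream)
    {
        this.stream = stream;
        this.disposeStream = disposeStream;
    }

    public void Dispose()
    {
        if (disposeStream)
        {
            stream?.Dispose();
        }
    }
}
```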
@@ -228,5 +228,8 @@ public void Flush()
{
Stream.Flush();
}

/// <inheritdoc />
public void Dispose() => Stream?.Dispose();
Same issue as BinaryDecoder.Dispose
@@ -130,20 +130,22 @@ protected override object ReadRecord(object reuse, RecordSchema writerSchema, Sc
}
}

var defaultStream = new MemoryStream();
Similar code is in GenericReader and PreresolvingDatumReader. If SpecificReader fixes the MemoryStream leak, those should be fixed as well.
In the case of MemoryStream, the using statement is probably just good practice; however, when disposable objects are used in C#, IMO it should be clear how an object is disposed, without knowing how the object is implemented under the hood or whether it uses unmanaged resources. E.g. it is very trivial to convert a MemoryStream in/out code path to a FileStream, and in that case disposal is mandatory to avoid issues such as files being kept open.
It might be possible to reduce memory allocations further by changing the Codec API so that the caller of Codec.Decompress has to create the MemoryStream to which Codec.Decompress writes the data.
Alternatively, with a different API change, it might be possible to make BinaryDecoder read directly from the DeflateStream, avoiding the output MemoryStream altogether. However, that change might hurt performance if DeflateStream.Read does more work per call than MemoryStream.Read.
Agreed on adding new APIs to the Codec base class/interface which are more Stream based and not byte[]. There was this PR #1358 which added the Deflate(start, length) API, however I don't really remember what the real reason was and I was too lazy to read the whole ticket :)
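One hypothetical shape for such a Stream-based Codec API. The method names and signatures here are assumptions for illustration, not an existing Avro API:

```csharp
using System.IO;

// Sketch only: the caller owns the output stream, so Codec implementations
// need not allocate an intermediate MemoryStream of their own.
public abstract class Codec
{
    // Existing-style API: returns a fresh byte[] per call.
    public abstract byte[] Decompress(byte[] compressedData, int length);

    // Hypothetical addition: decompress straight into a caller-provided
    // stream. Codecs would override this to avoid the byte[] round trip.
    public virtual void Decompress(byte[] compressedData, int length, Stream output)
    {
        byte[] result = Decompress(compressedData, length);
        output.Write(result, 0, result.Length);
    }
}
```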
obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
obj = rec.Get(rf.Pos);

Check warning
Code scanning / CodeQL
Dereferenced variable may be null (rec, this)
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}

Check notice
Code scanning / CodeQL
Missed opportunity to use Where: implicitly filters its target sequence
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
For me the ownStream name indicates that this object (BinaryDecoder) will now take ownership of the stream. So ownStream means "Stream is disposed when this object is disposed".
The StreamReader constructor has a bool leaveOpen parameter for a similar purpose. The default value is false.
The HttpClient constructor instead has a bool disposeHandler parameter. The default value is true.
To preserve compatibility with earlier Avro versions while being easy to understand, I think this should be bool disposeStream and default to false. The caller tells BinaryDecoder whether BinaryDecoder should dispose the stream.
Alternatively, it could be bool leaveOpen and default to true; but I feel false is generally nicer than true as a default value of a bool parameter.
This makes a lot of sense. Will change the descriptions and Dispose logic of the encoder and decoder
public void Dispose() => stream?.Dispose();
public void Dispose()
{
if (!ownStream)
This condition IMO should be if (ownStream) { stream?.Dispose(); }
Since ownStream = false in the default constructor, the stream would be disposed when the default constructor is used, which would make it a breaking change.
@CamilAbraham @KalleOlaviNiemitalo After thinking more about the ownStream/leaveOpen issue: adding that feature requires converting the BinaryEncoder and Decoder to become IDisposable. I don't think that should be in the scope of this ticket. My vote is not to add the Dispose feature to the Encoder and Decoder.
@zcsizmadia, I agree the IDisposable implementation on BinaryEncoder and BinaryDecoder can be omitted from this PR, especially as this PR does not make DataFileReader<T>.Dispose() call BinaryDecoder.Dispose on its … If DataFileReader<T> is changed to dispose of streams other than its main …
@CamilAbraham Just a little bit of reasoning behind this: if the user has the CA2000 (or IDE0067) warning enabled in their code, then after making BinaryEncoder or Decoder IDisposable they will get compiler/analyzer warning(s). Just FYI, the Avro code analyzer config we use disables CA2000 (https://github.com/apache/avro/blob/master/lang/csharp/CodeAnalysis.src.globalconfig#L413-L414) https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2000
Thanks! That's really helpful.
@@ -297,8 +297,6 @@ protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Sch
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs.Fields.Where(rf => !writerSchema.Contains(rf.Name)))
CodeQL warnings like this I usually just ignore, since they are really not a concern in legacy code like Avro. However, thanks for getting rid of it ;)
I expect this change makes the code slower, as it now has to allocate a closure and a delegate, and the calls become indirect too. Roslyn contribution guidelines advise "Avoid LINQ" for that reason. So I'd prefer reverting this and disabling the CodeQL warning.
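The reverted form would look roughly like the following, matching the explicit-continue loop shown earlier in this thread; the field-handling body is elided:

```csharp
// Sketch only: explicit filtering avoids the closure and delegate that the
// Where(rf => !writerSchema.Contains(rf.Name)) version allocates per call.
foreach (Field rf in rs.Fields)
{
    if (writerSchema.Contains(rf.Name)) continue;

    // ... handle the field that exists only in the reader schema ...
}
```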
It's my first time contributing to this project, what is the process to get this change merged?
I will merge it. I was not in a rush to do so, wanted to make sure that @KalleOlaviNiemitalo has time to address any issues he might have.
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
Could make MemoryStream inStream read-only by constructing it with writable: false (the default is true). Not important though, as DeflateStream with CompressionMode.Decompress won't try to write to it anyway.
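For reference, the read-only variant would use the four-argument MemoryStream overload; compressedData, length, and outStream are assumed from the surrounding diff:

```csharp
using System.IO;
using System.IO.Compression;

// Sketch only: writable: false makes any accidental write to inStream throw
// NotSupportedException instead of silently mutating the input buffer.
using (MemoryStream inStream =
    new MemoryStream(compressedData, 0, length, writable: false))
using (DeflateStream decompress =
    new DeflateStream(inStream, CompressionMode.Decompress))
{
    decompress.CopyTo(outStream);
}
```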
…pache#2439)

* AVRO-3802: [Csharp] Fix memory leak on avro deflate codec decompression
* AVRO-3802: [Csharp] Address PR comments
* AVRO-3802: [Csharp] Revert IDisposable change in Encoder and Decoder
* AVRO-3802: [Csharp] Remove implicit filtering of target sequence

Co-authored-by: Camil Abraham <camil.abraham@trayport.com>
What is the purpose of the change
The aim of this pull request is to improve memory usage when reading Avro encoded files. Fixing AVRO-3802.
This issue was detected when using the Apache.Avro C# library for compression and decompression of large Avro files. Each file is 2-3 MB, and decompression of 20 files uses 2.1 GB of memory; memory keeps increasing by around 200-300 MB on follow-up calls.
Attached are images taken from a memory profiler of three calls before the fix. Each call decompresses the same 20 files. The first call uses 2.1 GB, increasing to 2.37 GB and then 2.68 GB.
The call tree shows that most of the memory is used by the DeflateCodec.CopyTo method.
- The CopyTo method is not using an array pool. Microsoft's default Stream.CopyTo method is much more efficient.
- The inStream and outStream memory streams are not in using statements.
- On outStream object creation, the object can be initialized with a defined length/stream capacity.

Verifying this change
Added a test that validates that memory usage returns to its initial value after reading records from the file reader, proving that the reader object is disposed correctly.
Documentation
No new features added.