Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression #2439

Merged
merged 4 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 7 additions & 25 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,14 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{

MemoryStream inStream = new MemoryStream(compressedData);
MemoryStream outStream = new MemoryStream();

using (DeflateStream Decompress =
new DeflateStream(inStream,
CompressionMode.Decompress))
{
CopyTo(Decompress, outStream);
}

return outStream.ToArray();
}

/// <summary>
/// Copies to stream.
/// </summary>
/// <param name="from">stream you are copying from</param>
/// <param name="to">stream you are copying to</param>
private static void CopyTo(Stream from, Stream to)
{
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make MemoryStream inStream read-only by constructing it with writable: false (the default is true). Not important though, as DeflateStream with CompressionMode.Decompress won't try to write to it anyway.

using (MemoryStream outStream = new MemoryStream())
{
to.Write(buffer, 0, read);
using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
{
decompress.CopyTo(outStream);
}
return outStream.ToArray();
}
}

Expand Down
26 changes: 14 additions & 12 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,23 @@
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
Fixed Show fixed Hide fixed
}

return rec;
Expand Down
40 changes: 21 additions & 19 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
var readerDefaultOrdinal = null != readerSchema.Default ? readerSchema.Ordinal(readerSchema.Default) : -1;

foreach (var symbol in writerSchema.Symbols)
{
{
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
Expand Down Expand Up @@ -274,27 +274,29 @@ private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSch
{
if (writerSchema.Contains(rf.Name)) continue;

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
using (var defaultStream = new MemoryStream())
{
var defaultEncoder = new BinaryEncoder(defaultStream);

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();

var readItem = ResolveReader(rf.Schema, rf.Schema);
var readItem = ResolveReader(rf.Schema, rf.Schema);

var rfInstance = rf;
if(IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream( defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
var rfInstance = rf;
if (IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
}
}

Expand Down
22 changes: 20 additions & 2 deletions lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,28 @@ namespace Avro.IO
/// <summary>
/// Decoder for Avro binary format
/// </summary>
public partial class BinaryDecoder : Decoder
public partial class BinaryDecoder : Decoder, IDisposable
{
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
public BinaryDecoder(Stream stream)
public BinaryDecoder(Stream stream) : this(stream, false)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me the ownStream name indicates that this object (BinaryDecoder) will now take ownership of the stream. So ownStream means "Stream is disposed when this object is disposed".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StreamReader constructor has a bool leaveOpen parameter for a similar purpose. The default value is false.

The HttpClient constructor instead has a bool disposeHandler parameter. The default value is true.

To preserve compatibility with earlier Avro versions while being easy to understand, I think this should be bool disposeStream and default to false. The caller tells BinaryDecoder whether BinaryDecoder should dispose the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, it could be bool leaveOpen and default to true; but I feel false is generally nicer than true as a default value of a bool parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes a lot of sense. Will change the descriptions and Dispose logic of the encoder and decoder

public BinaryDecoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -296,5 +307,12 @@ private void Skip(long p)
{
stream.Seek(p, SeekOrigin.Current);
}

/// <inheritdoc />
public void Dispose()
{
if(!ownStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition IMO should be if (ownStream) { stream?.Dispose(); }

Copy link
Contributor

@zcsizmadia zcsizmadia Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since ownStream = false in the default constructor, the stream would be disposed when the default constructor is used, which would make it a breaking change.

stream?.Dispose();
}
}
}
36 changes: 27 additions & 9 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ namespace Avro.IO
/// <summary>
/// Write leaf values.
/// </summary>
public class BinaryEncoder : Encoder
public class BinaryEncoder : Encoder, IDisposable
{
private readonly Stream Stream;
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/> class without a backing
Expand All @@ -40,9 +41,19 @@ public BinaryEncoder() : this(null)
/// the provided stream.
/// </summary>
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
public BinaryEncoder(Stream stream) : this(stream, false)
{
this.Stream = stream;
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
public BinaryEncoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -203,30 +214,37 @@ public void WriteFixed(byte[] data)
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
Stream.Write(data, start, len);
stream.Write(data, start, len);
}

private void writeBytes(byte[] bytes)
{
Stream.Write(bytes, 0, bytes.Length);
stream.Write(bytes, 0, bytes.Length);
}

private void writeBytes(byte[] bytes, int offset, int length)
{
Stream.Write(bytes, offset, length);
stream.Write(bytes, offset, length);
}

private void writeByte(byte b)
{
Stream.WriteByte(b);
stream.WriteByte(b);
}

/// <summary>
/// Flushes the underlying stream.
/// </summary>
public void Flush()
{
Stream.Flush();
stream.Flush();
}

/// <inheritdoc />
public void Dispose()
{
if (!ownStream)
stream?.Dispose();
}
}
}
24 changes: 13 additions & 11 deletions lang/csharp/src/apache/main/Specific/SpecificReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,22 @@
}
}

var defaultStream = new MemoryStream();
Copy link
Contributor

@zcsizmadia zcsizmadia Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code similar to this is in GenericReader and PreresolvingDatumReader. If SpecificReader fixes the MemoryStream leak, those should be fixed as well.

var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
obj = rec.Get(rf.Pos);

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable
rec
may be null at this access because of
this
assignment.
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
Comment on lines +137 to +148

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.
}

return rec;
Expand Down
2 changes: 1 addition & 1 deletion lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
Expand Down Expand Up @@ -555,7 +556,6 @@ private static IEnumerable<TestCaseData> TestPartialReadSource()
/// position in stream
/// </summary>
/// <param name="schemaStr"></param>
/// <param name="value"></param>
/// <param name="codecType"></param>
[TestCaseSource(nameof(TestPartialReadSource))]
public void TestPartialRead(string schemaStr, Codec.Type codecType, int position, int expectedRecords)
Expand Down