Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression #2439

Merged
merged 4 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 5 additions & 24 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,14 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{
using (MemoryStream inStream = new MemoryStream(compressedData))
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make MemoryStream inStream read-only by constructing it with writable: false (the default is true). Not important though, as DeflateStream with CompressionMode.Decompress won't try to write to it anyway.

using (MemoryStream outStream = new MemoryStream())
{
using (MemoryStream outStream = new MemoryStream(inStream.Capacity))
using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
{
using (DeflateStream decompress =
new DeflateStream(inStream,
CompressionMode.Decompress))
{
decompress.CopyTo(outStream, length);
}
return outStream.ToArray();
decompress.CopyTo(outStream);
}
}
}

/// <summary>
/// Copies to stream.
/// </summary>
/// <param name="from">stream you are copying from</param>
/// <param name="to">stream you are copying to</param>
private static void CopyTo(Stream from, Stream to)
{
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
{
to.Write(buffer, 0, read);
return outStream.ToArray();
}
}

Expand Down
26 changes: 14 additions & 12 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,23 @@
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
Fixed Show fixed Hide fixed
}

return rec;
Expand Down
40 changes: 21 additions & 19 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
var readerDefaultOrdinal = null != readerSchema.Default ? readerSchema.Ordinal(readerSchema.Default) : -1;

foreach (var symbol in writerSchema.Symbols)
{
{
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
Expand Down Expand Up @@ -274,27 +274,29 @@ private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSch
{
if (writerSchema.Contains(rf.Name)) continue;

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
using (var defaultStream = new MemoryStream())
{
var defaultEncoder = new BinaryEncoder(defaultStream);

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();

var readItem = ResolveReader(rf.Schema, rf.Schema);
var readItem = ResolveReader(rf.Schema, rf.Schema);

var rfInstance = rf;
if(IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream( defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
var rfInstance = rf;
if (IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,25 @@ namespace Avro.IO
public partial class BinaryDecoder : Decoder, IDisposable
{
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
public BinaryDecoder(Stream stream)
public BinaryDecoder(Stream stream) : this(stream, false)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me the ownStream name indicates that this object (BinaryDecoder) will now takes ownership of the stream. So ownStream means "Stream is disposed when this object is disposed".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StreamReader constructor has a bool leaveOpen parameter for a similar purpose. The default value is false.

The HttpClient constructor instead has a bool disposeHandler parameter. The default value is true.

To preserve compatibility with earlier Avro versions while being easy to understand, I think this should be bool disposeStream and default to false. The caller tells BinaryDecoder whether BinaryDecoder should dispose the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, it could be bool leaveOpen and default to true; but I feel false is generally nicer than true as a default value of a bool parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes a lot of sense. Will change the descriptions and Dispose logic of the encoder and decoder

public BinaryDecoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -298,6 +309,10 @@ private void Skip(long p)
}

/// <inheritdoc />
public void Dispose() => stream?.Dispose();
public void Dispose()
{
if(!ownStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition IMO should be if (ownStream) { stream?.Dispose(); }

Copy link
Contributor

@zcsizmadia zcsizmadia Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the ownStream = false in the default constructor, the stream would be disposed using the default constructor, which would make it a breaking change.

stream?.Dispose();
}
}
}
33 changes: 24 additions & 9 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace Avro.IO
/// </summary>
public class BinaryEncoder : Encoder, IDisposable
{
private readonly Stream Stream;
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/> class without a backing
Expand All @@ -40,9 +41,19 @@ public BinaryEncoder() : this(null)
/// the provided stream.
/// </summary>
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
public BinaryEncoder(Stream stream) : this(stream, false)
{
this.Stream = stream;
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
public BinaryEncoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -203,33 +214,37 @@ public void WriteFixed(byte[] data)
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
Stream.Write(data, start, len);
stream.Write(data, start, len);
}

private void writeBytes(byte[] bytes)
{
Stream.Write(bytes, 0, bytes.Length);
stream.Write(bytes, 0, bytes.Length);
}

private void writeBytes(byte[] bytes, int offset, int length)
{
Stream.Write(bytes, offset, length);
stream.Write(bytes, offset, length);
}

private void writeByte(byte b)
{
Stream.WriteByte(b);
stream.WriteByte(b);
}

/// <summary>
/// Flushes the underlying stream.
/// </summary>
public void Flush()
{
Stream.Flush();
stream.Flush();
}

/// <inheritdoc />
public void Dispose() => Stream?.Dispose();
public void Dispose()
{
if (!ownStream)
stream?.Dispose();
}
}
}
49 changes: 0 additions & 49 deletions lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -668,55 +668,6 @@ public void TestPartialReadAll([Values(specificSchema)] string schemaStr, [Value
}
}

[Test]
public void TestDeflateReadMemoryUsage([Values(specificSchema)] string schemaStr)
{
// create and write out
IList<Foo> records = MakeRecords(GetTestFooObject());

Process currentProcess = Process.GetCurrentProcess();

MemoryStream dataFileOutputStream = new MemoryStream();

Schema schema = Schema.Parse(schemaStr);
DatumWriter<Foo> writer = new SpecificWriter<Foo>(schema);
using (IFileWriter<Foo> dataFileWriter = DataFileWriter<Foo>.OpenWriter(writer, dataFileOutputStream, Codec.CreateCodec(Codec.Type.Deflate)))
{
for (int i = 0; i < 10; ++i)
{
foreach (Foo foo in records)
{
dataFileWriter.Append(foo);
}

// write out block
if (i == 1 || i == 4)
{
dataFileWriter.Sync();
}
}
}

long startMemoryUsedBytes = currentProcess.WorkingSet64;

MemoryStream dataFileInputStream = new MemoryStream(dataFileOutputStream.ToArray());
dataFileInputStream.Position = 0;

// read back
IList<Foo> readRecords = new List<Foo>();
using (IFileReader<Foo> reader = DataFileReader<Foo>.OpenReader(dataFileInputStream, schema))
{
// read records from synced position
foreach (Foo rec in reader.NextEntries)
readRecords.Add(rec);
}

long totalMemoryUsedBytes = currentProcess.WorkingSet64 - startMemoryUsedBytes;

Assert.IsTrue(totalMemoryUsedBytes == 0, "Total memory usage in working set");

}

/// <summary>
/// Test leaveOpen flag
/// </summary>
Expand Down
Loading