Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression #2439

Merged
merged 4 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 7 additions & 25 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,14 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{

MemoryStream inStream = new MemoryStream(compressedData);
MemoryStream outStream = new MemoryStream();

using (DeflateStream Decompress =
new DeflateStream(inStream,
CompressionMode.Decompress))
{
CopyTo(Decompress, outStream);
}

return outStream.ToArray();
}

/// <summary>
/// Copies to stream.
/// </summary>
/// <param name="from">stream you are copying from</param>
/// <param name="to">stream you are copying to</param>
private static void CopyTo(Stream from, Stream to)
{
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make MemoryStream inStream read-only by constructing it with writable: false (the default is true). Not important though, as DeflateStream with CompressionMode.Decompress won't try to write to it anyway.

using (MemoryStream outStream = new MemoryStream())
{
to.Write(buffer, 0, read);
using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
{
decompress.CopyTo(outStream);
}
return outStream.ToArray();
}
}

Expand Down
29 changes: 15 additions & 14 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using Avro.IO;
using System.IO;
using System.Linq;

namespace Avro.Generic
{
Expand Down Expand Up @@ -290,21 +291,21 @@ protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Sch
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs.Fields.Where(rf => !writerSchema.Contains(rf.Name)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CodeQL warnings like this, I ussuallyh just ignore, since it is really not a concern in a legacy code like avro. However thanks for getting rid of it ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect this change makes the code slower, as it now has to allocate a closure and a delegate, and the calls become indirect too. Roslyn contribution guidelines advise "Avoid LINQ" for that reason. So I'd prefer reverting this and disabling the CodeQL warning.

{
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
}

return rec;
Expand Down
40 changes: 21 additions & 19 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
var readerDefaultOrdinal = null != readerSchema.Default ? readerSchema.Ordinal(readerSchema.Default) : -1;

foreach (var symbol in writerSchema.Symbols)
{
{
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
Expand Down Expand Up @@ -274,27 +274,29 @@ private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSch
{
if (writerSchema.Contains(rf.Name)) continue;

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
using (var defaultStream = new MemoryStream())
{
var defaultEncoder = new BinaryEncoder(defaultStream);

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();

var readItem = ResolveReader(rf.Schema, rf.Schema);
var readItem = ResolveReader(rf.Schema, rf.Schema);

var rfInstance = rf;
if(IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream( defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
var rfInstance = rf;
if (IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
}
}

Expand Down
14 changes: 7 additions & 7 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Avro.IO
/// </summary>
public class BinaryEncoder : Encoder
{
private readonly Stream Stream;
private readonly Stream stream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/> class without a backing
Expand All @@ -42,7 +42,7 @@ public BinaryEncoder() : this(null)
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
{
this.Stream = stream;
this.stream = stream;
}

/// <summary>
Expand Down Expand Up @@ -203,30 +203,30 @@ public void WriteFixed(byte[] data)
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
Stream.Write(data, start, len);
stream.Write(data, start, len);
}

private void writeBytes(byte[] bytes)
{
Stream.Write(bytes, 0, bytes.Length);
stream.Write(bytes, 0, bytes.Length);
}

private void writeBytes(byte[] bytes, int offset, int length)
{
Stream.Write(bytes, offset, length);
stream.Write(bytes, offset, length);
}

private void writeByte(byte b)
{
Stream.WriteByte(b);
stream.WriteByte(b);
}

/// <summary>
/// Flushes the underlying stream.
/// </summary>
public void Flush()
{
Stream.Flush();
stream.Flush();
}
}
}
24 changes: 13 additions & 11 deletions lang/csharp/src/apache/main/Specific/SpecificReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,22 @@
}
}

var defaultStream = new MemoryStream();
Copy link
Contributor

@zcsizmadia zcsizmadia Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar code like this is in GenericReader and PreresolvingDatumReader. If SpecificReader fixes the MemoryStream leak, those others should be fixed as well.

var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
obj = rec.Get(rf.Pos);

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable
rec
may be null at this access because of
this
assignment.
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
Comment on lines +137 to +148

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.
}

return rec;
Expand Down
2 changes: 1 addition & 1 deletion lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
Expand Down Expand Up @@ -555,7 +556,6 @@ private static IEnumerable<TestCaseData> TestPartialReadSource()
/// position in stream
/// </summary>
/// <param name="schemaStr"></param>
/// <param name="value"></param>
/// <param name="codecType"></param>
[TestCaseSource(nameof(TestPartialReadSource))]
public void TestPartialRead(string schemaStr, Codec.Type codecType, int position, int expectedRecords)
Expand Down