Skip to content

Commit

Permalink
AVRO-3802: [Csharp] Fix memory leak on avro deflate codec decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
Camil Abraham committed Aug 14, 2023
1 parent e6d1804 commit e00d66f
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 24 deletions.
21 changes: 11 additions & 10 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <summary>
/// Inflates a deflate-compressed buffer and returns the decompressed bytes.
/// </summary>
/// <param name="compressedData">Buffer whose leading bytes hold the compressed data.</param>
/// <param name="length">
/// Number of bytes, counted from the start of <paramref name="compressedData"/>,
/// that belong to the compressed payload. Bytes past this point are not read.
/// </param>
/// <returns>A newly allocated array containing the decompressed bytes.</returns>
/// <exception cref="System.ArgumentNullException">
/// Thrown by the <see cref="MemoryStream"/> constructor when <paramref name="compressedData"/> is null.
/// </exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Thrown by the <see cref="MemoryStream"/> constructor when <paramref name="length"/> is negative.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Thrown by the <see cref="MemoryStream"/> constructor when <paramref name="length"/> exceeds the buffer size.
/// </exception>
public override byte[] Decompress(byte[] compressedData, int length)
{
    // Wrap only the first `length` bytes so stale bytes in a reused, larger
    // buffer can never be fed to the inflater.
    using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
    // The compressed size is only a starting capacity; the stream grows as needed.
    using (MemoryStream outStream = new MemoryStream(length))
    {
        using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
        {
            // Use the default copy buffer: the second argument of CopyTo is a
            // buffer size, not a byte count, and must be strictly positive.
            decompress.CopyTo(outStream);
        }

        // ToArray copies the data out, so disposing the streams afterwards is safe.
        return outStream.ToArray();
    }
}

/// <summary>
Expand Down
5 changes: 4 additions & 1 deletion lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Avro.IO
/// <summary>
/// Decoder for Avro binary format
/// </summary>
public partial class BinaryDecoder : Decoder
public partial class BinaryDecoder : Decoder, IDisposable
{
private readonly Stream stream;

Expand Down Expand Up @@ -296,5 +296,8 @@ private void Skip(long p)
{
stream.Seek(p, SeekOrigin.Current);
}

/// <summary>
/// Disposes the stream this decoder was reading from, if one is set.
/// </summary>
public void Dispose()
{
    if (stream != null)
    {
        stream.Dispose();
    }
}
}
}
5 changes: 4 additions & 1 deletion lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Avro.IO
/// <summary>
/// Write leaf values.
/// </summary>
public class BinaryEncoder : Encoder
public class BinaryEncoder : Encoder, IDisposable
{
private readonly Stream Stream;

Expand Down Expand Up @@ -228,5 +228,8 @@ public void Flush()
{
Stream.Flush();
}

/// <summary>
/// Disposes the stream this encoder was writing to, if one is set.
/// </summary>
public void Dispose()
{
    if (Stream != null)
    {
        Stream.Dispose();
    }
}
}
}
24 changes: 13 additions & 11 deletions lang/csharp/src/apache/main/Specific/SpecificReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,22 @@ protected override object ReadRecord(object reuse, RecordSchema writerSchema, Sc
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
obj = rec.Get(rf.Pos);

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable 'rec' may be null at this access because of this assignment.
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop implicitly filters its target sequence - consider filtering the sequence explicitly using '.Where(...)'.
}

return rec;
Expand Down
51 changes: 50 additions & 1 deletion lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
Expand Down Expand Up @@ -555,7 +556,6 @@ private static IEnumerable<TestCaseData> TestPartialReadSource()
/// position in stream
/// </summary>
/// <param name="schemaStr"></param>
/// <param name="value"></param>
/// <param name="codecType"></param>
[TestCaseSource(nameof(TestPartialReadSource))]
public void TestPartialRead(string schemaStr, Codec.Type codecType, int position, int expectedRecords)
Expand Down Expand Up @@ -668,6 +668,55 @@ public void TestPartialReadAll([Values(specificSchema)] string schemaStr, [Value
}
}

/// <summary>
/// Writes a deflate-compressed data file, reads it back repeatedly, and checks that
/// every record is returned and that the process working set does not keep growing
/// from one read pass to the next.
/// </summary>
/// <param name="schemaStr">Schema used both to write and to read the records.</param>
[Test]
public void TestDeflateReadMemoryUsage([Values(specificSchema)] string schemaStr)
{
    const int appendRounds = 10;
    const int measuredReadPasses = 20;

    // Working-set figures are noisy (JIT, GC timing, OS trimming), so an exact
    // comparison is meaningless. NOTE(review): the tolerance is a judgement call;
    // tune it if the test proves flaky or too lax on the build agents.
    const long allowedGrowthBytes = 32L * 1024 * 1024;

    // create and write out
    IList<Foo> records = MakeRecords(GetTestFooObject());
    Schema schema = Schema.Parse(schemaStr);
    DatumWriter<Foo> writer = new SpecificWriter<Foo>(schema);

    MemoryStream dataFileOutputStream = new MemoryStream();
    using (IFileWriter<Foo> dataFileWriter = DataFileWriter<Foo>.OpenWriter(writer, dataFileOutputStream, Codec.CreateCodec(Codec.Type.Deflate)))
    {
        for (int i = 0; i < appendRounds; ++i)
        {
            foreach (Foo foo in records)
            {
                dataFileWriter.Append(foo);
            }

            // write out block
            if (i == 1 || i == 4)
            {
                dataFileWriter.Sync();
            }
        }
    }

    byte[] fileBytes = dataFileOutputStream.ToArray();
    int expectedRecordCount = records.Count * appendRounds;

    using (Process currentProcess = Process.GetCurrentProcess())
    {
        long baselineBytes = 0;

        // Pass 0 is a warm-up: it pays the one-time JIT and type-loading cost so
        // the baseline taken right after it reflects steady-state memory.
        for (int pass = 0; pass <= measuredReadPasses; ++pass)
        {
            int readCount = 0;
            using (MemoryStream dataFileInputStream = new MemoryStream(fileBytes))
            using (IFileReader<Foo> reader = DataFileReader<Foo>.OpenReader(dataFileInputStream, schema))
            {
                foreach (Foo rec in reader.NextEntries)
                {
                    ++readCount;
                }
            }

            Assert.AreEqual(expectedRecordCount, readCount, "Records read back from deflate file");

            if (pass == 0)
            {
                // Process caches its counters; Refresh() is required to get a
                // current value instead of the first snapshot.
                currentProcess.Refresh();
                baselineBytes = currentProcess.WorkingSet64;
            }
        }

        currentProcess.Refresh();
        long totalMemoryUsedBytes = currentProcess.WorkingSet64 - baselineBytes;

        Assert.IsTrue(totalMemoryUsedBytes <= allowedGrowthBytes, "Total memory usage in working set");
    }
}

/// <summary>
/// Test leaveOpen flag
/// </summary>
Expand Down

0 comments on commit e00d66f

Please sign in to comment.