Skip to content

Commit

Permalink
AVRO-3802: [Csharp] Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Camil Abraham committed Aug 16, 2023
1 parent e00d66f commit d239dcc
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 115 deletions.
29 changes: 5 additions & 24 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,14 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{
using (MemoryStream inStream = new MemoryStream(compressedData))
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
using (MemoryStream outStream = new MemoryStream())
{
using (MemoryStream outStream = new MemoryStream(inStream.Capacity))
using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
{
using (DeflateStream decompress =
new DeflateStream(inStream,
CompressionMode.Decompress))
{
decompress.CopyTo(outStream, length);
}
return outStream.ToArray();
decompress.CopyTo(outStream);
}
}
}

/// <summary>
/// Copies to stream.
/// </summary>
/// <param name="from">stream you are copying from</param>
/// <param name="to">stream you are copying to</param>
private static void CopyTo(Stream from, Stream to)
{
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
{
to.Write(buffer, 0, read);
return outStream.ToArray();
}
}

Expand Down
26 changes: 14 additions & 12 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,23 @@ protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Sch
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.
}

return rec;
Expand Down
40 changes: 21 additions & 19 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
var readerDefaultOrdinal = null != readerSchema.Default ? readerSchema.Ordinal(readerSchema.Default) : -1;

foreach (var symbol in writerSchema.Symbols)
{
{
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
Expand Down Expand Up @@ -274,27 +274,29 @@ private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSch
{
if (writerSchema.Contains(rf.Name)) continue;

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
using (var defaultStream = new MemoryStream())
{
var defaultEncoder = new BinaryEncoder(defaultStream);

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();

var readItem = ResolveReader(rf.Schema, rf.Schema);
var readItem = ResolveReader(rf.Schema, rf.Schema);

var rfInstance = rf;
if(IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream( defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
var rfInstance = rf;
if (IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,25 @@ namespace Avro.IO
public partial class BinaryDecoder : Decoder, IDisposable
{
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
public BinaryDecoder(Stream stream)
public BinaryDecoder(Stream stream) : this(stream, false)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
public BinaryDecoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -298,6 +309,10 @@ private void Skip(long p)
}

/// <inheritdoc />
public void Dispose() => stream?.Dispose();
public void Dispose()
{
if(!ownStream)
stream?.Dispose();
}
}
}
33 changes: 24 additions & 9 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace Avro.IO
/// </summary>
public class BinaryEncoder : Encoder, IDisposable
{
private readonly Stream Stream;
private readonly Stream stream;
private readonly bool ownStream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/> class without a backing
Expand All @@ -40,9 +41,19 @@ public BinaryEncoder() : this(null)
/// the provided stream.
/// </summary>
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
public BinaryEncoder(Stream stream) : this(stream, false)
{
this.Stream = stream;
}

/// <summary>
/// Initializes a new instance of the <see cref="BinaryDecoder"/> class.
/// </summary>
/// <param name="stream">Stream to decode.</param>
/// <param name="ownStream">Leave stream open after disposing the object.</param>
public BinaryEncoder(Stream stream, bool ownStream)
{
this.stream = stream;
this.ownStream = ownStream;
}

/// <summary>
Expand Down Expand Up @@ -203,33 +214,37 @@ public void WriteFixed(byte[] data)
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
Stream.Write(data, start, len);
stream.Write(data, start, len);
}

private void writeBytes(byte[] bytes)
{
Stream.Write(bytes, 0, bytes.Length);
stream.Write(bytes, 0, bytes.Length);
}

private void writeBytes(byte[] bytes, int offset, int length)
{
Stream.Write(bytes, offset, length);
stream.Write(bytes, offset, length);
}

private void writeByte(byte b)
{
Stream.WriteByte(b);
stream.WriteByte(b);
}

/// <summary>
/// Flushes the underlying stream.
/// </summary>
public void Flush()
{
Stream.Flush();
stream.Flush();
}

/// <inheritdoc />
public void Dispose() => Stream?.Dispose();
public void Dispose()
{
if (!ownStream)
stream?.Dispose();
}
}
}
49 changes: 0 additions & 49 deletions lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -668,55 +668,6 @@ public void TestPartialReadAll([Values(specificSchema)] string schemaStr, [Value
}
}

[Test]
public void TestDeflateReadMemoryUsage([Values(specificSchema)] string schemaStr)
{
// create and write out
IList<Foo> records = MakeRecords(GetTestFooObject());

Process currentProcess = Process.GetCurrentProcess();

MemoryStream dataFileOutputStream = new MemoryStream();

Schema schema = Schema.Parse(schemaStr);
DatumWriter<Foo> writer = new SpecificWriter<Foo>(schema);
using (IFileWriter<Foo> dataFileWriter = DataFileWriter<Foo>.OpenWriter(writer, dataFileOutputStream, Codec.CreateCodec(Codec.Type.Deflate)))
{
for (int i = 0; i < 10; ++i)
{
foreach (Foo foo in records)
{
dataFileWriter.Append(foo);
}

// write out block
if (i == 1 || i == 4)
{
dataFileWriter.Sync();
}
}
}

long startMemoryUsedBytes = currentProcess.WorkingSet64;

MemoryStream dataFileInputStream = new MemoryStream(dataFileOutputStream.ToArray());
dataFileInputStream.Position = 0;

// read back
IList<Foo> readRecords = new List<Foo>();
using (IFileReader<Foo> reader = DataFileReader<Foo>.OpenReader(dataFileInputStream, schema))
{
// read records from synced position
foreach (Foo rec in reader.NextEntries)
readRecords.Add(rec);
}

long totalMemoryUsedBytes = currentProcess.WorkingSet64 - startMemoryUsedBytes;

Assert.IsTrue(totalMemoryUsedBytes == 0, "Total memory usage in working set");

}

/// <summary>
/// Test leaveOpen flag
/// </summary>
Expand Down

0 comments on commit d239dcc

Please sign in to comment.