Skip to content

Commit

Permalink
bugfix/respect-ordering-file-write (#79)
Browse files Browse the repository at this point in the history
* done

* improve

* minor adjusts
  • Loading branch information
leandromoh authored Oct 17, 2023
1 parent cac21c4 commit 569d86e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 104 deletions.
2 changes: 0 additions & 2 deletions RecordParser.Benchmark/VariableLengthWriterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
using RecordParser.Extensions.FileWriter;
using SoftCircuits.CsvParser;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
Expand Down
75 changes: 14 additions & 61 deletions RecordParser.Test/FileWriterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace RecordParser.Test
public class FileWriterTest : TestSetup
{
private static readonly IEnumerable<int> _repeats = new[] { 0, 1, 3, 1_000, 10_000 };
private const int MaxParallelism = 4;

public static IEnumerable<object[]> Repeats()
{
Expand All @@ -31,6 +32,9 @@ public static IEnumerable<object[]> Repeats()
}
}

// the fixed-length file scenario is already covered by the test below,
// because the "WriteRecords" method does not care which parser is used,
// since it just receives a delegate
[Theory]
[MemberData(nameof(Repeats))]
public void Write_csv_file(int repeat, bool parallel, bool ordered)
Expand Down Expand Up @@ -67,13 +71,14 @@ public void Write_csv_file(int repeat, bool parallel, bool ordered)
using var memory = new MemoryStream();
using var textWriter = new StreamWriter(memory);

var writeOptions = new ParallelismOptions()
var parallelOptions = new ParallelismOptions()
{
Enabled = parallel,
EnsureOriginalOrdering = ordered,
MaxDegreeOfParallelism = MaxParallelism,
};

textWriter.WriteRecords(expectedItems, writer.TryFormat, writeOptions);
textWriter.WriteRecords(expectedItems, writer.TryFormat, parallelOptions);
textWriter.Flush();

// Assert
Expand All @@ -82,67 +87,15 @@ public void Write_csv_file(int repeat, bool parallel, bool ordered)
using var textReader = new StreamReader(memory);
var readOptions = new VariableLengthReaderOptions()
{
ParallelismOptions = new() { Enabled = parallel }
ParallelismOptions = parallelOptions
};

var reads = textReader.ReadRecords(reader, readOptions);
var items = textReader.ReadRecords(reader, readOptions);

reads.Should().BeEquivalentTo(expectedItems);
}

// this scenario is covered by the test above, because the test does not care
// which parser is used, since it just receives a delegate
//[Theory]
//[MemberData(nameof(Repeats))]
/// <summary>
/// Writes generated records to an in-memory stream with a fixed-length writer,
/// reads them back with the matching fixed-length reader, and asserts that the
/// records read are equivalent to the records written.
/// </summary>
/// <remarks>
/// The xUnit attributes directly above this method are commented out, so it
/// carries no test attribute of its own.
/// </remarks>
/// <param name="repeat">Number of records the fixture is asked to create.</param>
/// <param name="parallel">Value assigned to <c>Enabled</c> on both the write and read parallelism options.</param>
/// <param name="ordered">Value assigned to <c>EnsureOriginalOrdering</c> on the write options.</param>
public void Write_fixed_length_file(int repeat, bool parallel, bool ordered)
{
    // Arrange

    var fixture = new Fixture();
    var expected = fixture
        .CreateMany<(string Name, DateTime Birthday, decimal Money, Color Color)>(repeat)
        .ToList();

    // Birthday is stored as its tick count, so the reader below rebuilds it from a long.
    var recordReader = new FixedLengthReaderSequentialBuilder<(string Name, DateTime Birthday, decimal Money, Color Color)>()
        .Map(x => x.Name, 50)
        .Map(x => x.Birthday, 20, text => new DateTime(long.Parse(text)))
        .Map(x => x.Money, 15)
        .Map(x => x.Color, 15)
        .Build();

    var recordWriter = new FixedLengthWriterSequentialBuilder<(string Name, DateTime Birthday, decimal Money, Color Color)>()
        .Map(x => x.Name, 50)
        .Map(x => x.Birthday, 20, (span, birthday) => (birthday.Ticks.TryFormat(span, out var count), count))
        .Map(x => x.Money, 15)
        .Map(x => x.Color, 15)
        .Build();

    // Act

    using var stream = new MemoryStream();
    using var output = new StreamWriter(stream);

    var parallelism = new ParallelismOptions()
    {
        Enabled = parallel,
        EnsureOriginalOrdering = ordered
    };

    output.WriteRecords(expected, recordWriter.TryFormat, parallelism);
    // Flush so the buffered text reaches the stream before it is read back.
    output.Flush();

    // Assert

    stream.Seek(0, SeekOrigin.Begin);
    using var input = new StreamReader(stream);

    var readerOptions = new FixedLengthReaderOptions<(string Name, DateTime Birthday, decimal Money, Color Color)>()
    {
        Parser = recordReader.Parse,
        ParallelismOptions = new() { Enabled = parallel }
    };

    var actual = input.ReadRecords(readerOptions);

    // No ordering option is passed to the equivalency check here.
    actual.Should().BeEquivalentTo(expected);
}
if (ordered)
items.Should().BeEquivalentTo(expectedItems, cfg => cfg.WithStrictOrdering());
else
items.Should().BeEquivalentTo(expectedItems);
}
}
}
80 changes: 39 additions & 41 deletions RecordParser/Extensions/FileWriter/WriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,56 +66,54 @@ private class BufferContext

private static void WriteParallel<T>(TextWriter textWriter, IEnumerable<T> items, TryFormat<T> tryFormat, ParallelismOptions options)
{
var parallelism = 20; // TODO remove hardcoded
var textWriterLock = new object();
var initialPool = 20;
var pool = new Stack<char[]>(initialPool);

var buffers = Enumerable
.Range(0, parallelism)
.Select(_ => new BufferContext
{
pow = initialPow,
buffer = ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow)),
lockObj = new object()
})
.ToArray();
for (var index = 0; index < initialPool; index++)
pool.Push(ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow)));

try
var xs = items.AsParallel(options).Select((item, i) =>
{
var xs = items.AsParallel(options).Select((item, i) =>
var buffer = Pop() ?? ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow));
retry:

if (tryFormat(item, buffer, out var charsWritten))
{
return (buffer, charsWritten);
}
else
{
var x = buffers[i % parallelism];
ArrayPool<char>.Shared.Return(buffer);
buffer = ArrayPool<char>.Shared.Rent(buffer.Length * 2);
goto retry;
}
});

lock (x.lockObj)
{
retry:

if (tryFormat(item, x.buffer, out var charsWritten))
{
lock (textWriterLock)
{
textWriter.WriteLine(x.buffer, 0, charsWritten);
}
}
else
{
ArrayPool<char>.Shared.Return(x.buffer);
x.pow++;
x.buffer = ArrayPool<char>.Shared.Rent((int)Math.Pow(2, x.pow));
goto retry;
}
}
foreach (var x in xs)
{
textWriter.WriteLine(x.buffer, 0, x.charsWritten);
Push(x.buffer);
}

foreach (var x in pool)
{
ArrayPool<char>.Shared.Return(x);
}

// dummy value
return (string)null;
});
pool.Clear();

// dummy iteration to force evaluation
foreach (var _ in xs) ;
char[] Pop()
{
char[] x;
lock (pool)
pool.TryPop(out x);
return x;
}
finally

void Push(char[] item)
{
foreach (var x in buffers)
ArrayPool<char>.Shared.Return(x.buffer);
lock (pool)
pool.Push(item);
}
}

Expand Down

0 comments on commit 569d86e

Please sign in to comment.