Skip to content

Commit

Permalink
apacheGH-40517: [C#] Fix writing sliced arrays to IPC format (apache#41197)
Browse files Browse the repository at this point in the history

### Rationale for this change

Fixes writing sliced arrays to IPC files or streams, so that they can be successfully read back in. Previously, writing such data would appear to succeed, but the resulting file or stream could not be read back.

### What changes are included in this PR?

* Fixes `BinaryViewArray.GetBytes` to account for the array offset
* Fixes `FixedSizeBinaryArray.GetBytes` to account for the array offset
* Updates `ArrowStreamWriter` so that it writes slices of buffers when required, and handles slicing bitmap arrays by creating a copy if the offset isn't a multiple of 8
* Refactors `ArrowStreamWriter`, making the `ArrowRecordBatchFlatBufferBuilder` class responsible for building a list of field nodes as well as buffers. This was required to avoid having to duplicate logic for handling array types with child data between the `ArrowRecordBatchFlatBufferBuilder` class and the `CreateSelfAndChildrenFieldNodes` method, which I've removed.

Note that after this change, we still write more data than required when writing a slice of a `ListArray`, `BinaryArray`, `ListViewArray`, `BinaryViewArray` or `DenseUnionArray`. When writing a `ListArray` for example, we write slices of the null bitmap and value offsets and write the full values array. Ideally we should write a slice of the values and adjust the value offsets so they start at zero. The C++ implementation for example handles this [here](https://github.com/apache/arrow/blob/18c74b0733c9ff473a211259cf10705b2c9be891/cpp/src/arrow/ipc/writer.cc#L316). I will make a follow-up issue for this once this PR is merged.

### Are these changes tested?

Yes, I've added new unit tests for this.

### Are there any user-facing changes?

Yes, this is a user-facing bug fix.
* GitHub Issue: apache#40517

Authored-by: Adam Reeve <adreeve@gmail.com>
Signed-off-by: Curt Hagenlocher <curt@hagenlocher.org>
  • Loading branch information
adamreeve authored and tolleybot committed May 4, 2024
1 parent 0ba37ab commit 0af60f0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 259 deletions.
151 changes: 6 additions & 145 deletions csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,40 +163,18 @@ public void Visit(BooleanArray array)
public void Visit(ListArray array)
{
_buffers.Add(CreateBitmapBuffer(array.NullBitmapBuffer, array.Offset, array.Length));
_buffers.Add(CreateBuffer(GetZeroBasedValueOffsets(array.ValueOffsetsBuffer, array.Offset, array.Length)));
_buffers.Add(CreateSlicedBuffer<int>(array.ValueOffsetsBuffer, array.Offset, array.Length + 1));

int valuesOffset = 0;
int valuesLength = 0;
if (array.Length > 0)
{
valuesOffset = array.ValueOffsets[0];
valuesLength = array.ValueOffsets[array.Length] - valuesOffset;
}

var values = array.Values;
if (valuesOffset > 0 || valuesLength < values.Length)
{
values = ArrowArrayFactory.Slice(values, valuesOffset, valuesLength);
}

VisitArray(values);
VisitArray(array.Values);
}

public void Visit(ListViewArray array)
{
var (valueOffsetsBuffer, minOffset, maxEnd) = GetZeroBasedListViewOffsets(array);

_buffers.Add(CreateBitmapBuffer(array.NullBitmapBuffer, array.Offset, array.Length));
_buffers.Add(CreateBuffer(valueOffsetsBuffer));
_buffers.Add(CreateSlicedBuffer<int>(array.ValueOffsetsBuffer, array.Offset, array.Length));
_buffers.Add(CreateSlicedBuffer<int>(array.SizesBuffer, array.Offset, array.Length));

IArrowArray values = array.Values;
if (minOffset != 0 || values.Length != maxEnd)
{
values = ArrowArrayFactory.Slice(values, minOffset, maxEnd - minOffset);
}

VisitArray(values);
VisitArray(array.Values);
}

public void Visit(FixedSizeListArray array)
Expand All @@ -217,17 +195,8 @@ public void Visit(FixedSizeListArray array)
public void Visit(BinaryArray array)
{
_buffers.Add(CreateBitmapBuffer(array.NullBitmapBuffer, array.Offset, array.Length));
_buffers.Add(CreateBuffer(GetZeroBasedValueOffsets(array.ValueOffsetsBuffer, array.Offset, array.Length)));

int valuesOffset = 0;
int valuesLength = 0;
if (array.Length > 0)
{
valuesOffset = array.ValueOffsets[0];
valuesLength = array.ValueOffsets[array.Length] - valuesOffset;
}

_buffers.Add(CreateSlicedBuffer<byte>(array.ValueBuffer, valuesOffset, valuesLength));
_buffers.Add(CreateSlicedBuffer<int>(array.ValueOffsetsBuffer, array.Offset, array.Length + 1));
_buffers.Add(CreateBuffer(array.ValueBuffer));
}

public void Visit(BinaryViewArray array)
Expand Down Expand Up @@ -294,91 +263,6 @@ public void Visit(NullArray array)
// There are no buffers for a NullArray
}

/// <summary>
/// Builds a value-offsets buffer whose first offset is zero, as required when writing
/// a (possibly sliced) variable-length array to the IPC format.
/// </summary>
/// <param name="valueOffsetsBuffer">Buffer containing the array's 32-bit value offsets.</param>
/// <param name="arrayOffset">Logical offset of the array into the offsets buffer (non-zero for slices).</param>
/// <param name="arrayLength">Number of elements in the array.</param>
/// <returns>
/// A buffer of <c>arrayLength + 1</c> offsets beginning at zero, padded to the required buffer length.
/// A new buffer is allocated only when the offsets must be shifted.
/// </returns>
private ArrowBuffer GetZeroBasedValueOffsets(ArrowBuffer valueOffsetsBuffer, int arrayOffset, int arrayLength)
{
    // An array of N values has N + 1 int32 offsets; pad to the required IPC buffer length.
    var requiredBytes = CalculatePaddedBufferLength(sizeof(int) * (arrayLength + 1));

    if (arrayOffset != 0)
    {
        // Array has been sliced, so we need to shift and adjust the offsets
        var originalOffsets = valueOffsetsBuffer.Span.CastTo<int>().Slice(arrayOffset, arrayLength + 1);
        var firstOffset = arrayLength > 0 ? originalOffsets[0] : 0;

        var newValueOffsetsBuffer = _allocator.Allocate(requiredBytes);
        var newValueOffsets = newValueOffsetsBuffer.Memory.Span.CastTo<int>();

        // Rebase every offset so the first offset becomes zero.
        for (int i = 0; i < arrayLength + 1; ++i)
        {
            newValueOffsets[i] = originalOffsets[i] - firstOffset;
        }

        return new ArrowBuffer(newValueOffsetsBuffer);
    }
    else if (valueOffsetsBuffer.Length > requiredBytes)
    {
        // Array may have been sliced but the offset is zero,
        // so we can truncate the existing offsets
        return new ArrowBuffer(valueOffsetsBuffer.Memory.Slice(0, requiredBytes));
    }
    else
    {
        // Use the full buffer
        return valueOffsetsBuffer;
    }
}

/// <summary>
/// Computes the value-offsets buffer for a list-view array, rebased so the minimum
/// offset is zero, together with the range of the values array covered by the views.
/// List-view offsets are not required to be ordered, so the full range may only be
/// determined by scanning every entry.
/// </summary>
/// <param name="array">The (possibly sliced) list-view array.</param>
/// <returns>
/// The offsets buffer to write, the minimum value offset referenced by any view,
/// and the maximum end position (offset + size) referenced by any view.
/// </returns>
private (ArrowBuffer Buffer, int minOffset, int maxEnd) GetZeroBasedListViewOffsets(ListViewArray array)
{
    if (array.Length == 0)
    {
        // Empty array: nothing to write and no values are referenced.
        return (ArrowBuffer.Empty, 0, 0);
    }

    var offsets = array.ValueOffsets;
    var sizes = array.Sizes;

    // Initial guess assuming offsets are ordered: first offset and last view's end.
    int minOffset = offsets[0];
    int maxEnd = offsets[array.Length - 1] + sizes[array.Length - 1];

    // Min possible offset is zero, and max possible end is the values length.
    // If these match the first offset and last end we don't need to do anything further,
    // but otherwise we need to iterate over each index in case the offsets aren't ordered.
    if (minOffset != 0 || maxEnd != array.Values.Length)
    {
        for (int i = 0; i < array.Length; ++i)
        {
            minOffset = Math.Min(minOffset, offsets[i]);
            maxEnd = Math.Max(maxEnd, offsets[i] + sizes[i]);
        }
    }

    // A list-view array of N values has N int32 offsets (no trailing offset, unlike list arrays).
    var requiredBytes = CalculatePaddedBufferLength(sizeof(int) * array.Length);

    if (minOffset == 0)
    {
        // No need to adjust the offsets, but we may need to slice the offsets buffer.
        ArrowBuffer buffer = array.ValueOffsetsBuffer;
        if (array.Offset != 0 || buffer.Length > requiredBytes)
        {
            var byteOffset = sizeof(int) * array.Offset;
            var sliceLength = Math.Min(requiredBytes, buffer.Length - byteOffset);
            buffer = new ArrowBuffer(buffer.Memory.Slice(byteOffset, sliceLength));
        }

        return (buffer, minOffset, maxEnd);
    }

    // Compute shifted offsets
    var newOffsetsBuffer = _allocator.Allocate(requiredBytes);
    var newOffsets = newOffsetsBuffer.Memory.Span.CastTo<int>();
    for (int i = 0; i < array.Length; ++i)
    {
        newOffsets[i] = offsets[i] - minOffset;
    }

    return (new ArrowBuffer(newOffsetsBuffer), minOffset, maxEnd);
}

private Buffer CreateBitmapBuffer(ArrowBuffer buffer, int offset, int length)
{
if (buffer.IsEmpty)
Expand Down Expand Up @@ -557,29 +441,6 @@ public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen, IpcOp
}
}

/// <summary>
/// Recursively creates flatbuffer field nodes for the given array data and all of its children.
/// Children are visited before the parent's own node is created.
/// </summary>
/// <param name="data">The array data to create field nodes for.</param>
private void CreateSelfAndChildrenFieldNodes(ArrayData data)
{
    if (data.DataType is NestedType)
    {
        // flatbuffer struct vectors have to be created in reverse order
        for (int i = data.Children.Length - 1; i >= 0; i--)
        {
            CreateSelfAndChildrenFieldNodes(data.Children[i]);
        }
    }
    Flatbuf.FieldNode.CreateFieldNode(Builder, data.Length, data.GetNullCount());
}

/// <summary>
/// Returns the total number of field nodes for the given fields, including the
/// nested children of each field (counted via <see cref="CountSelfAndChildrenNodes"/>).
/// </summary>
/// <param name="fields">The schema fields to count nodes for.</param>
/// <returns>The total node count across all fields and their children.</returns>
private static int CountAllNodes(IReadOnlyList<Field> fields)
{
    int total = 0;
    for (int i = 0; i < fields.Count; i++)
    {
        CountSelfAndChildrenNodes(fields[i].DataType, ref total);
    }
    return total;
}

private Offset<Flatbuf.BodyCompression> GetBodyCompression()
{
if (_options.CompressionCodec == null)
Expand Down
6 changes: 6 additions & 0 deletions csharp/test/Apache.Arrow.Tests/ArrowArrayConcatenatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public void TestStandardCases()
{
foreach ((List<IArrowArray> testTargetArrayList, IArrowArray expectedArray) in GenerateTestData())
{
if (expectedArray is UnionArray)
{
// Union array concatenation is incorrect. See https://github.com/apache/arrow/issues/41198
continue;
}

IArrowArray actualArray = ArrowArrayConcatenator.Concatenate(testTargetArrayList);
ArrowReaderVerifier.CompareArrays(expectedArray, actualArray);
}
Expand Down
66 changes: 0 additions & 66 deletions csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public async Task WritesFooterAlignedMultipleOf8Async()
[InlineData(0, 45)]
[InlineData(3, 45)]
[InlineData(16, 45)]
[InlineData(10, 0)]
public async Task WriteSlicedArrays(int sliceOffset, int sliceLength)
{
var originalBatch = TestData.CreateSampleRecordBatch(length: 100);
Expand All @@ -135,71 +134,6 @@ public async Task WriteSlicedArrays(int sliceOffset, int sliceLength)
await ValidateRecordBatchFile(stream, slicedBatch, strictCompare: false);
}

/// <summary>
/// Verifies that a list-view array whose value offsets are not in ascending order
/// can be written to an Arrow file and read back, including when the array is sliced.
/// </summary>
/// <param name="sliceOffset">Offset at which to slice the array before writing (0 for no slice).</param>
/// <param name="sliceLength">Length of the slice to write.</param>
[Theory]
[InlineData(0, 100)]
[InlineData(0, 50)]
[InlineData(50, 50)]
[InlineData(25, 50)]
public async Task WriteListViewDataWithUnorderedOffsets(int sliceOffset, int sliceLength)
{
    // A list-view array doesn't require that offsets are ordered,
    // so verify that we can round trip a list-view array with out-of-order offsets.
    const int length = 100;
    var random = new Random();

    // Assign each list a random slot in the offsets/sizes buffers so that the
    // resulting offsets are not monotonically increasing.
    var randomizedIndices = Enumerable.Range(0, length).ToArray();
    Shuffle(randomizedIndices, random);

    var offsetsBuilder = new ArrowBuffer.Builder<int>().Resize(length);
    var sizesBuilder = new ArrowBuffer.Builder<int>().Resize(length);
    var validityBuilder = new ArrowBuffer.BitmapBuilder().Reserve(length);

    var valuesLength = 0;
    for (int i = 0; i < length; ++i)
    {
        var index = randomizedIndices[i];
        var listLength = random.Next(0, 10);
        offsetsBuilder.Span[index] = valuesLength;
        sizesBuilder.Span[index] = listLength;
        valuesLength += listLength;

        // Roughly 10% of entries are marked null.
        validityBuilder.Append(random.NextDouble() < 0.9);
    }

    var valuesBuilder = new Int64Array.Builder().Reserve(valuesLength);
    for (int i = 0; i < valuesLength; ++i)
    {
        valuesBuilder.Append(random.Next(0, 1_000));
    }

    var type = new ListViewType(new Int64Type());
    var offsets = offsetsBuilder.Build();
    var sizes = sizesBuilder.Build();
    var values = valuesBuilder.Build();
    var nullCount = validityBuilder.UnsetBitCount;
    var validityBuffer = validityBuilder.Build();

    IArrowArray listViewArray = new ListViewArray(
        type, length, offsets, sizes, values, validityBuffer, nullCount);

    if (sliceOffset != 0 || sliceLength != length)
    {
        listViewArray = ArrowArrayFactory.Slice(listViewArray, sliceOffset, sliceLength);
    }

    var recordBatch = new RecordBatch.Builder().Append("x", true, listViewArray).Build();

    var stream = new MemoryStream();
    var writer = new ArrowFileWriter(stream, recordBatch.Schema, leaveOpen: true);

    await writer.WriteRecordBatchAsync(recordBatch);
    await writer.WriteEndAsync();

    stream.Position = 0;

    // strictCompare: false — the written buffers may differ from the originals
    // (NOTE(review): presumably due to the writer rebasing offsets; confirm against the writer).
    await ValidateRecordBatchFile(stream, recordBatch, strictCompare: false);
}

private async Task ValidateRecordBatchFile(Stream stream, RecordBatch recordBatch, bool strictCompare = true)
{
var reader = new ArrowFileReader(stream);
Expand Down
Loading

0 comments on commit 0af60f0

Please sign in to comment.