Skip to content
6 changes: 3 additions & 3 deletions src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ private void Append(ReadOnlySpan<byte> value)
_offsetsBuffers.Add(mutableOffsetsBuffer);
mutableOffsetsBuffer.Append(0);
}
mutableDataBuffer.EnsureCapacity(value.Length);
value.CopyTo(mutableDataBuffer.RawSpan.Slice(mutableDataBuffer.Length));
mutableDataBuffer.Length += value.Length;
var startIndex = mutableDataBuffer.Length;
mutableDataBuffer.IncreaseSize(value.Length);
value.CopyTo(mutableDataBuffer.RawSpan.Slice(startIndex));
mutableOffsetsBuffer.Append(mutableOffsetsBuffer[mutableOffsetsBuffer.Length - 1] + value.Length);
}
SetValidityBit(Length - 1, value != default);
Expand Down
31 changes: 22 additions & 9 deletions src/Microsoft.Data.Analysis/DataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Microsoft.Data.Analysis
internal class DataFrameBuffer<T> : ReadOnlyDataFrameBuffer<T>
where T : unmanaged
{
private const int MinCapacity = 8;

private Memory<byte> _memory;

public override ReadOnlyMemory<byte> ReadOnlyBuffer => _memory;
Expand All @@ -36,24 +38,35 @@ public Span<T> RawSpan
get => MemoryMarshal.Cast<byte, T>(Buffer.Span);
}

public DataFrameBuffer(int numberOfValues = 8) : base(numberOfValues) { }
/// <summary>
/// Creates an empty mutable buffer with storage reserved for <paramref name="capacity"/> values of
/// <typeparamref name="T"/>, and never for fewer than <c>MinCapacity</c> values.
/// </summary>
/// <param name="capacity">Number of values (not bytes) to reserve storage for.</param>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="capacity"/> multiplied by <c>Size</c> exceeds <c>MaxCapacity</c>.
/// </exception>
public DataFrameBuffer(int capacity = 0)
{
    if ((long)capacity * Size > MaxCapacity)
    {
        throw new ArgumentException($"{capacity} exceeds buffer capacity", nameof(capacity));
    }

    // _memory is a byte buffer while capacity counts values of T, so the element count
    // has to be scaled by Size; otherwise the reservation is too small whenever Size > 1.
    _memory = new byte[Math.Max(capacity, MinCapacity) * Size];
}

internal DataFrameBuffer(ReadOnlyMemory<byte> buffer, int length) : base(buffer, length)
/// <summary>
/// Creates a mutable buffer holding its own copy of the bytes in <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to copy; the source memory itself is not retained.</param>
/// <param name="length">Value assigned to <c>Length</c> for the new buffer.</param>
internal DataFrameBuffer(ReadOnlyMemory<byte> buffer, int length)
{
    // Copy into a freshly allocated array of the same byte size so that later writes
    // through this instance cannot touch the caller's memory.
    byte[] copy = new byte[buffer.Length];
    buffer.CopyTo(copy);

    _memory = copy;
    Length = length;
}

/// <summary>
/// Appends a single value at the end of the buffer.
/// </summary>
/// <param name="value">The value to store at index <c>Length</c>.</param>
/// <exception cref="ArgumentException">
/// Thrown when <c>Length</c> already equals <c>MaxCapacity</c>.
/// </exception>
public void Append(T value)
{
    if (Length == MaxCapacity)
    {
        throw new ArgumentException("Current buffer is full", nameof(value));
    }

    // Make sure there is room for exactly one more value before writing it.
    EnsureCapacity(1);

    // Store the value once, in the slot just past the current values, and only then
    // advance Length so that a single call adds a single element.
    RawSpan[Length] = value;
    Length++;
}

/// <summary>
/// Extends the buffer by <paramref name="numberOfValues"/> values: calls
/// <c>EnsureCapacity</c> for that many values and then adds the same amount to <c>Length</c>.
/// </summary>
/// <param name="numberOfValues">How many values to add to <c>Length</c>.</param>
public void IncreaseSize(int numberOfValues)
{
    // Capacity is requested before Length is moved, in the same order as the
    // separate EnsureCapacity / Length updates this method replaces at call sites.
    EnsureCapacity(numberOfValues);
    Length = Length + numberOfValues;
}

public void EnsureCapacity(int numberOfValues)
Expand Down
65 changes: 20 additions & 45 deletions src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ public PrimitiveColumnContainer(ReadOnlyMemory<byte> buffer, ReadOnlyMemory<byte
ReadOnlyDataFrameBuffer<T> dataBuffer;
if (buffer.IsEmpty)
{
DataFrameBuffer<T> mutableBuffer = new DataFrameBuffer<T>();
mutableBuffer.EnsureCapacity(length);
mutableBuffer.Length = length;
DataFrameBuffer<T> mutableBuffer = new DataFrameBuffer<T>(length);
mutableBuffer.IncreaseSize(length);
mutableBuffer.RawSpan.Fill(default(T));
dataBuffer = mutableBuffer;
}
Expand Down Expand Up @@ -172,15 +171,12 @@ public void AppendMany(T? value, long count)

//Calculate how many values we can additionally allocate without exceeding the MaxCapacity
int allocatable = (int)Math.Min(remaining, ReadOnlyDataFrameBuffer<T>.MaxCapacity - mutableLastBuffer.Length);
mutableLastBuffer.EnsureCapacity(allocatable);
mutableLastBuffer.IncreaseSize(allocatable);

DataFrameBuffer<byte> lastNullBitMapBuffer = NullBitMapBuffers.GetOrCreateMutable(NullBitMapBuffers.Count - 1);
int nullBufferAllocatable = (allocatable + 7) / 8;
lastNullBitMapBuffer.EnsureCapacity(nullBufferAllocatable);
lastNullBitMapBuffer.IncreaseSize(nullBufferAllocatable);


mutableLastBuffer.Length += allocatable;
lastNullBitMapBuffer.Length += nullBufferAllocatable;
Length += allocatable;

if (value.HasValue)
Expand Down Expand Up @@ -436,13 +432,8 @@ private List<ReadOnlyDataFrameBuffer<byte>> CloneNullBitMapBuffers()
List<ReadOnlyDataFrameBuffer<byte>> ret = new List<ReadOnlyDataFrameBuffer<byte>>();
foreach (ReadOnlyDataFrameBuffer<byte> buffer in NullBitMapBuffers)
{
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>();
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>(buffer.ReadOnlyBuffer, buffer.Length);
ret.Add(newBuffer);
ReadOnlySpan<byte> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
newBuffer.Append(span[i]);
}
}
return ret;
}
Expand Down Expand Up @@ -518,14 +509,9 @@ public PrimitiveColumnContainer<T> Clone()
var ret = new PrimitiveColumnContainer<T>();
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
DataFrameBuffer<T> newBuffer = new DataFrameBuffer<T>();
DataFrameBuffer<T> newBuffer = new DataFrameBuffer<T>(buffer.ReadOnlyBuffer, buffer.Length);
ret.Buffers.Add(newBuffer);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
ret.Length += buffer.Length;
for (int i = 0; i < span.Length; i++)
{
newBuffer.Append(span[i]);
}
}
ret.NullBitMapBuffers = CloneNullBitMapBuffers();
ret.NullCount = NullCount;
Expand All @@ -537,9 +523,10 @@ internal PrimitiveColumnContainer<bool> CloneAsBoolContainer()
var ret = new PrimitiveColumnContainer<bool>();
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
DataFrameBuffer<bool> newBuffer = new DataFrameBuffer<bool>();
DataFrameBuffer<bool> newBuffer = new DataFrameBuffer<bool>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
newBuffer.IncreaseSize(buffer.Length);

if (typeof(T) == typeof(bool))
{
var localBuffer = buffer;
Expand All @@ -550,7 +537,6 @@ internal PrimitiveColumnContainer<bool> CloneAsBoolContainer()
{
newBuffer.Span.Fill(false);
}
newBuffer.Length = buffer.Length;
ret.Length += buffer.Length;
}
ret.NullBitMapBuffers = CloneNullBitMapBuffers();
Expand All @@ -564,9 +550,8 @@ internal PrimitiveColumnContainer<byte> CloneAsByteContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>();
DataFrameBuffer<byte> newBuffer = new DataFrameBuffer<byte>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -584,9 +569,8 @@ internal PrimitiveColumnContainer<sbyte> CloneAsSByteContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<sbyte> newBuffer = new DataFrameBuffer<sbyte>();
DataFrameBuffer<sbyte> newBuffer = new DataFrameBuffer<sbyte>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -604,9 +588,8 @@ internal PrimitiveColumnContainer<double> CloneAsDoubleContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<double> newBuffer = new DataFrameBuffer<double>();
DataFrameBuffer<double> newBuffer = new DataFrameBuffer<double>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -624,9 +607,8 @@ internal PrimitiveColumnContainer<decimal> CloneAsDecimalContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<decimal> newBuffer = new DataFrameBuffer<decimal>();
DataFrameBuffer<decimal> newBuffer = new DataFrameBuffer<decimal>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -644,9 +626,8 @@ internal PrimitiveColumnContainer<short> CloneAsShortContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<short> newBuffer = new DataFrameBuffer<short>();
DataFrameBuffer<short> newBuffer = new DataFrameBuffer<short>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -664,9 +645,8 @@ internal PrimitiveColumnContainer<ushort> CloneAsUShortContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<ushort> newBuffer = new DataFrameBuffer<ushort>();
DataFrameBuffer<ushort> newBuffer = new DataFrameBuffer<ushort>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -684,9 +664,8 @@ internal PrimitiveColumnContainer<int> CloneAsIntContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<int> newBuffer = new DataFrameBuffer<int>();
DataFrameBuffer<int> newBuffer = new DataFrameBuffer<int>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -704,9 +683,8 @@ internal PrimitiveColumnContainer<uint> CloneAsUIntContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<uint> newBuffer = new DataFrameBuffer<uint>();
DataFrameBuffer<uint> newBuffer = new DataFrameBuffer<uint>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -724,9 +702,8 @@ internal PrimitiveColumnContainer<long> CloneAsLongContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<long> newBuffer = new DataFrameBuffer<long>();
DataFrameBuffer<long> newBuffer = new DataFrameBuffer<long>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -744,9 +721,8 @@ internal PrimitiveColumnContainer<ulong> CloneAsULongContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<ulong> newBuffer = new DataFrameBuffer<ulong>();
DataFrameBuffer<ulong> newBuffer = new DataFrameBuffer<ulong>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand All @@ -764,9 +740,8 @@ internal PrimitiveColumnContainer<float> CloneAsFloatContainer()
foreach (ReadOnlyDataFrameBuffer<T> buffer in Buffers)
{
ret.Length += buffer.Length;
DataFrameBuffer<float> newBuffer = new DataFrameBuffer<float>();
DataFrameBuffer<float> newBuffer = new DataFrameBuffer<float>(buffer.Length);
ret.Buffers.Add(newBuffer);
newBuffer.EnsureCapacity(buffer.Length);
ReadOnlySpan<T> span = buffer.ReadOnlySpan;
for (int i = 0; i < span.Length; i++)
{
Expand Down
Loading