Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-555] using 32bit selection vector #556

Merged
merged 2 commits into from
Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.arrow.memory.ArrowBuf;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16;
import org.apache.arrow.gandiva.evaluator.SelectionVectorInt32;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
Expand Down Expand Up @@ -111,7 +111,7 @@ public ArrowRecordBatch process(Schema schema, ArrowRecordBatch recordBatch)
}

public ArrowRecordBatch process(Schema schema, ArrowRecordBatch recordBatch,
SelectionVectorInt16 selectionVector) throws IOException {
SelectionVectorInt32 selectionVector) throws IOException {
int num_rows = recordBatch.getLength();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
Expand Down Expand Up @@ -158,7 +158,7 @@ public void processAndCacheOne(Schema schema, ArrowRecordBatch recordBatch)
}

public void processAndCacheOne(Schema schema, ArrowRecordBatch recordBatch,
SelectionVectorInt16 selectionVector) throws IOException {
SelectionVectorInt32 selectionVector) throws IOException {
int num_rows = recordBatch.getLength();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.Spiller;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer;
import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16;
import org.apache.arrow.gandiva.evaluator.SelectionVectorInt32;
import org.apache.arrow.gandiva.exceptions.GandivaException;
import org.apache.arrow.gandiva.expression.ExpressionTree;
import org.apache.arrow.gandiva.ipc.GandivaTypes;
Expand Down Expand Up @@ -175,7 +175,7 @@ public ArrowRecordBatch[] evaluate2(ArrowRecordBatch recordBatch) throws Runtime
/**
* Evaluate input data using the built native function, and output as recordBatch.
*/
public ArrowRecordBatch[] evaluate(ArrowRecordBatch recordBatch, SelectionVectorInt16 selectionVector)
public ArrowRecordBatch[] evaluate(ArrowRecordBatch recordBatch, SelectionVectorInt32 selectionVector)
throws RuntimeException, IOException {
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class ColumnarConditionProjector(
projectionSchema,
resultSchema,
fieldNodesList,
SelectionVectorType.SV_INT16)
SelectionVectorType.SV_INT32)
} else {
new FieldOptimizedProjector(projectionSchema, resultSchema, fieldNodesList)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ class ColumnarConditionProjector(
var afterEval: Long = 0
var numRows = 0
var input: ArrowRecordBatch = null
var selectionVector: SelectionVectorInt16 = null
var selectionVector: SelectionVectorInt32 = null
while (numRows == 0) {
if (cbIterator.hasNext) {
columnarBatch = cbIterator.next()
Expand Down Expand Up @@ -230,8 +230,8 @@ class ColumnarConditionProjector(
selectionBuffer.close()
selectionBuffer = null
}
selectionBuffer = allocator.buffer(numRows * 2)
selectionVector = new SelectionVectorInt16(selectionBuffer)
selectionBuffer = allocator.buffer(numRows * 4)
selectionVector = new SelectionVectorInt32(selectionBuffer)
val cols = conditionOrdinalList.map(i => {
columnarBatch.column(i).asInstanceOf[ArrowWritableColumnVector].getValueVector
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,26 @@ class ColumnarFilter (
val projectionNodeList = projectFieldList.map(field => {
TreeBuilder.makeExpression(TreeBuilder.makeField(field), field)
})
val projector = Projector.make(projectArrowSchema, projectionNodeList.asJava, SelectionVectorType.SV_INT16)
val projector = Projector.make(projectArrowSchema, projectionNodeList.asJava, SelectionVectorType.SV_INT32)

def getOrdinalList(): List[Int] = {
conditionOrdinalList
}

def getResultNumRows = resultNumRows

private def evaluate(inputRecordBatch: ArrowRecordBatch): SelectionVectorInt16 = {
private def evaluate(inputRecordBatch: ArrowRecordBatch): SelectionVectorInt32 = {
if (selectionBuffer != null) {
selectionBuffer.close()
selectionBuffer = null
}
selectionBuffer = allocator.buffer(inputRecordBatch.getLength * 2)
val selectionVector = new SelectionVectorInt16(selectionBuffer)
selectionBuffer = allocator.buffer(inputRecordBatch.getLength * 4)
val selectionVector = new SelectionVectorInt32(selectionBuffer)
filter.evaluate(inputRecordBatch, selectionVector)
selectionVector
}

def evaluate(numRows: Int, inputColumnVector: List[ValueVector]): SelectionVectorInt16 = {
def evaluate(numRows: Int, inputColumnVector: List[ValueVector]): SelectionVectorInt32 = {
val inputRecordBatch: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(numRows, inputColumnVector)
val selectionVector = evaluate(inputRecordBatch)
ConverterUtils.releaseArrowRecordBatch(inputRecordBatch)
Expand Down
6 changes: 3 additions & 3 deletions native-sql-engine/cpp/src/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluateWithSe
reinterpret_cast<uint8_t*>(selection_vector_buf_addr), selection_vector_buf_size);

auto selection_arraydata = arrow::ArrayData::Make(
arrow::uint16(), selection_vector_count, {NULLPTR, selection_vector_buf});
arrow::uint32(), selection_vector_count, {NULLPTR, selection_vector_buf});
auto selection_array = arrow::MakeArray(selection_arraydata);

std::vector<std::shared_ptr<arrow::RecordBatch>> out;
Expand Down Expand Up @@ -852,7 +852,7 @@ Java_com_intel_oap_vectorized_BatchIterator_nativeProcessWithSelection(
reinterpret_cast<uint8_t*>(selection_vector_buf_addr), selection_vector_buf_size);

auto selection_arraydata = arrow::ArrayData::Make(
arrow::uint16(), selection_vector_count, {NULLPTR, selection_vector_buf});
arrow::uint32(), selection_vector_count, {NULLPTR, selection_vector_buf});
auto selection_array = arrow::MakeArray(selection_arraydata);

std::shared_ptr<arrow::RecordBatch> out;
Expand Down Expand Up @@ -938,7 +938,7 @@ Java_com_intel_oap_vectorized_BatchIterator_nativeProcessAndCacheOneWithSelectio
reinterpret_cast<uint8_t*>(selection_vector_buf_addr), selection_vector_buf_size);

auto selection_arraydata = arrow::ArrayData::Make(
arrow::uint16(), selection_vector_count, {NULLPTR, selection_vector_buf});
arrow::uint32(), selection_vector_count, {NULLPTR, selection_vector_buf});
auto selection_array = arrow::MakeArray(selection_arraydata);
JniAssertOkOrThrow(iter->ProcessAndCacheOne(in, selection_array),
"nativeProcessAndCache: ResultIterator process next failed");
Expand Down