Skip to content

Commit 2191b95

Browse files
julienledemkou
authored andcommitted
ARROW-347: Add method to pass CallBack when creating a transfer pair
supersedes and closes apache#182 Author: Julien Le Dem <julien@dremio.com> Closes apache#425 from julienledem/arrow_347 and squashes the following commits: 3c47b82 [Julien Le Dem] ARROW-347: Add method to pass CallBack when creating a transfer pair
1 parent a6dd777 commit 2191b95

File tree

10 files changed

+146
-31
lines changed

10 files changed

+146
-31
lines changed

vector/src/main/codegen/templates/UnionVector.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,17 @@ public Field getField() {
236236
237237
@Override
238238
public TransferPair getTransferPair(BufferAllocator allocator) {
239-
return new TransferImpl(name, allocator);
239+
return getTransferPair(name, allocator);
240240
}
241241
242242
@Override
243243
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
244-
return new TransferImpl(ref, allocator);
244+
return getTransferPair(ref, allocator, null);
245+
}
246+
247+
@Override
248+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
249+
return new org.apache.arrow.vector.complex.UnionVector.TransferImpl(ref, allocator, callBack);
245250
}
246251
247252
@Override
@@ -276,8 +281,8 @@ private class TransferImpl implements TransferPair {
276281
private final TransferPair typeVectorTransferPair;
277282
private final UnionVector to;
278283
279-
public TransferImpl(String name, BufferAllocator allocator) {
280-
to = new UnionVector(name, allocator, null);
284+
public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
285+
to = new UnionVector(name, allocator, callBack);
281286
internalMapVectorTransferPair = internalMap.makeTransferPair(to.internalMap);
282287
typeVectorTransferPair = typeVector.makeTransferPair(to.typeVector);
283288
}

vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.arrow.vector.schema.ArrowFieldNode;
2525

2626
import io.netty.buffer.ArrowBuf;
27+
import org.apache.arrow.vector.util.CallBack;
28+
import org.apache.arrow.vector.util.TransferPair;
2729

2830

2931
public abstract class BaseDataValueVector extends BaseValueVector implements BufferBacked {
@@ -87,6 +89,11 @@ public void close() {
8789
super.close();
8890
}
8991

92+
@Override
93+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
94+
return getTransferPair(ref, allocator);
95+
}
96+
9097
@Override
9198
public ArrowBuf[] getBuffers(boolean clear) {
9299
ArrowBuf[] out;

vector/src/main/java/org/apache/arrow/vector/ValueVector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.arrow.vector.complex.reader.FieldReader;
2525
import org.apache.arrow.vector.types.Types.MinorType;
2626
import org.apache.arrow.vector.types.pojo.Field;
27+
import org.apache.arrow.vector.util.CallBack;
2728
import org.apache.arrow.vector.util.TransferPair;
2829

2930
import io.netty.buffer.ArrowBuf;
@@ -106,6 +107,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
106107

107108
TransferPair getTransferPair(String ref, BufferAllocator allocator);
108109

110+
TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack);
111+
109112
/**
110113
* Returns a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to transfer underlying
111114
* buffers into the target vector.

vector/src/main/java/org/apache/arrow/vector/ZeroVector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.arrow.vector.types.Types.MinorType;
3030
import org.apache.arrow.vector.types.pojo.ArrowType.Null;
3131
import org.apache.arrow.vector.types.pojo.Field;
32+
import org.apache.arrow.vector.util.CallBack;
3233
import org.apache.arrow.vector.util.TransferPair;
3334

3435
import io.netty.buffer.ArrowBuf;
@@ -159,6 +160,11 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
159160
return defaultPair;
160161
}
161162

163+
@Override
164+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
165+
return defaultPair;
166+
}
167+
162168
@Override
163169
public TransferPair makeTransferPair(ValueVector target) {
164170
return defaultPair;

vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.arrow.vector.ZeroVector;
3030
import org.apache.arrow.vector.types.Types.MinorType;
3131
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
32+
import org.apache.arrow.vector.util.CallBack;
3233
import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
3334

3435
import com.google.common.base.Preconditions;
@@ -44,15 +45,17 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
4445

4546
protected final UInt4Vector offsets;
4647
protected FieldVector vector;
48+
protected final CallBack callBack;
4749

48-
protected BaseRepeatedValueVector(String name, BufferAllocator allocator) {
49-
this(name, allocator, DEFAULT_DATA_VECTOR);
50+
protected BaseRepeatedValueVector(String name, BufferAllocator allocator, CallBack callBack) {
51+
this(name, allocator, DEFAULT_DATA_VECTOR, callBack);
5052
}
5153

52-
protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector) {
54+
protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector, CallBack callBack) {
5355
super(name, allocator);
5456
this.offsets = new UInt4Vector(OFFSETS_VECTOR_NAME, allocator);
5557
this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
58+
this.callBack = callBack;
5659
}
5760

5861
@Override
@@ -154,9 +157,12 @@ public int size() {
154157
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) {
155158
boolean created = false;
156159
if (vector instanceof ZeroVector) {
157-
vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null);
160+
vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, callBack);
158161
// returned vector must have the same field
159162
created = true;
163+
if (callBack != null) {
164+
callBack.doWork();
165+
}
160166
}
161167

162168
if (vector.getField().getType().getTypeID() != minorType.getType().getTypeID()) {

vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@
2424
import java.util.Collections;
2525
import java.util.List;
2626

27-
import com.google.common.collect.ImmutableList;
28-
import com.google.common.collect.ObjectArrays;
29-
30-
import io.netty.buffer.ArrowBuf;
3127
import org.apache.arrow.memory.BufferAllocator;
3228
import org.apache.arrow.memory.OutOfMemoryException;
3329
import org.apache.arrow.vector.AddOrGetResult;
@@ -52,24 +48,27 @@
5248
import org.apache.arrow.vector.util.JsonStringArrayList;
5349
import org.apache.arrow.vector.util.TransferPair;
5450

51+
import com.google.common.collect.ImmutableList;
52+
import com.google.common.collect.ObjectArrays;
53+
54+
import io.netty.buffer.ArrowBuf;
55+
5556
public class ListVector extends BaseRepeatedValueVector implements FieldVector {
5657

5758
final UInt4Vector offsets;
5859
final BitVector bits;
5960
private final List<BufferBacked> innerVectors;
6061
private Mutator mutator = new Mutator();
6162
private Accessor accessor = new Accessor();
62-
private UnionListWriter writer;
6363
private UnionListReader reader;
6464
private CallBack callBack;
6565
private final DictionaryEncoding dictionary;
6666

6767
public ListVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack) {
68-
super(name, allocator);
68+
super(name, allocator, callBack);
6969
this.bits = new BitVector("$bits$", allocator);
7070
this.offsets = getOffsetVector();
7171
this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets));
72-
this.writer = new UnionListWriter(this);
7372
this.reader = new UnionListReader(this);
7473
this.dictionary = dictionary;
7574
this.callBack = callBack;
@@ -86,6 +85,8 @@ public void initializeChildrenFromFields(List<Field> children) {
8685
if (!addOrGetVector.isCreated()) {
8786
throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector());
8887
}
88+
89+
addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren());
8990
}
9091

9192
@Override
@@ -111,7 +112,7 @@ public List<BufferBacked> getFieldInnerVectors() {
111112
}
112113

113114
public UnionListWriter getWriter() {
114-
return writer;
115+
return new UnionListWriter(this);
115116
}
116117

117118
@Override
@@ -139,7 +140,12 @@ public FieldVector getDataVector() {
139140

140141
@Override
141142
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
142-
return new TransferImpl(ref, allocator);
143+
return getTransferPair(ref, allocator, null);
144+
}
145+
146+
@Override
147+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
148+
return new TransferImpl(ref, allocator, callBack);
143149
}
144150

145151
@Override
@@ -152,8 +158,8 @@ private class TransferImpl implements TransferPair {
152158
ListVector to;
153159
TransferPair pairs[] = new TransferPair[3];
154160

155-
public TransferImpl(String name, BufferAllocator allocator) {
156-
this(new ListVector(name, allocator, dictionary, null));
161+
public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
162+
this(new ListVector(name, allocator, dictionary, callBack));
157163
}
158164

159165
public TransferImpl(ListVector to) {
@@ -172,6 +178,7 @@ public void transfer() {
172178
for (TransferPair pair : pairs) {
173179
pair.transfer();
174180
}
181+
to.lastSet = lastSet;
175182
}
176183

177184
@Override
@@ -282,9 +289,12 @@ public ArrowBuf[] getBuffers(boolean clear) {
282289
}
283290

284291
public UnionVector promoteToUnion() {
285-
UnionVector vector = new UnionVector(name, allocator, null);
292+
UnionVector vector = new UnionVector(name, allocator, callBack);
286293
replaceDataVector(vector);
287294
reader = new UnionListReader(this);
295+
if (callBack != null) {
296+
callBack.doWork();
297+
}
288298
return vector;
289299
}
290300

vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public int getBufferSizeFor(final int valueCount) {
115115

116116
@Override
117117
public TransferPair getTransferPair(BufferAllocator allocator) {
118+
return getTransferPair(name, allocator, null);
119+
}
120+
121+
@Override
122+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
118123
return new MapTransferPair(this, new MapVector(name, allocator, callBack), false);
119124
}
120125

vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public FieldReader getReader() {
8686

8787
@Override
8888
public TransferPair getTransferPair(BufferAllocator allocator) {
89-
return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, callBack), false);
89+
return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, null), false);
9090
}
9191

9292
@Override
@@ -96,6 +96,11 @@ public TransferPair makeTransferPair(ValueVector to) {
9696

9797
@Override
9898
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
99+
return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, dictionary, null), false);
100+
}
101+
102+
@Override
103+
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
99104
return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, dictionary, callBack), false);
100105
}
101106

vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public boolean isEmptyMap() {
142142
}
143143

144144
protected FieldWriter getWriter() {
145-
return getWriter(type);
145+
return writer;
146146
}
147147

148148
private FieldWriter promoteToUnion() {

0 commit comments

Comments
 (0)