Skip to content

Commit c57184a

Browse files
martin-traverseMartin Traverse
andauthored
GH-615: Produce Avro core data types out of Arrow VSR (#638)
## What's Changed Per discussion in #615 , here is a first take on the core producers to generate Avro data from Arrow vectors. There are a few points I'd like to clarify before going further: * Nullability. Avro only understands nullable types as unions but that is not normally how they will be if the data comes from other sources. I have added a special NullableProducer to handle nullable vectors which are not unions. We will need something equivalent in the consumers and probably a setting in the AvroToArrowConfig to control it on read, defaulting to current behaviour. I have also added special handling for nullable unions, because unions can't be nested (i.e. you can't nest "type | null" as a type inside a union). I can add consumers to handle both (unions and regular types) for review, if that sounds right? At the moment the schema for nullable fields gets quite mangled on a round trip! * Arrow has a lot more types than Avro, at the level of minor / vector types. Going Avro -> Arrow we just pick the direct equivalent. Going Arrow -> Avro, we could cast silently if there is no loss of precision. E.g. TinyInt and SmallInt -> Int and so on. For types like e.g. Decimal256 and LargeVarChar we could write out safely but would need support in the consumers to read back the wider types. I could start by adding the safe conversions now and we could come back to the wide types in a later PR maybe? * I have made Producer inherit AutoClosable the same as Consumer does. Not sure if that is always what you want though - it will free the buffers in the VSR, but you might want to keep the VSR after you are finished with IO. Do we need something like detachValueVector() to go with resetValueVector()? Calling close() after detach would not affect the vector. * Type information is inferred from the list of vectors, using minor types. We'll also need to generate the Avro schema, the input for that would be a list of fields. I haven't done it yet but will do if that sounds right. * Dictionary encoding for enums not implemented yet, I'll add it if the rest looks good. Caveat is that dictionaries must be fixed before the encoding starts if we are writing out the whole file in one go (i.e. if the Avro schema is at the start of the container file). If the schema is saved separately that limitation need not apply, we could provide the schema once encoding is finished. Please do let me know if this is going in the right direction + any comments. If it is I will add the missing pieces and start the exhaustive test coverage to mirror the consumers. Once it's done this PR should get us to the point where we can round trip the contents of an individual block for most data types, but it does not address the container format. Closes #615. --------- Co-authored-by: Martin Traverse <martin.traverse@icloud.com>
1 parent 1e62e3b commit c57184a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+6773
-0
lines changed

adapter/avro/src/main/java/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
module org.apache.arrow.adapter.avro {
1919
exports org.apache.arrow.adapter.avro.consumers;
2020
exports org.apache.arrow.adapter.avro.consumers.logical;
21+
exports org.apache.arrow.adapter.avro.producers;
22+
exports org.apache.arrow.adapter.avro.producers.logical;
2123
exports org.apache.arrow.adapter.avro;
2224

2325
requires org.apache.arrow.memory.core;

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java

Lines changed: 524 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.BaseFixedWidthVector;
21+
import org.apache.arrow.vector.BigIntVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer that produces long values from a {@link BigIntVector}, writes data to an Avro encoder.
26+
*
27+
* <p>Logical types are also supported, for vectors derived from {@link BaseFixedWidthVector} where
28+
* the internal representation matches BigIntVector and requires no conversion.
29+
*/
30+
public class AvroBigIntProducer extends BaseAvroProducer<BaseFixedWidthVector> {
31+
32+
/** Instantiate an AvroBigIntProducer. */
33+
public AvroBigIntProducer(BigIntVector vector) {
34+
super(vector);
35+
}
36+
37+
/** Protected constructor for logical types with a long representation. */
38+
protected AvroBigIntProducer(BaseFixedWidthVector vector) {
39+
super(vector);
40+
if (vector.getTypeWidth() != BigIntVector.TYPE_WIDTH) {
41+
throw new IllegalArgumentException(
42+
"AvroBigIntProducer requires type width = " + BigIntVector.TYPE_WIDTH);
43+
}
44+
}
45+
46+
@Override
47+
public void produce(Encoder encoder) throws IOException {
48+
long value = vector.getDataBuffer().getLong(currentIndex * (long) BigIntVector.TYPE_WIDTH);
49+
encoder.writeLong(value);
50+
currentIndex++;
51+
}
52+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.BitVector;
21+
import org.apache.avro.io.Encoder;
22+
23+
/**
24+
* Producer that produces boolean values from a {@link BitVector}, writes data to an Avro encoder.
25+
*/
26+
public class AvroBooleanProducer extends BaseAvroProducer<BitVector> {
27+
28+
/** Instantiate am AvroBooleanProducer. */
29+
public AvroBooleanProducer(BitVector vector) {
30+
super(vector);
31+
}
32+
33+
@Override
34+
public void produce(Encoder encoder) throws IOException {
35+
int bitValue = vector.get(currentIndex++);
36+
encoder.writeBoolean(bitValue != 0);
37+
}
38+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import java.nio.ByteBuffer;
21+
import org.apache.arrow.vector.VarBinaryVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer that produces byte array values from a {@link VarBinaryVector}, writes data to an Avro
26+
* encoder.
27+
*/
28+
public class AvroBytesProducer extends BaseAvroProducer<VarBinaryVector> {
29+
30+
/** Instantiate an AvroBytesProducer. */
31+
public AvroBytesProducer(VarBinaryVector vector) {
32+
super(vector);
33+
}
34+
35+
@Override
36+
public void produce(Encoder encoder) throws IOException {
37+
// The nio ByteBuffer is created once per call, but underlying data is not copied
38+
long offset = vector.getStartOffset(currentIndex);
39+
long endOffset = vector.getEndOffset(currentIndex);
40+
int length = (int) (endOffset - offset);
41+
ByteBuffer nioBuffer = vector.getDataBuffer().nioBuffer(offset, length);
42+
encoder.writeBytes(nioBuffer);
43+
currentIndex++;
44+
}
45+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.IntVector;
21+
import org.apache.avro.io.Encoder;
22+
23+
/**
24+
* Producer that produces enum values from a dictionary-encoded {@link IntVector}, writes data to an
25+
* Avro encoder.
26+
*/
27+
public class AvroEnumProducer extends BaseAvroProducer<IntVector> {
28+
29+
/** Instantiate an AvroEnumProducer. */
30+
public AvroEnumProducer(IntVector vector) {
31+
super(vector);
32+
}
33+
34+
@Override
35+
public void produce(Encoder encoder) throws IOException {
36+
encoder.writeEnum(vector.get(currentIndex++));
37+
}
38+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.BaseFixedWidthVector;
21+
import org.apache.arrow.vector.FixedSizeBinaryVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer that produces fixed-size binary values from a {@link FixedSizeBinaryVector}, writes data
26+
* to an Avro encoder.
27+
*
28+
* <p>Logical types are also supported, for vectors derived from {@link BaseFixedWidthVector} where
29+
* the internal representation is fixed width bytes and requires no conversion.
30+
*/
31+
public class AvroFixedSizeBinaryProducer extends BaseAvroProducer<BaseFixedWidthVector> {
32+
33+
private final byte[] reuseBytes;
34+
35+
/** Instantiate an AvroFixedSizeBinaryProducer. */
36+
public AvroFixedSizeBinaryProducer(FixedSizeBinaryVector vector) {
37+
super(vector);
38+
reuseBytes = new byte[vector.getTypeWidth()];
39+
}
40+
41+
/** Protected constructor for logical types with a fixed width representation. */
42+
protected AvroFixedSizeBinaryProducer(BaseFixedWidthVector vector) {
43+
super(vector);
44+
reuseBytes = new byte[vector.getTypeWidth()];
45+
}
46+
47+
@Override
48+
public void produce(Encoder encoder) throws IOException {
49+
long offset = (long) currentIndex * vector.getTypeWidth();
50+
vector.getDataBuffer().getBytes(offset, reuseBytes);
51+
encoder.writeFixed(reuseBytes);
52+
currentIndex++;
53+
}
54+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.vector.FieldVector;
21+
import org.apache.arrow.vector.complex.FixedSizeListVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer that produces array values from a {@link FixedSizeListVector}, writes data to an avro
26+
* encoder.
27+
*/
28+
public class AvroFixedSizeListProducer extends BaseAvroProducer<FixedSizeListVector> {
29+
30+
private final Producer<? extends FieldVector> delegate;
31+
32+
/** Instantiate an AvroFixedSizeListProducer. */
33+
public AvroFixedSizeListProducer(
34+
FixedSizeListVector vector, Producer<? extends FieldVector> delegate) {
35+
super(vector);
36+
this.delegate = delegate;
37+
}
38+
39+
@Override
40+
public void produce(Encoder encoder) throws IOException {
41+
42+
encoder.writeArrayStart();
43+
encoder.setItemCount(vector.getListSize());
44+
45+
for (int i = 0; i < vector.getListSize(); i++) {
46+
encoder.startItem();
47+
delegate.produce(encoder);
48+
}
49+
50+
encoder.writeArrayEnd();
51+
currentIndex++;
52+
}
53+
54+
@Override
55+
public void skipNull() {
56+
super.skipNull();
57+
// Child vector contains a fixed number of elements for each entry
58+
int childIndex = currentIndex * vector.getListSize();
59+
delegate.setPosition(childIndex);
60+
}
61+
62+
@Override
63+
public void setPosition(int index) {
64+
if (index < 0 || index > vector.getValueCount()) {
65+
throw new IllegalArgumentException("Index out of bounds");
66+
}
67+
super.setPosition(index);
68+
// Child vector contains a fixed number of elements for each entry
69+
int childIndex = currentIndex * vector.getListSize();
70+
delegate.setPosition(childIndex);
71+
}
72+
73+
@Override
74+
@SuppressWarnings("unchecked")
75+
public void resetValueVector(FixedSizeListVector vector) {
76+
((Producer<FieldVector>) delegate).resetValueVector(vector.getDataVector());
77+
}
78+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.adapter.avro.producers;
18+
19+
import java.io.IOException;
20+
import org.apache.arrow.memory.util.Float16;
21+
import org.apache.arrow.vector.Float2Vector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer that produces float values from a {@link Float2Vector}, writes data to an Avro encoder.
26+
*/
27+
public class AvroFloat2Producer extends BaseAvroProducer<Float2Vector> {
28+
29+
/** Instantiate an AvroFloat2Producer. */
30+
public AvroFloat2Producer(Float2Vector vector) {
31+
super(vector);
32+
}
33+
34+
@Override
35+
public void produce(Encoder encoder) throws IOException {
36+
short rawValue = vector.getDataBuffer().getShort(currentIndex * (long) Float2Vector.TYPE_WIDTH);
37+
encoder.writeFloat(Float16.toFloat(rawValue));
38+
currentIndex++;
39+
}
40+
}

0 commit comments

Comments
 (0)