[feature](tvf)(jni-avro)jni-avro scanner add complex data types (apac…
DongLiang-0 authored and gnehil committed Dec 4, 2023
1 parent 568c2c1 commit 1877ed9
Showing 9 changed files with 513 additions and 85 deletions.
44 changes: 20 additions & 24 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -86,18 +86,10 @@ Status AvroJNIReader::init_fetch_table_reader(
            {"file_type", std::to_string(type)},
            {"is_get_table_schema", "false"},
            {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}};
-    switch (type) {
-    case TFileType::FILE_HDFS:
-        required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value));
-        break;
-    case TFileType::FILE_S3:
-        required_param.insert(std::make_pair("uri", _range.path));
-        required_param.insert(_params.properties.begin(), _params.properties.end());
-        break;
-    default:
-        Status::InternalError("unsupported file reader type: {}", std::to_string(type));
-    }
+    if (type == TFileType::FILE_S3) {
+        required_param.insert(_params.properties.begin(), _params.properties.end());
+    }
+    required_param.insert(std::make_pair("uri", _range.path));
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
required_param, column_names);
RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
@@ -144,39 +136,43 @@ Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names,
}

TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) {
-    ::doris::TPrimitiveType::type schema_type =
-            static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
+    auto schema_type = static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
    switch (schema_type) {
    case TPrimitiveType::INT:
    case TPrimitiveType::STRING:
    case TPrimitiveType::BIGINT:
    case TPrimitiveType::BOOLEAN:
    case TPrimitiveType::DOUBLE:
    case TPrimitiveType::FLOAT:
-        return TypeDescriptor(thrift_to_type(schema_type));
+    case TPrimitiveType::BINARY:
+        return {thrift_to_type(schema_type)};
    case TPrimitiveType::ARRAY: {
        TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY);
-        list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
+        list_type.add_sub_type(convert_to_doris_type(childColumns[0]));
        return list_type;
    }
    case TPrimitiveType::MAP: {
        TypeDescriptor map_type(PrimitiveType::TYPE_MAP);
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
        // The default type of AVRO MAP structure key is STRING
        map_type.add_sub_type(PrimitiveType::TYPE_STRING);
-        map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+        map_type.add_sub_type(convert_to_doris_type(childColumns[1]));
        return map_type;
    }
+    case TPrimitiveType::STRUCT: {
+        TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT);
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
+        for (auto i = 0; i < childColumns.Size(); i++) {
+            const rapidjson::Value& child = childColumns[i];
+            struct_type.add_sub_type(convert_to_doris_type(childColumns[i]),
+                                     std::string(child["name"].GetString()));
+        }
+        return struct_type;
+    }
    default:
-        return TypeDescriptor(PrimitiveType::INVALID_TYPE);
+        return {PrimitiveType::INVALID_TYPE};
    }
}

-TypeDescriptor AvroJNIReader::convert_complex_type(
-        const rapidjson::Document::ConstObject child_schema) {
-    ::doris::TPrimitiveType::type child_schema_type =
-            static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt());
-    return TypeDescriptor(thrift_to_type(child_schema_type));
-}
-
} // namespace doris::vectorized
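
For orientation: the BE never parses Avro itself here; it walks a JSON description of each column serialized by the Java scanner. A minimal sketch of what convert_to_doris_type() might receive for an array<int> column, assuming the wire format uses exactly the "type", "name" and "childColumns" keys read above and stores "type" as a TPrimitiveType integer; the numeric values below are placeholders, not real thrift ordinals:

public class ColumnSchemaJsonExample {
    // Hypothetical serialized column schema for an array<int> column; key
    // names mirror the accessors in convert_to_doris_type(), the numeric
    // "type" values are illustrative placeholders only.
    static final String EXAMPLE_COLUMN_JSON = "{"
            + "\"name\":\"tags\",\"type\":20,"                        // 20 ~ ARRAY (placeholder)
            + "\"childColumns\":[{\"name\":\"element\",\"type\":5}]"  // 5 ~ INT (placeholder)
            + "}";
}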
Binary file not shown.
AvroColumnValue.java
@@ -19,17 +19,25 @@

import org.apache.doris.common.jni.vec.ColumnValue;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;

public class AvroColumnValue implements ColumnValue {

@@ -42,6 +50,12 @@ public AvroColumnValue(ObjectInspector fieldInspector, Object fieldData) {
}

private Object inspectObject() {
if (fieldData instanceof ByteBuffer) {
return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(((ByteBuffer) fieldData).array());
} else if (fieldData instanceof Fixed) {
return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(
((GenericFixed) fieldData).bytes());
}
return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
}
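
The two new branches above exist because Avro's generic readers do not hand back byte[] for binary data: a "bytes" field decodes to a java.nio.ByteBuffer and a "fixed" field to GenericData.Fixed, while Hive's primitive inspectors expect raw byte arrays. A hedged sketch of the two representations; the example class and the stock Hive inspector below are for illustration only, not part of this commit:

package org.apache.doris.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.nio.ByteBuffer;

public class BinaryUnwrapExample {
    public static void main(String[] args) {
        // Avro "bytes" decodes to a ByteBuffer, not byte[] ...
        AvroColumnValue fromBytes = new AvroColumnValue(
                PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector,
                ByteBuffer.wrap(new byte[] {1, 2, 3}));

        // ... and Avro "fixed" decodes to GenericData.Fixed.
        Schema md5 = Schema.createFixed("md5", null, null, 16);
        AvroColumnValue fromFixed = new AvroColumnValue(
                PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector,
                new GenericData.Fixed(md5, new byte[16]));
        // Both values are unwrapped to byte[] inside inspectObject().
    }
}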

@@ -162,6 +176,24 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {

@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {

StructObjectInspector inspector = (StructObjectInspector) fieldInspector;
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
for (Integer idx : structFieldIndex) {
AvroColumnValue cv = null;
if (idx != null) {
StructField sf = fields.get(idx);
Object o;
if (fieldData instanceof GenericData.Record) {
GenericRecord record = (GenericRecord) inspector.getStructFieldData(fieldData, sf);
o = record.get(idx);
} else {
o = inspector.getStructFieldData(fieldData, sf);
}
if (Objects.nonNull(o)) {
cv = new AvroColumnValue(sf.getFieldObjectInspector(), o);
}
}
values.add(cv);
}
}
}
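
A short usage sketch for the new unpackStruct() contract; the wrapper class and setup are hypothetical, only the null-index behavior is taken from the code above:

package org.apache.doris.avro;

import org.apache.doris.common.jni.vec.ColumnValue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnpackStructExample {
    // structValue is assumed to wrap a GenericData.Record together with a
    // matching StructObjectInspector (construction omitted for brevity).
    static void readSelectedFields(AvroColumnValue structValue) {
        List<Integer> wantedFields = Arrays.asList(0, null, 2);
        List<ColumnValue> out = new ArrayList<>();
        structValue.unpackStruct(wantedFields, out);
        // out.get(1) is null: a null index (or a null field value) adds a
        // null slot, presumably materialized as SQL NULL downstream.
    }
}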
AvroJNIScanner.java
@@ -21,12 +21,9 @@
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPrimitiveType;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
@@ -40,10 +37,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -193,54 +187,6 @@ protected int getNext() throws IOException {
@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
    Schema schema = avroReader.getSchema();
-    List<Field> schemaFields = schema.getFields();
-    List<SchemaColumn> schemaColumns = new ArrayList<>();
-    for (Field schemaField : schemaFields) {
-        Schema avroSchema = schemaField.schema();
-        String columnName = schemaField.name().toLowerCase(Locale.ROOT);
-
-        SchemaColumn schemaColumn = new SchemaColumn();
-        TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn);
-        schemaColumn.setName(columnName);
-        schemaColumn.setType(tPrimitiveType);
-        schemaColumns.add(schemaColumn);
-    }
-    return new TableSchema(schemaColumns);
-}
-
-private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn)
-        throws UnsupportedOperationException {
-    Schema.Type type = avroSchema.getType();
-    switch (type) {
-    case NULL:
-        return TPrimitiveType.NULL_TYPE;
-    case STRING:
-        return TPrimitiveType.STRING;
-    case INT:
-        return TPrimitiveType.INT;
-    case BOOLEAN:
-        return TPrimitiveType.BOOLEAN;
-    case LONG:
-        return TPrimitiveType.BIGINT;
-    case FLOAT:
-        return TPrimitiveType.FLOAT;
-    case BYTES:
-        return TPrimitiveType.BINARY;
-    case DOUBLE:
-        return TPrimitiveType.DOUBLE;
-    case ARRAY:
-        SchemaColumn arrayChildColumn = new SchemaColumn();
-        schemaColumn.addChildColumn(arrayChildColumn);
-        arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn));
-        return TPrimitiveType.ARRAY;
-    case MAP:
-        SchemaColumn mapChildColumn = new SchemaColumn();
-        schemaColumn.addChildColumn(mapChildColumn);
-        mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn));
-        return TPrimitiveType.MAP;
-    default:
-        throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported.");
-    }
-}
+    return AvroTypeUtils.parseTableSchema(schema);
}

}
AvroTypeUtils.java (new file)
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.avro;

import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TPrimitiveType;

import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.commons.compress.utils.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class AvroTypeUtils {

protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException {
List<Field> schemaFields = schema.getFields();
List<SchemaColumn> schemaColumns = new ArrayList<>();
for (Field schemaField : schemaFields) {
Schema avroSchema = schemaField.schema();
String columnName = schemaField.name();

SchemaColumn schemaColumn = new SchemaColumn();
TPrimitiveType tPrimitiveType = typeFromAvro(avroSchema, schemaColumn);
schemaColumn.setName(columnName);
schemaColumn.setType(tPrimitiveType);
schemaColumns.add(schemaColumn);
}
return new TableSchema(schemaColumns);
}

private static TPrimitiveType typeFromAvro(Schema avroSchema, SchemaColumn schemaColumn)
throws UnsupportedOperationException {
Schema.Type type = avroSchema.getType();
switch (type) {
case ENUM:
case STRING:
return TPrimitiveType.STRING;
case INT:
return TPrimitiveType.INT;
case BOOLEAN:
return TPrimitiveType.BOOLEAN;
case LONG:
return TPrimitiveType.BIGINT;
case FLOAT:
return TPrimitiveType.FLOAT;
case FIXED:
case BYTES:
return TPrimitiveType.BINARY;
case DOUBLE:
return TPrimitiveType.DOUBLE;
case ARRAY:
SchemaColumn arrayChildColumn = new SchemaColumn();
schemaColumn.addChildColumns(Collections.singletonList(arrayChildColumn));
arrayChildColumn.setType(typeFromAvro(avroSchema.getElementType(), arrayChildColumn));
return TPrimitiveType.ARRAY;
case MAP:
// The default type of AVRO MAP structure key is STRING
SchemaColumn keyChildColumn = new SchemaColumn();
keyChildColumn.setType(TPrimitiveType.STRING);
SchemaColumn valueChildColumn = new SchemaColumn();
valueChildColumn.setType(typeFromAvro(avroSchema.getValueType(), valueChildColumn));

schemaColumn.addChildColumns(Arrays.asList(keyChildColumn, valueChildColumn));
return TPrimitiveType.MAP;
case RECORD:
List<Field> fields = avroSchema.getFields();
List<SchemaColumn> childSchemaColumn = Lists.newArrayList();
for (Field field : fields) {
SchemaColumn structChildColumn = new SchemaColumn();
structChildColumn.setName(field.name());
structChildColumn.setType(typeFromAvro(field.schema(), structChildColumn));
childSchemaColumn.add(structChildColumn);
}
schemaColumn.addChildColumns(childSchemaColumn);
return TPrimitiveType.STRUCT;
case UNION:
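                // Null members are stripped; each remaining union member
                // becomes a named child column and the union as a whole is
                // exposed as a STRUCT.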
List<Schema> nonNullableMembers = filterNullableUnion(avroSchema);
                Preconditions.checkArgument(!nonNullableMembers.isEmpty(),
                        avroSchema.getName() + ": union must contain at least one non-null type");
List<SchemaColumn> childSchemaColumns = Lists.newArrayList();
                for (Schema member : nonNullableMembers) {
                    SchemaColumn childColumn = new SchemaColumn();
                    childColumn.setName(member.getName());
                    childColumn.setType(typeFromAvro(member, childColumn));
childSchemaColumns.add(childColumn);
}
schemaColumn.addChildColumns(childSchemaColumns);
return TPrimitiveType.STRUCT;
default:
                throw new UnsupportedOperationException(
                        "avro format: " + avroSchema.getName() + " " + type.getName() + " is not supported.");
}
}

private static List<Schema> filterNullableUnion(Schema schema) {
Preconditions.checkArgument(schema.isUnion(), "Schema must be union");
return schema.getTypes().stream().filter(s -> !s.isNullable()).collect(Collectors.toList());
}

}
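
For reference, a minimal sketch of driving the new helper, assuming package-local access (parseTableSchema is protected) and an illustrative schema literal:

package org.apache.doris.avro;

import org.apache.doris.common.jni.vec.TableSchema;

import org.apache.avro.Schema;

public class AvroTypeUtilsExample {
    public static void main(String[] args) {
        // A record with one array field and one map field.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
                        + "{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},"
                        + "{\"name\":\"props\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}"
                        + "]}");
        TableSchema tableSchema = AvroTypeUtils.parseTableSchema(schema);
        // tags  -> ARRAY with a single INT child column
        // props -> MAP with STRING key and STRING value child columns
    }
}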