@@ -97,7 +97,7 @@ public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> g
}

public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema schema,
HoodieWriteConfig writeConfig) {
HoodieWriteConfig writeConfig) {
// NOTE: Whether records have to be cloned here is determined by the executor type used
// for writing: executors relying on an inner queue will keep references to the records,
// and therefore in environments where the underlying buffer holding the record could be
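The NOTE above is cut off by the fold, but the gist is that queue-backed executors hold on to record references, so records backed by a reusable buffer have to be deep-copied before they are handed over. A minimal, hypothetical sketch of that decision (the flag name, class name, and factory shape are illustrative assumptions, not Hudi's actual API):

// --- Illustrative sketch (not from this PR): clone-on-write hinging on the executor type.
import java.util.function.Function;

final class CloningTransformerSketch<T> {
  private final boolean executorBuffersRecords; // assumed: true for queue-backed executors

  CloningTransformerSketch(boolean executorBuffersRecords) {
    this.executorBuffersRecords = executorBuffersRecords;
  }

  // Wrap the deep-copy step so records are cloned only when the executor may still
  // reference them after the underlying buffer has been reused.
  Function<T, T> withOptionalClone(Function<T, T> deepCopy) {
    return executorBuffersRecords ? deepCopy : Function.identity();
  }
}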
@@ -34,14 +34,13 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.format.FlinkRecordContext;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext;
import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.hudi.util.RowDataQueryContexts.RowDataQueryContext;
import org.apache.hudi.util.RowProjection;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
@@ -103,7 +102,7 @@ protected Comparable<?> doGetOrderingValue(HoodieSchema recordSchema, Properties
if (recordSchema.getField(field).isEmpty()) {
return OrderingValues.getDefault();
}
return (Comparable<?>) getColumnValue(recordSchema.toAvroSchema(), field, props);
return (Comparable<?>) getColumnValue(recordSchema, field, props);
});
}
}
@@ -141,7 +140,7 @@ public String getRecordKey(HoodieSchema recordSchema, Option<BaseKeyGenerator> k
@Override
public String getRecordKey(HoodieSchema recordSchema, String keyFieldName) {
if (key == null) {
String recordKey = Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema()).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
String recordKey = Objects.toString(RowDataQueryContexts.fromSchema(recordSchema).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
key = new HoodieKey(recordKey, null);
}
return getRecordKey();
@@ -164,9 +163,9 @@ public Object convertColumnValueForLogicalType(HoodieSchema fieldSchema,
if (fieldValue == null) {
return null;
}

HoodieSchemaType schemaType = fieldSchema.getType();

if (schemaType == HoodieSchemaType.DATE) {
return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
} else if (schemaType == HoodieSchemaType.TIMESTAMP && keepConsistentLogicalTimestamp) {
@@ -190,20 +189,20 @@ public Object[] getColumnValues(HoodieSchema recordSchema, String[] columns, boo

@Override
public Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props) {
return getColumnValueAsJava(recordSchema.toAvroSchema(), column, props, true);
return getColumnValueAsJava(recordSchema, column, props, true);
}

private Object getColumnValueAsJava(Schema recordSchema, String column, Properties props, boolean allowsNull) {
private Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props, boolean allowsNull) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data, allowsNull);
}

private Object getColumnValue(Schema recordSchema, String column, Properties props) {
private Object getColumnValue(HoodieSchema recordSchema, String column, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
Reviewer comment (Contributor): confirmed that the cache should still work.

return rowDataQueryContext.getFieldQueryContext(column).getFieldGetter().getFieldOrNull(data);
}
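For context on the reviewer's note above: the query contexts are looked up from a per-schema cache, so the lookup keeps working after the key switches from the Avro Schema to HoodieSchema, as long as the key type has stable equals()/hashCode(). A hypothetical sketch of that caching pattern (names are illustrative, not the RowDataQueryContexts API):

// Memoizes one query context per distinct schema key; swapping the key type is safe
// as long as equal schemas hash and compare equally.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class QueryContextCacheSketch<S, C> {
  private final Map<S, C> cache = new ConcurrentHashMap<>();

  C fromSchema(S schema, Function<S, C> factory) {
    return cache.computeIfAbsent(schema, factory);
  }
}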

@@ -236,7 +235,7 @@ public HoodieRecord updateMetaField(HoodieSchema recordSchema, int ordinal, Stri

@Override
public HoodieRecord rewriteRecordWithNewSchema(HoodieSchema recordSchema, Properties props, HoodieSchema newSchema, Map<String, String> renameCols) {
RowProjection rowProjection = RowDataAvroQueryContexts.getRowProjection(recordSchema.toAvroSchema(), newSchema.toAvroSchema(), renameCols);
RowProjection rowProjection = RowDataQueryContexts.getRowProjection(recordSchema, newSchema, renameCols);
RowData newRow = rowProjection.project(getData());
return new HoodieFlinkRecord(getKey(), getOperation(), newRow);
}
@@ -285,17 +284,17 @@ public HoodieRecord truncateRecordKey(HoodieSchema recordSchema, Properties prop
public Option<HoodieAvroIndexedRecord> toIndexedRecord(HoodieSchema recordSchema, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone);
IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData());
RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData());
return Option.of(new HoodieAvroIndexedRecord(getKey(), indexedRecord, getOperation(), getMetadata(), orderingValue, isDelete));
}

@Override
public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone);
IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData());
RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData());
return HoodieAvroUtils.avroToBytesStream(indexedRecord);
}
}
@@ -24,7 +24,7 @@
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataQueryContexts;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -129,7 +129,7 @@ private <T> BufferedRecord<T> mergeRecord(
// later in the file writer.
int mergedArity = newSchema.getFields().size();
boolean utcTimezone = Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true"));
RowData.FieldGetter[] fieldGetters = RowDataAvroQueryContexts.fromAvroSchema(newSchema.toAvroSchema(), utcTimezone).fieldGetters();
RowData.FieldGetter[] fieldGetters = RowDataQueryContexts.fromSchema(newSchema, utcTimezone).fieldGetters();

int lowOrderIdx = 0;
int highOrderIdx = 0;
@@ -138,10 +138,10 @@
// shift start index for merging if there is schema discrepancy
if (lowOrderArity != mergedArity) {
lowOrderIdx += lowOrderArity - mergedArity;
lowOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(lowOrderSchema.toAvroSchema(), utcTimezone).fieldGetters();
lowOrderFieldGetters = RowDataQueryContexts.fromSchema(lowOrderSchema, utcTimezone).fieldGetters();
} else if (highOrderArity != mergedArity) {
highOrderIdx += highOrderArity - mergedArity;
highOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(highOrderSchema.toAvroSchema(), utcTimezone).fieldGetters();
highOrderFieldGetters = RowDataQueryContexts.fromSchema(highOrderSchema, utcTimezone).fieldGetters();
}

RowData lowOrderRow = (RowData) lowOrderRecord.getRecord();
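The index shifting above compensates for a schema discrepancy between the two records being merged: the side whose arity differs from the merged schema starts reading its fields at an offset so that both sides line up on the shared columns. A simplified sketch of that idea using plain arrays (the non-null precedence rule here is an assumption for illustration, not the merger's actual semantics):

// Simplified sketch of offset-aligned merging (merge rule is illustrative only).
final class ShiftedMergeSketch {
  static Object[] merge(Object[] lowOrder, Object[] highOrder, int mergedArity) {
    // A side with extra leading fields starts at an offset so that field i of the
    // merged schema reads the matching column on both sides.
    int lowIdx = lowOrder.length - mergedArity;
    int highIdx = highOrder.length - mergedArity;
    Object[] merged = new Object[mergedArity];
    for (int i = 0; i < mergedArity; i++) {
      Object high = highOrder[highIdx + i];
      merged[i] = (high != null) ? high : lowOrder[lowIdx + i]; // assumed precedence rule
    }
    return merged;
  }
}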
@@ -18,11 +18,11 @@

package org.apache.hudi.execution;

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -32,7 +32,7 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataQueryContexts;

import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ protected HoodieFileWriter newParquetFileWriter(
HoodieConfig config,
HoodieSchema schema) throws IOException {
//TODO boundary to revisit in follow up to use HoodieSchema directly
final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
HoodieRowDataParquetWriteSupport writeSupport =
new HoodieRowDataParquetWriteSupport(
storage.getConf().unwrapAs(Configuration.class), rowType, null);
@@ -94,7 +94,7 @@ public HoodieFileWriter newParquetFileWriter(
HoodieSchema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
//TODO boundary to revisit in follow up to use HoodieSchema directly
final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
return newParquetFileWriter(instantTime, storagePath, config, rowType, taskContextSupplier);
}

@@ -34,7 +34,7 @@
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.OrderingValueEngineTypeConverter;
import org.apache.hudi.util.RecordKeyToRowDataConverter;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
@@ -82,8 +82,8 @@ public static FlinkRecordContext getDeleteCheckingInstance() {

@Override
public Object getValue(RowData record, HoodieSchema schema, String fieldName) {
RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(), utcTimezone).getFieldQueryContext(fieldName);
RowDataQueryContexts.FieldQueryContext fieldQueryContext =
RowDataQueryContexts.fromSchema(schema, utcTimezone).getFieldQueryContext(fieldName);
if (fieldQueryContext == null) {
return null;
} else {
@@ -108,7 +108,7 @@ public Comparable convertOrderingValueToEngineType(Comparable value) {

@Override
public GenericRecord convertToAvroRecord(RowData record, HoodieSchema schema) {
return (GenericRecord) RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowDataToAvroConverter().convert(schema.toAvroSchema(), record);
return (GenericRecord) RowDataQueryContexts.fromSchema(schema).getRowDataToAvroConverter().convert(schema, record);
}

@Override
@@ -125,7 +125,7 @@ public RowData getDeleteRow(String recordKey) {
@Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter();
RowData rowData = (RowData) converter.convert(avroRecord);
Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
if (operationField != null) {
@@ -180,7 +180,7 @@ public RowData toBinaryRow(HoodieSchema schema, RowData record) {
if (record instanceof BinaryRowData) {
return record;
}
RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(schema.toAvroSchema());
RowDataSerializer rowDataSerializer = RowDataQueryContexts.getRowDataSerializer(schema);
return rowDataSerializer.toBinaryRow(record);
}

@@ -197,8 +197,8 @@ public RowData toBinaryRow(HoodieSchema schema, RowData record) {
*/
@Override
public UnaryOperator<RowData> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from.toAvroSchema()).getRowType().getLogicalType();
RowType toType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(to.toAvroSchema()).getRowType().getLogicalType();
RowType fromType = (RowType) RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType();
RowType toType = (RowType) RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType();
RowProjection rowProjection = SchemaEvolvingRowDataProjection.instance(fromType, toType, renamedColumns);
return rowProjection::project;
}