@@ -104,7 +104,7 @@ public void init() throws Exception {
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

-HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
+HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

cowTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
@@ -127,7 +127,7 @@ public void init() throws Exception {
morTablePath, "mor_table", HoodieTableType.MERGE_ON_READ.name(),
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");
-HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
+HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

morTable.addDeltaCommit("20160401010101");
morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
@@ -151,7 +151,7 @@ public void init() throws Exception {
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

-HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
+HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

cowNonPartitionedTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1)
@@ -69,7 +69,7 @@ private MessageType getWriteSchema(HoodieWriteConfig config, List<StoragePath> i
// All files should have the same schema in this case
try {
ParquetUtils parquetUtils = new ParquetUtils();
-MessageType fileSchema = parquetUtils.readSchema(table.getStorage(), inputFiles.get(0));
+MessageType fileSchema = parquetUtils.readMessageType(table.getStorage(), inputFiles.get(0));
log.info("Binary copy schema evolution disabled. Using schema from input file: " + inputFiles.get(0));
return fileSchema;
} catch (Exception e) {
@@ -79,7 +79,7 @@ private MessageType getWriteSchema(HoodieWriteConfig config, List<StoragePath> i
}
} else {
// Default behavior: use the table's write schema for evolution
-return getAvroSchemaConverter(conf).convert(writeSchemaWithMetaFields.toAvroSchema());
+return getAvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
}
}

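The hunk above only shows the two changed lines of getWriteSchema. As a rough sketch of the surrounding branch under stated assumptions — the class name, the pickWriteSchema helper, and the FooterReader stand-in are illustrative, not the real HoodieBinaryCopyHandle API — the selection logic reads the Parquet footer of the first input file when schema evolution is disabled and otherwise converts the table's write schema:

```java
import java.util.List;

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class WriteSchemaSelectionSketch {

  // Illustrative stand-in for the ParquetUtils#readMessageType call shown in the diff.
  interface FooterReader {
    MessageType readMessageType(HoodieStorage storage, StoragePath path);
  }

  static MessageType pickWriteSchema(boolean schemaEvolutionDisabled,
                                     FooterReader parquetUtils,
                                     HoodieStorage storage,
                                     List<StoragePath> inputFiles,
                                     HoodieSchema writeSchemaWithMetaFields) {
    if (schemaEvolutionDisabled) {
      // Binary copy without evolution: trust the schema already in the first input file.
      return parquetUtils.readMessageType(storage, inputFiles.get(0));
    }
    // Default: convert the table's write schema. Unwrapping to Avro keeps this
    // sketch on the stock parquet-avro converter; the patched code uses Hudi's
    // own converter helper instead.
    return new AvroSchemaConverter().convert(writeSchemaWithMetaFields.toAvroSchema());
  }
}
```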
@@ -54,8 +54,6 @@
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;

-import org.apache.avro.Schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -248,9 +246,9 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
}
final int parallelism = Math.min(partitionFileSlicePairs.size(), secondaryIndexMaxParallelism);
final StoragePath basePath = metaClient.getBasePath();
-Schema tableSchema;
+HoodieSchema tableSchema;
try {
-tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+tableSchema = new TableSchemaResolver(metaClient).getTableSchema();
} catch (Exception e) {
throw new HoodieException("Failed to get latest schema for " + metaClient.getBasePath(), e);
}
@@ -261,16 +259,16 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
final String partition = partitionAndBaseFile.getKey();
final FileSlice fileSlice = partitionAndBaseFile.getValue();
Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> filePath(basePath, partition, baseFile.getFileName())).orElseGet(null));
-Schema readerSchema;
+HoodieSchema readerSchema;
if (dataFilePath.isPresent()) {
readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
.getFileFormatUtils(baseFileFormat)
-.readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+.readSchema(metaClient.getStorage(), dataFilePath.get());
} else {
readerSchema = tableSchema;
}
ClosableIterator<Pair<String, String>> secondaryIndexGenerator = createSecondaryIndexRecordGenerator(
-readerContextFactory.getContext(), metaClient, fileSlice, HoodieSchema.fromAvroSchema(readerSchema), indexDefinition,
+readerContextFactory.getContext(), metaClient, fileSlice, readerSchema, indexDefinition,
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""), props, false);
return new CloseableMappingIterator<>(secondaryIndexGenerator, pair -> createSecondaryIndexRecord(pair.getKey(), pair.getValue(), indexDefinition.getIndexName(), false));
});
@@ -23,6 +23,7 @@
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieParquetConfig;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
@@ -31,7 +32,6 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -61,11 +61,11 @@ public void testProperWriting() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
List<GenericRecord> records = dataGen.generateGenericRecords(10);

-Schema schema = records.get(0).getSchema();
+HoodieSchema schema = HoodieSchema.fromAvroSchema(records.get(0).getSchema());

BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
BloomFilterTypeCode.DYNAMIC_V0.name());
-HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
+HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema.toAvroSchema()),
schema, Option.of(filter), new Properties());

HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
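A minimal sketch of the wrapper round-trip these hunks lean on, assuming only the HoodieSchema calls visible in the diff (fromAvroSchema, toAvroSchema); the class name and the record-schema JSON are made up for illustration:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;

public class HoodieSchemaRoundTripSketch {
  public static void main(String[] args) {
    Schema avro = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"}]}");

    // Wrap once at the boundary where an Avro schema enters Hudi code...
    HoodieSchema schema = HoodieSchema.fromAvroSchema(avro);

    // ...and unwrap only where an Avro-based dependency (e.g. parquet-avro's
    // AvroSchemaConverter or record rewriting) still needs the raw Avro Schema.
    Schema unwrapped = schema.toAvroSchema();
    System.out.println(unwrapped.getFullName());
  }
}
```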
@@ -70,7 +70,7 @@ void archiveIfRequired_instantsAreArchived() throws Exception {
.build();
HoodieEngineContext context = new HoodieLocalEngineContext(metaClient.getStorageConf());
HoodieStorage hoodieStorage = HoodieStorageUtils.getStorage(basePath, metaClient.getStorageConf());
-HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA.toAvroSchema(), null, null, Option.of(context));
+HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null, null, Option.of(context));
testTable.addCommit(InProcessTimeGenerator.createNewInstantTime());
testTable.addCommit(InProcessTimeGenerator.createNewInstantTime());
testTable.addCommit(InProcessTimeGenerator.createNewInstantTime());
@@ -100,7 +100,7 @@ void testTagLocation(boolean manuallySetPartitions) throws Exception {
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));

HoodieStorage hoodieStorage = HoodieStorageUtils.getStorage(basePath, HadoopFSUtils.getStorageConf(conf));
-HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA.toAvroSchema(), null, null, Option.of(context));
+HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null, null, Option.of(context));

String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
@@ -101,7 +101,7 @@ void testTagLocation(boolean manuallySetPartitions) throws Exception {
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));

HoodieStorage hoodieStorage = HoodieStorageUtils.getStorage(basePath, HadoopFSUtils.getStorageConf(conf));
-HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA.toAvroSchema(), null, null, Option.of(context));
+HoodieWriteableTestTable testTable = new HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null, null, Option.of(context));

String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
@@ -113,7 +113,7 @@ public void testSchemaEvolutionDisabled_UsesFileSchema() throws Exception {
// Mock ParquetUtils to return file schema
try (MockedConstruction<ParquetUtils> parquetUtilsConstruction = mockConstruction(ParquetUtils.class,
(mock, context) -> {
-when(mock.readSchema(eq(storage), eq(inputFiles.get(0)))).thenReturn(fileSchema);
+when(mock.readMessageType(eq(storage), eq(inputFiles.get(0)))).thenReturn(fileSchema);
})) {

// When: Creating HoodieBinaryCopyHandle (we can't instantiate directly due to complex dependencies,
@@ -201,7 +201,7 @@ public MessageType testGetWriteSchema(HoodieWriteConfig config, List<StoragePath
throw new IOException("Simulated file read error");
}
ParquetUtils parquetUtils = new ParquetUtils();
-MessageType fileSchema = parquetUtils.readSchema(table.getStorage(), inputFiles.get(0));
+MessageType fileSchema = parquetUtils.readMessageType(table.getStorage(), inputFiles.get(0));
return fileSchema;
} catch (Exception e) {
throw new HoodieIOException("Failed to read schema from input file when schema evolution is disabled: " + inputFiles.get(0),
@@ -29,6 +29,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -48,7 +49,6 @@
import org.apache.hudi.storage.StoragePath;

import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.orc.CompressionKind;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -67,24 +67,24 @@
@Slf4j
public class HoodieWriteableTestTable extends HoodieMetadataTestTable {

-protected final Schema schema;
+protected final HoodieSchema schema;
protected final Option<BloomFilter> filter;
protected final boolean populateMetaFields;

protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
HoodieTableMetaClient metaClient,
-Schema schema, BloomFilter filter) {
+HoodieSchema schema, BloomFilter filter) {
this(basePath, storage, metaClient, schema, filter, null);
}

protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
-HoodieTableMetaClient metaClient, Schema schema,
+HoodieTableMetaClient metaClient, HoodieSchema schema,
BloomFilter filter, HoodieTableMetadataWriter metadataWriter) {
this(basePath, storage, metaClient, schema, filter, metadataWriter, Option.empty());
}

public HoodieWriteableTestTable(String basePath, HoodieStorage storage,
-HoodieTableMetaClient metaClient, Schema schema,
+HoodieTableMetaClient metaClient, HoodieSchema schema,
BloomFilter filter, HoodieTableMetadataWriter metadataWriter,
Option<HoodieEngineContext> context) {
super(basePath, storage, metaClient, metadataWriter, context);
@@ -116,7 +116,7 @@ public StoragePath withInserts(String partition, String fileId, List<HoodieRecor

if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) {
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
-new AvroSchemaConverter().convert(schema), schema, filter, new Properties());
+new AvroSchemaConverter().convert(schema.toAvroSchema()), schema, filter, new Properties());
HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
@@ -125,7 +125,8 @@ public StoragePath withInserts(String partition, String fileId, List<HoodieRecor
contextSupplier, populateMetaFields)) {
int seqId = 1;
for (HoodieRecord record : records) {
-GenericRecord avroRecord = (GenericRecord) record.rewriteRecordWithNewSchema(schema, CollectionUtils.emptyProps(), schema).getData();
+GenericRecord avroRecord = (GenericRecord) record.rewriteRecordWithNewSchema(
+schema.toAvroSchema(), CollectionUtils.emptyProps(), schema.toAvroSchema()).getData();
if (populateMetaFields) {
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
@@ -148,7 +149,7 @@ public StoragePath withInserts(String partition, String fileId, List<HoodieRecor
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
-GenericRecord avroRecord = (GenericRecord) record.toIndexedRecord(schema, CollectionUtils.emptyProps()).get().getData();
+GenericRecord avroRecord = (GenericRecord) record.toIndexedRecord(schema.toAvroSchema(), CollectionUtils.emptyProps()).get().getData();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
@@ -18,8 +18,9 @@

package org.apache.hudi.io.log.block;

-import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.Option;
@@ -32,7 +33,6 @@
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.storage.HoodieStorage;

-import org.apache.avro.Schema;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.ByteArrayOutputStream;
@@ -78,7 +78,7 @@ public ByteArrayOutputStream getContentBytes(HoodieStorage storage) throws IOExc
paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get());
paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get()));
-Schema writerSchema = AvroSchemaCache.intern(new Schema.Parser().parse(
+HoodieSchema writerSchema = HoodieSchemaCache.intern(HoodieSchema.parse(
super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA)));

Pair<ByteArrayOutputStream, Object> result =
@@ -88,7 +88,7 @@ public ByteArrayOutputStream getContentBytes(HoodieStorage storage) throws IOExc
recordIterator,
HoodieRecord.HoodieRecordType.FLINK,
writerSchema,
-getSchema().toAvroSchema(),
+getSchema(),
getKeyFieldName(),
paramsMap);
ValidationUtils.checkArgument(result.getRight() instanceof ParquetMetadata,
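The header-to-schema path in the hunk above, condensed into a tiny helper sketch; it assumes HoodieSchema.parse and HoodieSchemaCache.intern behave as used in the diff (parse the JSON schema carried in the log block header, then intern it so repeated blocks share one instance). The helper name and the sample JSON are illustrative only:

```java
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;

public class WriterSchemaFromHeaderSketch {

  // Parse the schema string stored under HeaderMetadataType.SCHEMA and intern it.
  static HoodieSchema writerSchemaFromHeader(String schemaJson) {
    return HoodieSchemaCache.intern(HoodieSchema.parse(schemaJson));
  }

  public static void main(String[] args) {
    String json = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}";
    System.out.println(writerSchemaFromHeader(json));
  }
}
```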
@@ -108,7 +108,7 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
-HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA.toAvroSchema());
+HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

// Create some partitions, and put some files
// "2016/01/21": 0 file
@@ -205,7 +205,7 @@ public void testCheckUUIDsAgainstOneFile() throws Exception {
// record2, record3).
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record3.getRecordKey());
-HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA.toAvroSchema(), filter);
+HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter);
String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
String filename = testTable.getBaseFileNameById(fileId);

@@ -266,7 +266,7 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieFlinkTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
-HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA.toAvroSchema());
+HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
@@ -317,7 +317,7 @@ public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
-HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA.toAvroSchema());
+HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
@@ -384,7 +384,7 @@ public void testBloomFilterFalseError(boolean rangePruning, boolean treeFilterin

BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record2.getRecordKey());
-HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA.toAvroSchema(), filter);
+HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter);
String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1);
assertTrue(filter.mightContain(record1.getRecordKey()));
assertTrue(filter.mightContain(record2.getRecordKey()));