From 2aca00fa2dcd45821de9d23415dabb858e91ce0a Mon Sep 17 00:00:00 2001 From: aadilkhalifa Date: Tue, 30 Jul 2024 10:31:55 +0530 Subject: [PATCH] Merge RecordEnricher into RecordTransformer --- .../utils/config/TableConfigSerDeTest.java | 3 +- .../flink/sink/FlinkSegmentWriter.java | 8 +-- .../realtime/RealtimeSegmentDataManager.java | 8 +-- .../framework/SegmentProcessorFramework.java | 4 +- .../processing/mapper/SegmentMapper.java | 8 +-- .../pinot/core/util/SchemaUtilsTest.java | 3 +- ...sonUnnestIngestionFromAvroQueriesTest.java | 6 +- .../pinot/queries/TransformQueriesTest.java | 4 +- .../CLPEncodingRealtimeIntegrationTest.java | 2 +- .../IngestionConfigHybridIntegrationTest.java | 6 +- .../MultiStageEngineIntegrationTest.java | 2 +- .../tests/OfflineClusterIntegrationTest.java | 18 +++--- ...eSegmentsMinionClusterIntegrationTest.java | 2 +- .../tests/custom/JsonPathTest.java | 8 ++- .../integration/tests/custom/MapTypeTest.java | 6 +- ...timeToOfflineSegmentsTaskExecutorTest.java | 4 +- .../filebased/FileBasedSegmentWriterTest.java | 5 +- .../converter/RealtimeSegmentConverter.java | 4 +- .../recordtransformer}/RecordEnricher.java | 30 ++++----- .../recordtransformer/RecordTransformer.java | 10 +++ .../RecordTransformerConfig.java | 8 +-- .../RecordTransformerFactory.java | 10 +-- .../RecordTransformerPipeline.java | 38 ++++++------ .../RecordTransformerRegistry.java | 62 +++++++++++++++++++ .../RecordTransformerValidationConfig.java | 6 +- .../clp/CLPEncodingTransformer.java} | 25 ++++---- .../clp/CLPEncodingTransformerFactory.java} | 22 +++---- .../clp/ClpTransformerConfig.java} | 6 +- .../function/CustomFunctionTransformer.java} | 16 ++--- .../CustomFunctionTransformerConfig.java} | 8 +-- .../CustomFunctionTransformerFactory.java} | 28 ++++----- ...RecordReaderSegmentCreationDataSource.java | 15 ++--- .../impl/SegmentIndexCreationDriverImpl.java | 14 ++--- .../segment/local/utils/IngestionUtils.java | 4 +- .../segment/local/utils/TableConfigUtils.java | 39 ++++++++---- .../ExpressionTransformerTest.java | 52 ++++++++-------- .../RecordTransformerTest.java | 2 +- .../index/loader/SegmentPreProcessorTest.java | 8 +-- .../local/utils/IngestionUtilsTest.java | 12 ++-- .../local/utils/TableConfigUtilsTest.java | 39 +++++++----- .../table/ingestion/EnrichmentConfig.java | 51 +++++++-------- .../table/ingestion/IngestionConfig.java | 22 +++---- .../table/ingestion/TransformConfig.java | 20 +++++- .../RecordEnricherRegistry.java | 62 ------------------- .../pinot/spi/utils/TimestampIndexUtils.java | 3 +- 45 files changed, 385 insertions(+), 328 deletions(-) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer}/RecordEnricher.java (67%) rename pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerConfig.java (86%) rename pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerFactory.java (73%) rename pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerPipeline.java (57%) create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerRegistry.java rename pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java => pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerValidationConfig.java (85%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/clp/CLPEncodingEnricher.java => segment/local/recordtransformer/clp/CLPEncodingTransformer.java} (80%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/clp/CLPEncodingEnricherFactory.java => segment/local/recordtransformer/clp/CLPEncodingTransformerFactory.java} (60%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/clp/ClpEnricherConfig.java => segment/local/recordtransformer/clp/ClpTransformerConfig.java} (86%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/function/CustomFunctionEnricher.java => segment/local/recordtransformer/function/CustomFunctionTransformer.java} (79%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/function/CustomFunctionEnricherConfig.java => segment/local/recordtransformer/function/CustomFunctionTransformerConfig.java} (86%) rename pinot-segment-local/src/main/java/org/apache/pinot/{plugin/record/enricher/function/CustomFunctionEnricherFactory.java => segment/local/recordtransformer/function/CustomFunctionTransformerFactory.java} (63%) delete mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index dc9235d793d7..544f1117c2e6 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -284,7 +284,8 @@ public void testSerDe() new StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType", "kafka")))); ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)")); ingestionConfig.setTransformConfigs( - Arrays.asList(new TransformConfig("bar", "func(moo)"), new TransformConfig("zoo", "myfunc()"))); + Arrays.asList(new TransformConfig("bar", "func(moo)", null, null), + new TransformConfig("zoo", "myfunc()", null, null))); ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(Arrays.asList("c1", "c2"), ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, Collections.emptyMap())); ingestionConfig.setAggregationConfigs( diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java index 9e98d28116ca..f6d3df32f5cb 100644 --- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java +++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java @@ -42,6 +42,7 @@ import org.apache.pinot.core.util.SegmentProcessorAvroUtils; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -53,7 +54,6 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter { private String _outputDirURI; private Schema _schema; private Set _fieldsToRead; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformerPipeline _recordTransformerPipeline; private RecordTransformer _recordTransformer; private File _stagingDir; @@ -139,7 +139,7 @@ public void init(TableConfig tableConfig, Schema schema, Map bat _schema = schema; _fieldsToRead = _schema.getColumnNames(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig); + _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(_tableConfig); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema); _reusableRecord = new GenericData.Record(_avroSchema); @@ -175,7 +175,7 @@ private void resetBuffer() public void collect(GenericRow row) throws IOException { long startTime = System.currentTimeMillis(); - _recordEnricherPipeline.run(row); + _recordTransformerPipeline.run(row); GenericRow transform = _recordTransformer.transform(row); SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead); _rowCount++; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index c26b2c14f3b4..aeb380a9d6ac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -58,6 +58,7 @@ import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable; import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; @@ -80,7 +81,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; @@ -283,7 +283,7 @@ public void deleteSegmentFile() { private final int _partitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformerPipeline _recordTransformerPipeline; private final TransformPipeline _transformPipeline; private PartitionGroupConsumer _partitionGroupConsumer = null; private StreamMetadataProvider _partitionMetadataProvider = null; @@ -603,7 +603,7 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee _numRowsErrored++; } else { try { - _recordEnricherPipeline.run(decodedRow.getResult()); + _recordTransformerPipeline.run(decodedRow.getResult()); _transformPipeline.processRow(decodedRow.getResult(), reusedResult); } catch (Exception e) { _numRowsErrored++; @@ -1588,7 +1588,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _streamDataDecoder = localStreamDataDecoder.get(); try { - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig); } catch (Exception e) { _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index 4b166b934a90..99729e48ba26 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -34,6 +34,7 @@ import org.apache.pinot.core.segment.processing.reducer.Reducer; import org.apache.pinot.core.segment.processing.reducer.ReducerFactory; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -43,7 +44,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,7 +291,7 @@ private List generateSegment(Map partitionT GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange), - RecordEnricherPipeline.getPassThroughPipeline(), + RecordTransformerPipeline.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); driver.build(); outputSegmentDirs.add(driver.getOutputDirectory()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 8d36b72b92c3..ea8206ae682e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -42,6 +42,7 @@ import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -49,7 +50,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +70,7 @@ public class SegmentMapper { private final List _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformerPipeline _recordTransformerPipeline; private final CompositeTransformer _recordTransformer; private final ComplexTypeTransformer _complexTypeTransformer; private final TimeHandler _timeHandler; @@ -96,7 +96,7 @@ public SegmentMapper(List recordReaderFileConfigs, _fieldSpecs = pair.getLeft(); _numSortFields = pair.getRight(); _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig); _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema); _complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig); _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig); @@ -172,7 +172,7 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { reuse = recordReader.next(reuse); - _recordEnricherPipeline.run(reuse); + _recordTransformerPipeline.run(reuse); if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { //noinspection unchecked diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java index e1dd69f7717a..83c9409c5e79 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java @@ -96,7 +96,8 @@ public void testCompatibilityWithTableConfig() { // schema doesn't have destination columns from transformConfigs schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); IngestionConfig ingestionConfig = new IngestionConfig(); - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA", "round(colB, 1000)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA", + "round(colB, 1000)", null, null))); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build(); try { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java index bdbad4b80392..c9190cb36c01 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java @@ -90,9 +90,9 @@ public class JsonUnnestIngestionFromAvroQueriesTest extends BaseQueriesTest { .build(); private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, null, - List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000"), - new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)")), + new IngestionConfig(null, null, null, + List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000", null, null), + new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)", null, null)), new ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null), null, null, null) ).build(); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java index cfb570d80e0e..497b5f3ad1f6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java @@ -132,9 +132,9 @@ protected void buildSegment() TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME) - .setIngestionConfig(new IngestionConfig(null, null, null, null, + .setIngestionConfig(new IngestionConfig(null, null, null, Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || " - + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")), + + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)", null, null)), null, null, null, null)) .build(); Schema schema = diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java index d6b1ed119525..14ed7afb1180 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java @@ -140,7 +140,7 @@ protected List getFieldConfigs() { @Override protected IngestionConfig getIngestionConfig() { List transforms = new ArrayList<>(); - transforms.add(new TransformConfig("timestampInEpoch", "now()")); + transforms.add(new TransformConfig("timestampInEpoch", "now()", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transforms); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java index 7495be1e0f79..33631f5b3ee3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java @@ -67,9 +67,9 @@ protected IngestionConfig getIngestionConfig() { new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)"); ingestionConfig.setFilterConfig(filterConfig); List transformConfigs = Arrays.asList( - new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)"), - new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)"), - new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)")); + new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)", null, null), + new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)", null, null), + new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)", null, null)); ingestionConfig.setTransformConfigs(transformConfigs); return ingestionConfig; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 811e957fe147..0ce397d67b84 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -118,7 +118,7 @@ private void setupTableWithNonDefaultDatabase(List avroFiles) noDicCols.add(customCol); tableConfig.getIndexingConfig().setNoDictionaryColumns(noDicCols); IngestionConfig ingestionConfig = new IngestionConfig(); - ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol))); + ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol, null, null))); tableConfig.setIngestionConfig(ingestionConfig); addTableConfig(tableConfig); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 6fb23d96f66c..6f2831a746a9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1585,15 +1585,15 @@ private void reloadWithExtraColumns() TableConfig tableConfig = getOfflineTableConfig(); List transformConfigs = - Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"), - new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"), - new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"), - new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"), - new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"), - new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"), - new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"), - new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)"), - new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)")); + Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24", null, null), + new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000", null, null), + new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0", null, null), + new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')", null, null), + new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs", null, null), + new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs", null, null), + new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)", null, null), + new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)", null, null), + new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); tableConfig.setIngestionConfig(ingestionConfig); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index e6c8ce270030..85a2d76ba009 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -114,7 +114,7 @@ public void setUp() TableConfig realtimeTableConfig = createRealtimeTableConfig(avroFiles.get(0)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( - Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)"))); + Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)", null, null))); realtimeTableConfig.setIngestionConfig(ingestionConfig); FieldConfig tsFieldConfig = new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null, diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java index 7dd460d1f576..324dd511eb4b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java @@ -85,10 +85,12 @@ public String getTableName() { @Override public TableConfig createOfflineTableConfig() { List transformConfigs = Arrays.asList( - new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')"), - new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')"), + new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')", + null, null), + new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')", + null, null), new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME, - "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')")); + "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java index 4c5571c9a10a..1824a566e66b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java @@ -78,8 +78,10 @@ public Schema createSchema() { @Override public TableConfig createOfflineTableConfig() { List transformConfigs = Arrays.asList( - new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")"), - new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")")); + new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")", + null, null), + new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")", + null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java index 99388c1f0893..3605b0b00122 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java @@ -102,13 +102,13 @@ public void setUp() .setSortedColumn(D1).build(); IngestionConfig ingestionConfigEpochHours = new IngestionConfig(); ingestionConfigEpochHours.setTransformConfigs( - Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)"))); + Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)", null, null))); TableConfig tableConfigEpochHours = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX) .setSortedColumn(D1).setIngestionConfig(ingestionConfigEpochHours).build(); IngestionConfig ingestionConfigSDF = new IngestionConfig(); ingestionConfigSDF.setTransformConfigs( - Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))); + Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')", null, null))); TableConfig tableConfigSDF = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX) .setSortedColumn(D1).setIngestionConfig(ingestionConfigSDF).build(); diff --git a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java index a6e4b9c32f5b..5f331e002fc5 100644 --- a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java +++ b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java @@ -74,8 +74,9 @@ public void setup() { _ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(Collections.singletonList( Collections.singletonMap(BatchConfigProperties.OUTPUT_DIR_URI, _outputDir.getAbsolutePath())), "APPEND", "HOURLY")); - _ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("aSimpleMap_str", "jsonFormat(aSimpleMap)"), - new TransformConfig("anAdvancedMap_str", "jsonFormat(anAdvancedMap)"))); + _ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("aSimpleMap_str", + "jsonFormat(aSimpleMap)", null, null), + new TransformConfig("anAdvancedMap_str", "jsonFormat(anAdvancedMap)", null, null))); _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(_ingestionConfig) .setTimeColumnName(TIME_COLUMN_NAME).build(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 0bf8fe571f18..9092247e3428 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; @@ -42,7 +43,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; public class RealtimeSegmentConverter { @@ -129,7 +129,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM recordReader.init(_realtimeSegmentImpl, sortedDocIds); RealtimeSegmentSegmentCreationDataSource dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader); - driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(), + driver.init(genConfig, dataSource, RecordTransformerPipeline.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); if (!_enableColumnMajor) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordEnricher.java similarity index 67% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordEnricher.java index 8e48ed71023a..d3d5d98af874 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordEnricher.java @@ -16,25 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; - -import java.util.List; -import org.apache.pinot.spi.data.readers.GenericRow; - +package org.apache.pinot.segment.local.recordtransformer; /** * Interface for enriching records. * If a column with the same name as the input column already exists in the record, it will be overwritten. */ -public interface RecordEnricher { - /** - * Returns the list of input columns required for enriching the record. - * This is used to make sure the required input fields are extracted. - */ - List getInputColumns(); - - /** - * Enriches the given record, by adding new columns to the same record. - */ - void enrich(GenericRow record); -} +//public interface RecordEnricher { +// /** +// * Returns the list of input columns required for enriching the record. +// * This is used to make sure the required input fields are extracted. +// */ +// List getInputColumns(); +// +// /** +// * Enriches the given record, by adding new columns to the same record. +// */ +// void enrich(GenericRow record); +//} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 72065132ae49..266d637ef237 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -19,6 +19,8 @@ package org.apache.pinot.segment.local.recordtransformer; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; @@ -35,6 +37,14 @@ default boolean isNoOp() { return false; } + /** + * Returns the list of input columns required for enriching the record. + * This is used to make sure the required input fields are extracted. + */ + default List getInputColumns() { + return new ArrayList<>(); + } + /** * Transforms a record based on some custom rules. * diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerConfig.java similarity index 86% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerConfig.java index adac697bdb6d..e4fef09af03c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerConfig.java @@ -16,8 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.segment.local.recordtransformer; -public interface RecordEnricherConfig { - void parse(); -} +//public interface RecordTransformerConfig { +// void parse(); +//} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerFactory.java similarity index 73% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerFactory.java index 4b55d0426013..fe604720ebfd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerFactory.java @@ -16,14 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.segment.local.recordtransformer; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; -public interface RecordEnricherFactory { - String getEnricherType(); - RecordEnricher createEnricher(JsonNode enricherProps) throws IOException; - void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig); +public interface RecordTransformerFactory { + String getTransformerType(); + RecordTransformer createTransformer(JsonNode enricherProps) throws IOException; + void validateTransformConfig(JsonNode enricherProps, RecordTransformerValidationConfig validationConfig); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerPipeline.java similarity index 57% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerPipeline.java index 5a50d685cafe..cdd339bddf09 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerPipeline.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.segment.local.recordtransformer; import java.io.IOException; import java.util.ArrayList; @@ -24,37 +24,39 @@ import java.util.List; import java.util.Set; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.readers.GenericRow; -public class RecordEnricherPipeline { - private final List _enrichers = new ArrayList<>(); +public class RecordTransformerPipeline { + private final List _enrichers = new ArrayList<>(); private final Set _columnsToExtract = new HashSet<>(); - public static RecordEnricherPipeline getPassThroughPipeline() { - return new RecordEnricherPipeline(); + public static RecordTransformerPipeline getPassThroughPipeline() { + return new RecordTransformerPipeline(); } - public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) { - RecordEnricherPipeline pipeline = new RecordEnricherPipeline(); - if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { + public static RecordTransformerPipeline fromIngestionConfig(IngestionConfig ingestionConfig) { + RecordTransformerPipeline pipeline = new RecordTransformerPipeline(); + if (null == ingestionConfig || null == ingestionConfig.getTransformConfigs()) { return pipeline; } - List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); - for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { + List enrichmentConfigs = ingestionConfig.getTransformConfigs(); + for (TransformConfig transformConfig : enrichmentConfigs) { try { - RecordEnricher enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); - pipeline.add(enricher); + if (transformConfig.getEnricherType() != null) { + RecordTransformer enricher = RecordTransformerRegistry.createRecordTransformer(transformConfig); + pipeline.add(enricher); + } } catch (IOException e) { - throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); + throw new RuntimeException("Failed to instantiate record enricher " + transformConfig.getEnricherType(), e); } } return pipeline; } - public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) { + public static RecordTransformerPipeline fromTableConfig(TableConfig tableConfig) { return fromIngestionConfig(tableConfig.getIngestionConfig()); } @@ -62,14 +64,14 @@ public Set getColumnsToExtract() { return _columnsToExtract; } - public void add(RecordEnricher enricher) { + public void add(RecordTransformer enricher) { _enrichers.add(enricher); _columnsToExtract.addAll(enricher.getInputColumns()); } public void run(GenericRow record) { - for (RecordEnricher enricher : _enrichers) { - enricher.enrich(record); + for (RecordTransformer enricher : _enrichers) { + enricher.transform(record); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerRegistry.java new file mode 100644 index 000000000000..d2e63b6da64f --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerRegistry.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.recordtransformer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RecordTransformerRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformerRegistry.class); + private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); + + private RecordTransformerRegistry() { + } + + public static void validateTransformConfig(TransformConfig transformConfig, + RecordTransformerValidationConfig config) { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(transformConfig.getEnricherType())) { + throw new IllegalArgumentException("No record enricher found for type: " + transformConfig.getEnricherType()); + } + + RECORD_ENRICHER_FACTORY_MAP.get(transformConfig.getEnricherType()) + .validateTransformConfig(transformConfig.getProperties(), config); + } + + public static RecordTransformer createRecordTransformer(TransformConfig transformConfig) + throws IOException { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(transformConfig.getEnricherType())) { + throw new IllegalArgumentException("No record transformer found for type: " + transformConfig.getEnricherType()); + } + return RECORD_ENRICHER_FACTORY_MAP.get(transformConfig.getEnricherType()) + .createTransformer(transformConfig.getProperties()); + } + + static { + for (RecordTransformerFactory recordTransformerFactory : ServiceLoader.load(RecordTransformerFactory.class)) { + LOGGER.info("Registered record enricher factory type: {}", recordTransformerFactory.getTransformerType()); + RECORD_ENRICHER_FACTORY_MAP.put(recordTransformerFactory.getTransformerType(), recordTransformerFactory); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerValidationConfig.java similarity index 85% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerValidationConfig.java index 86fb74155e59..d280cddba1b5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerValidationConfig.java @@ -1,4 +1,4 @@ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.segment.local.recordtransformer; /** * Licensed to the Apache Software Foundation (ASF) under one @@ -22,10 +22,10 @@ /** * Interface for cluster constrains, which can be used to validate the record enricher configs */ -public class RecordEnricherValidationConfig { +public class RecordTransformerValidationConfig { private final boolean _groovyDisabled; - public RecordEnricherValidationConfig(boolean groovyDisabled) { + public RecordTransformerValidationConfig(boolean groovyDisabled) { _groovyDisabled = groovyDisabled; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformer.java similarity index 80% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformer.java index 790015d8af31..354a5616a1ca 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.clp; import com.fasterxml.jackson.databind.JsonNode; import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions; @@ -24,8 +24,8 @@ import com.yscope.clp.compressorfrontend.MessageEncoder; import java.io.IOException; import java.util.List; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.rewriter.ClpRewriter; import org.slf4j.Logger; @@ -33,20 +33,20 @@ /** - * Enriches the record with CLP encoded fields. + * Transforms the record with CLP encoded fields. * For a column 'x', it adds three new columns to the record: * 1. 'x_logtype' - The logtype of the encoded message * 2. 'x_dictVars' - The dictionary variables of the encoded message * 3. 'x_encodedVars' - The encoded variables of the encoded message */ -public class CLPEncodingEnricher implements RecordEnricher { - private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class); - private final ClpEnricherConfig _config; +public class CLPEncodingTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingTransformer.class); + private final ClpTransformerConfig _config; private final EncodedMessage _clpEncodedMessage; private final MessageEncoder _clpMessageEncoder; - public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException { - _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class); + public CLPEncodingTransformer(JsonNode transformerProperties) throws IOException { + _config = JsonUtils.jsonNodeToObject(transformerProperties, ClpTransformerConfig.class); _clpEncodedMessage = new EncodedMessage(); _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2, BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1); @@ -58,20 +58,21 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { try { for (String field : _config.getFields()) { Object value = record.getValue(field); if (value != null) { - enrichWithClpEncodedFields(field, value, record); + transformWithClpEncodedFields(field, value, record); } } } catch (Exception e) { - LOGGER.error("Failed to enrich record: {}", record); + LOGGER.error("Failed to transform record: {}", record); } + return record; } - private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) { + private void transformWithClpEncodedFields(String key, Object value, GenericRow to) { String logtype = null; Object[] dictVars = null; Object[] encodedVars = null; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformerFactory.java similarity index 60% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformerFactory.java index cacc4fdbc9b4..e1d2ec3b3a25 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/CLPEncodingTransformerFactory.java @@ -16,34 +16,34 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.clp; import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerValidationConfig; import org.apache.pinot.spi.utils.JsonUtils; -@AutoService(RecordEnricherFactory.class) -public class CLPEncodingEnricherFactory implements RecordEnricherFactory { +@AutoService(RecordTransformerFactory.class) +public class CLPEncodingTransformerFactory implements RecordTransformerFactory { private static final String ENRICHER_TYPE = "clpEnricher"; @Override - public String getEnricherType() { + public String getTransformerType() { return ENRICHER_TYPE; } @Override - public RecordEnricher createEnricher(JsonNode enricherProps) + public RecordTransformer createTransformer(JsonNode enricherProps) throws IOException { - return new CLPEncodingEnricher(enricherProps); + return new CLPEncodingTransformer(enricherProps); } @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { + public void validateTransformConfig(JsonNode enricherProps, RecordTransformerValidationConfig validationConfig) { try { - ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); + ClpTransformerConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpTransformerConfig.class); } catch (IOException e) { throw new IllegalArgumentException("Failed to parse clp enricher config", e); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/ClpTransformerConfig.java similarity index 86% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/ClpTransformerConfig.java index 93439602ecb2..de622e27a33a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/clp/ClpTransformerConfig.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.clp; +package org.apache.pinot.segment.local.recordtransformer.clp; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -26,11 +26,11 @@ /** * Configuration for the CLP enricher. */ -public class ClpEnricherConfig { +public class ClpTransformerConfig { private final List _fields; @JsonCreator - public ClpEnricherConfig(@JsonProperty("fields") List fields) { + public ClpTransformerConfig(@JsonProperty("fields") List fields) { _fields = fields; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformer.java similarity index 79% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformer.java index 92cf565220d9..45d14fc07512 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.function; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; @@ -27,20 +27,21 @@ import java.util.Map; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; /** - * Enriches the record with custom functions. + * Transforms the record with custom functions. */ -public class CustomFunctionEnricher implements RecordEnricher { +public class CustomFunctionTransformer implements RecordTransformer { private final Map _fieldToFunctionEvaluator; private final List _fieldsToExtract; - public CustomFunctionEnricher(JsonNode enricherProps) throws IOException { - CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); + public CustomFunctionTransformer(JsonNode transformProps) throws IOException { + CustomFunctionTransformerConfig config = JsonUtils.jsonNodeToObject(transformProps, + CustomFunctionTransformerConfig.class); _fieldToFunctionEvaluator = new LinkedHashMap<>(); _fieldsToExtract = new ArrayList<>(); for (Map.Entry entry : config.getFieldToFunctionMap().entrySet()) { @@ -58,9 +59,10 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { _fieldToFunctionEvaluator.forEach((field, evaluator) -> { record.putValue(field, evaluator.evaluate(record)); }); + return record; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerConfig.java similarity index 86% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerConfig.java index cc4270f60185..0aacbc3f319f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerConfig.java @@ -16,20 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.function; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.LinkedHashMap; /** - * Configuration for the custom function enricher. + * Configuration for the custom function transformer. */ -public class CustomFunctionEnricherConfig { +public class CustomFunctionTransformerConfig { private final LinkedHashMap _fieldToFunctionMap; @JsonCreator - public CustomFunctionEnricherConfig( + public CustomFunctionTransformerConfig( @JsonProperty("fieldToFunctionMap") LinkedHashMap columnTofunctionMap) { _fieldToFunctionMap = columnTofunctionMap; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerFactory.java similarity index 63% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerFactory.java index f77308190359..763491b4edf1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/function/CustomFunctionTransformerFactory.java @@ -16,46 +16,46 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.record.enricher.function; +package org.apache.pinot.segment.local.recordtransformer.function; import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerValidationConfig; import org.apache.pinot.spi.utils.JsonUtils; -@AutoService(RecordEnricherFactory.class) -public class CustomFunctionEnricherFactory implements RecordEnricherFactory { +@AutoService(RecordTransformerFactory.class) +public class CustomFunctionTransformerFactory implements RecordTransformerFactory { private static final String TYPE = "generateColumn"; @Override - public String getEnricherType() { + public String getTransformerType() { return TYPE; } @Override - public RecordEnricher createEnricher(JsonNode enricherProps) + public RecordTransformer createTransformer(JsonNode transformProps) throws IOException { - return new CustomFunctionEnricher(enricherProps); + return new CustomFunctionTransformer(transformProps); } @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { - CustomFunctionEnricherConfig config; + public void validateTransformConfig(JsonNode transformProps, RecordTransformerValidationConfig validationConfig) { + CustomFunctionTransformerConfig config; try { - config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); + config = JsonUtils.jsonNodeToObject(transformProps, CustomFunctionTransformerConfig.class); if (!validationConfig.isGroovyDisabled()) { return; } for (String function : config.getFieldToFunctionMap().values()) { if (FunctionEvaluatorFactory.isGroovyExpression(function)) { - throw new IllegalArgumentException("Groovy expression is not allowed for enrichment"); + throw new IllegalArgumentException("Groovy expression is not allowed for transform"); } } } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse custom function enricher config", e); + throw new IllegalArgumentException("Failed to parse custom function transform config", e); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index 219a207d94e9..8e77e00289fa 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -19,13 +19,13 @@ package org.apache.pinot.segment.local.segment.creator; import org.apache.pinot.common.Utils; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl; import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,15 +39,15 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class); private final RecordReader _recordReader; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformerPipeline _recordTransformerPipeline; private TransformPipeline _transformPipeline; public RecordReaderSegmentCreationDataSource(RecordReader recordReader) { _recordReader = recordReader; } - public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) { - _recordEnricherPipeline = recordEnricherPipeline; + public void setRecordEnricherPipeline(RecordTransformerPipeline recordTransformerPipeline) { + _recordTransformerPipeline = recordTransformerPipeline; } public void setTransformPipeline(TransformPipeline transformPipeline) { @@ -57,8 +57,9 @@ public void setTransformPipeline(TransformPipeline transformPipeline) { @Override public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) { try { - RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline - : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig()); + RecordTransformerPipeline + recordTransformerPipeline = _recordTransformerPipeline != null ? _recordTransformerPipeline + : RecordTransformerPipeline.fromTableConfig(statsCollectorConfig.getTableConfig()); TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema()); @@ -72,7 +73,7 @@ public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsColle reuse.clear(); reuse = _recordReader.next(reuse); - recordEnricherPipeline.run(reuse); + recordTransformerPipeline.run(reuse); transformPipeline.processRow(reuse, reusedResult); for (GenericRow row : reusedResult.getTransformedRows()) { collector.collectRow(row); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index fe404da09a93..ecd73eee2ef8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -39,6 +39,7 @@ import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory; @@ -80,7 +81,6 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; @@ -103,7 +103,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private SegmentIndexCreationInfo _segmentIndexCreationInfo; private SegmentCreationDataSource _dataSource; private Schema _dataSchema; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformerPipeline _recordTransformerPipeline; private TransformPipeline _transformPipeline; private IngestionSchemaValidator _ingestionSchemaValidator; private int _totalDocs = 0; @@ -161,7 +161,7 @@ public RecordReader getRecordReader() { public void init(SegmentGeneratorConfig config, RecordReader recordReader) throws Exception { SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader); - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformerPipeline.fromTableConfig(config.getTableConfig()), new TransformPipeline(config.getTableConfig(), config.getSchema())); } @@ -169,12 +169,12 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader) public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer) throws Exception { - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformerPipeline.fromTableConfig(config.getTableConfig()), new TransformPipeline(recordTransformer, complexTypeTransformer)); } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordEnricherPipeline enricherPipeline, + RecordTransformerPipeline enricherPipeline, TransformPipeline transformPipeline) throws Exception { _config = config; @@ -185,7 +185,7 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo if (config.isFailOnEmptySegment()) { Preconditions.checkState(_recordReader.hasNext(), "No record in data source"); } - _recordEnricherPipeline = enricherPipeline; + _recordTransformerPipeline = enricherPipeline; _transformPipeline = transformPipeline; // Use the same transform pipeline if the data source is backed by a record reader if (dataSource instanceof RecordReaderSegmentCreationDataSource) { @@ -281,7 +281,7 @@ public void build() // Should not be needed anymore. // Add row to indexes - _recordEnricherPipeline.run(decodedRow); + _recordTransformerPipeline.run(decodedRow); _transformPipeline.processRow(decodedRow, reusedResult); recordReadStopTimeNs = System.nanoTime(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 556b0258558f..27910bc39d4f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; @@ -65,7 +66,6 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; @@ -382,7 +382,7 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i } } - fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); + fields.addAll(RecordTransformerPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { for (TransformConfig transformConfig : transformConfigs) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index d58106098db0..aa13c226f8a7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -45,6 +45,8 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerRegistry; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformerValidationConfig; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; @@ -74,7 +76,6 @@ import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; @@ -85,8 +86,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; @@ -98,7 +97,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.segment.spi.AggregationFunctionType.*; +import static org.apache.pinot.segment.spi.AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTCPCSKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLL; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLLPLUS; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWCPCSKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWHLL; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWHLLPLUS; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTRAWULL; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTTHETASKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTULL; +import static org.apache.pinot.segment.spi.AggregationFunctionType.MAX; +import static org.apache.pinot.segment.spi.AggregationFunctionType.MIN; +import static org.apache.pinot.segment.spi.AggregationFunctionType.SUM; +import static org.apache.pinot.segment.spi.AggregationFunctionType.SUMPRECISION; +import static org.apache.pinot.segment.spi.AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH; /** @@ -543,20 +560,16 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc }); } - // Enrichment configs - List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); - if (enrichmentConfigs != null) { - for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig, - new RecordEnricherValidationConfig(_disableGroovy)); - } - } - // Transform configs List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { Set transformColumns = new HashSet<>(); for (TransformConfig transformConfig : transformConfigs) { + if (transformConfig.getEnricherType() != null) { + RecordTransformerRegistry.validateTransformConfig(transformConfig, + new RecordTransformerValidationConfig(_disableGroovy)); + continue; + } String columnName = transformConfig.getColumnName(); String transformFunction = transformConfig.getTransformFunction(); if (columnName == null || transformFunction == null) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java index 55d8d7172ff4..2e325768f4d8 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java @@ -55,12 +55,12 @@ public void testTransformConfigsFromTableConfig() { .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS").build(); List transformConfigs = Arrays.asList( - new TransformConfig("userId", "Groovy({user_id}, user_id)"), - new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, firstName, lastName)"), - new TransformConfig("maxBid", "Groovy({bids.max{ it.toBigDecimal() }}, bids)"), - new TransformConfig("map2_keys", "Groovy({map2.sort()*.key}, map2)"), - new TransformConfig("map2_values", "Groovy({map2.sort()*.value}, map2)"), - new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)")); + new TransformConfig("userId", "Groovy({user_id}, user_id)", null, null), + new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, firstName, lastName)", null, null), + new TransformConfig("maxBid", "Groovy({bids.max{ it.toBigDecimal() }}, bids)", null, null), + new TransformConfig("map2_keys", "Groovy({map2.sort()*.key}, map2)", null, null), + new TransformConfig("map2_values", "Groovy({map2.sort()*.value}, map2)", null, null), + new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions") @@ -149,9 +149,9 @@ public void testTransformConfigsFromSchema() { pinotSchema.getFieldSpecFor("hoursSinceEpoch").setTransformFunction("Groovy({timestamp/(1000)}, timestamp)"); List transformConfigs = Arrays.asList( - new TransformConfig("userId", "Groovy({user_id}, user_id)"), - new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, firstName, lastName)"), - new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)")); + new TransformConfig("userId", "Groovy({user_id}, user_id)", null, null), + new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, firstName, lastName)", null, null), + new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions") @@ -207,7 +207,7 @@ public void testValueAlreadyExists() { DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("fullName", FieldSpec.DataType.STRING, true); pinotSchema.addField(dimensionFieldSpec); List transformConfigs = Collections.singletonList( - new TransformConfig("fullName", "Groovy({firstName + ' ' + lastName}, firstName, lastName)")); + new TransformConfig("fullName", "Groovy({firstName + ' ' + lastName}, firstName, lastName)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = @@ -247,11 +247,11 @@ public void testTransformFunctionSortOrder() { .addSingleValueDimension("d", FieldSpec.DataType.STRING).addSingleValueDimension("e", FieldSpec.DataType.STRING) .addSingleValueDimension("f", FieldSpec.DataType.STRING).build(); List transformConfigs = Arrays.asList( - new TransformConfig("d", "plus(x, 10)"), - new TransformConfig("b", "plus(d, 10)"), - new TransformConfig("a", "plus(b, 10)"), - new TransformConfig("c", "plus(a, d)"), - new TransformConfig("f", "plus(e, 10)")); + new TransformConfig("d", "plus(x, 10)", null, null), + new TransformConfig("b", "plus(d, 10)", null, null), + new TransformConfig("a", "plus(b, 10)", null, null), + new TransformConfig("c", "plus(a, d)", null, null), + new TransformConfig("f", "plus(e, 10)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testDerivedFunctions") @@ -278,8 +278,8 @@ public void testMultipleTransformFunctionSortOrder() { .build(); List transformConfigs = Arrays.asList( - new TransformConfig("a", "plus(b,10)"), - new TransformConfig("a", "plus(c,10)")); + new TransformConfig("a", "plus(b,10)", null, null), + new TransformConfig("a", "plus(c,10)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = @@ -299,10 +299,10 @@ public void testNonCyclicTransformFunctionSortOrder() { // Define transform function dependencies: a -> (b,c), b -> d, d -> e, c -> (d,e) List transformConfigs = Arrays.asList( - new TransformConfig("a", "plus(b,c)"), - new TransformConfig("b", "plus(d,10)"), - new TransformConfig("d", "plus(e,10)"), - new TransformConfig("c", "plus(d,e)")); + new TransformConfig("a", "plus(b,c)", null, null), + new TransformConfig("b", "plus(d,10)", null, null), + new TransformConfig("d", "plus(e,10)", null, null), + new TransformConfig("c", "plus(d,e)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = @@ -328,9 +328,9 @@ public void testCyclicTransformFunctionSortOrder() { // Define transform function dependencies: a -> b, b -> c, c -> a List transformConfigs = Arrays.asList( - new TransformConfig("a", "plus(b,10)"), - new TransformConfig("b", "plus(c,10)"), - new TransformConfig("c", "plus(a,10)")); + new TransformConfig("a", "plus(b,10)", null, null), + new TransformConfig("b", "plus(c,10)", null, null), + new TransformConfig("c", "plus(a,10)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = @@ -345,7 +345,7 @@ public void testTransformFunctionWithWrongInput() { DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true); pinotSchema.addField(dimensionFieldSpec); List transformConfigs = Collections.singletonList( - new TransformConfig("y", "plus(x, 10)")); + new TransformConfig("y", "plus(x, 10)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); TableConfig tableConfig = @@ -375,7 +375,7 @@ public void testTransformFunctionContinueOnError() { DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true); pinotSchema.addField(dimensionFieldSpec); List transformConfigs = Collections.singletonList( - new TransformConfig("y", "plus(x, 10)")); + new TransformConfig("y", "plus(x, 10)", null, null)); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); ingestionConfig.setContinueOnError(true); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index 696f2b1f48bb..aad3fac86dfb 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -523,7 +523,7 @@ public void testOrderForTransformers() { new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig) .setTimeColumnName("timeCol").build(); ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123 AND svDouble <= 200")); - ingestionConfig.setTransformConfigs(List.of(new TransformConfig("expressionTestColumn", "plus(x,10)"))); + ingestionConfig.setTransformConfigs(List.of(new TransformConfig("expressionTestColumn", "plus(x,10)", null, null))); ingestionConfig.setSchemaConformingTransformerConfig( new SchemaConformingTransformerConfig("indexableExtras", null, null, null)); ingestionConfig.setRowTimeValueCheck(true); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 80178b3fdaef..2da0074d57c6 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -1104,8 +1104,8 @@ public void testV1UpdateDefaultColumns() IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( ImmutableList.of( - new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)"), - new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)") + new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)", null, null), + new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)", null, null) )); _tableConfig.setIngestionConfig(ingestionConfig); _indexLoadingConfig.addInvertedIndexColumns(NEW_COLUMN_INVERTED_INDEX); @@ -1155,8 +1155,8 @@ public void testV3UpdateDefaultColumns() IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( ImmutableList.of( - new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)"), - new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)") + new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)", null, null), + new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)", null, null) )); _tableConfig.setIngestionConfig(ingestionConfig); _indexLoadingConfig.addInvertedIndexColumns(NEW_COLUMN_INVERTED_INDEX); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java index 817ce0362a34..c9f5617b48d3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java @@ -152,13 +152,14 @@ public void testExtractFieldsIngestionConfig() { schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING).build(); ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( - Collections.singletonList(new TransformConfig("d1", "Groovy({function}, argument1, argument2)"))); + Collections.singletonList(new TransformConfig("d1", "Groovy({function}, argument1, argument2)", null, null))); List extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 3); Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", "argument2"))); // groovy function, no arguments - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("d1", "Groovy({function})"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("d1", + "Groovy({function})", null, null))); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 1); Assert.assertTrue(extract.contains("d1")); @@ -166,7 +167,7 @@ public void testExtractFieldsIngestionConfig() { // inbuilt functions schema = new Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch", FieldSpec.DataType.LONG).build(); ingestionConfig.setTransformConfigs( - Collections.singletonList(new TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)"))); + Collections.singletonList(new TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)", null, null))); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn", "hoursSinceEpoch"))); @@ -175,7 +176,7 @@ public void testExtractFieldsIngestionConfig() { schema = new Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch", FieldSpec.DataType.LONG).build(); ingestionConfig.setTransformConfigs(Collections.singletonList( - new TransformConfig("tenMinutesSinceEpoch", "toEpochMinutesBucket(timestampColumn, 10)"))); + new TransformConfig("tenMinutesSinceEpoch", "toEpochMinutesBucket(timestampColumn, 10)", null, null))); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch", "timestampColumn"))); @@ -184,7 +185,8 @@ public void testExtractFieldsIngestionConfig() { schema = new Schema.SchemaBuilder().addDateTime("dateColumn", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build(); ingestionConfig.setTransformConfigs( - Collections.singletonList(new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')"))); + Collections.singletonList(new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')", + null, null))); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn", "timestampColumn"))); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 8062b422f0b8..356464780096 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -288,7 +288,8 @@ public void validateIngestionConfig() { TableConfigUtils.validate(tableConfig, schema); // transformed column not in schema - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "reverse(anotherCol)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", + "reverse(anotherCol)", null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for transformedColumn not present in schema"); @@ -302,7 +303,7 @@ public void validateIngestionConfig() { tableConfig.setIndexingConfig(indexingConfig); schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build(); - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("twice", "col * 2"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("twice", "col * 2", null, null))); ingestionConfig.setAggregationConfigs(Collections.singletonList(new AggregationConfig("twiceSum", "SUM(twice)"))); TableConfigUtils.validate(tableConfig, schema); @@ -312,15 +313,16 @@ public void validateIngestionConfig() { .build(); indexingConfig.setNoDictionaryColumns(List.of("myCol")); ingestionConfig.setAggregationConfigs(null); - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "reverse(anotherCol)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", + "reverse(anotherCol)", null, null))); TableConfigUtils.validate(tableConfig, schema); // valid transform configs schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .addMetric("transformedCol", FieldSpec.DataType.LONG).build(); - ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("myCol", "reverse(anotherCol)"), - new TransformConfig("transformedCol", "Groovy({x+y}, x, y)"))); + ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("myCol", "reverse(anotherCol)", null, null), + new TransformConfig("transformedCol", "Groovy({x+y}, x, y)", null, null))); TableConfigUtils.validate(tableConfig, schema); // invalid transform config since Groovy is disabled @@ -372,7 +374,8 @@ public void validateIngestionConfig() { // null transform column name ingestionConfig.setFilterConfig(null); - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig(null, "reverse(anotherCol)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig(null, + "reverse(anotherCol)", null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for null column name in transform config"); @@ -381,7 +384,7 @@ public void validateIngestionConfig() { } // null transform function string - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", null))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", null, null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for null transform function in transform config"); @@ -390,7 +393,8 @@ public void validateIngestionConfig() { } // invalid function - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "fakeFunction(col)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", + "fakeFunction(col)", null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid transform function in transform config"); @@ -399,7 +403,8 @@ public void validateIngestionConfig() { } // invalid function - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "Groovy(badExpr)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", + "Groovy(badExpr)", null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid transform function in transform config"); @@ -408,7 +413,8 @@ public void validateIngestionConfig() { } // input field name used as destination field - ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", "reverse(myCol)"))); + ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("myCol", + "reverse(myCol)", null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to use of myCol as arguments and columnName"); @@ -418,7 +424,8 @@ public void validateIngestionConfig() { // input field name used as destination field ingestionConfig.setTransformConfigs( - Collections.singletonList(new TransformConfig("myCol", "Groovy({x + y + myCol}, x, myCol, y)"))); + Collections.singletonList(new TransformConfig("myCol", "Groovy({x + y + myCol}, x, myCol, y)", + null, null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to use of myCol as arguments and columnName"); @@ -428,7 +435,10 @@ public void validateIngestionConfig() { // duplicate transform config ingestionConfig.setTransformConfigs( - Arrays.asList(new TransformConfig("myCol", "reverse(x)"), new TransformConfig("myCol", "lower(y)"))); + Arrays.asList(new TransformConfig("myCol", "reverse(x)", null, null), + new TransformConfig("myCol", "lower(y)", + null, + null))); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to duplicate transform config"); @@ -437,8 +447,9 @@ public void validateIngestionConfig() { } // derived columns - should pass - ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("transformedCol", "reverse(x)"), - new TransformConfig("myCol", "lower(transformedCol)"))); + ingestionConfig.setTransformConfigs(Arrays.asList(new TransformConfig("transformedCol", + "reverse(x)", null, null), + new TransformConfig("myCol", "lower(transformedCol)", null, null))); TableConfigUtils.validate(tableConfig, schema); // invalid field name in schema with matching prefix from complexConfigType's prefixesToRename diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java index 9b295a78e452..b69468530cec 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java @@ -18,32 +18,25 @@ */ package org.apache.pinot.spi.config.table.ingestion; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyDescription; -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.pinot.spi.config.BaseJsonConfig; - - -public class EnrichmentConfig extends BaseJsonConfig { - @JsonPropertyDescription("Enricher type") - private final String _enricherType; - - @JsonPropertyDescription("Enricher properties") - private final JsonNode _properties; - - @JsonCreator - public EnrichmentConfig(@JsonProperty("enricherType") String enricherType, - @JsonProperty("properties") JsonNode properties) { - _enricherType = enricherType; - _properties = properties; - } - - public String getEnricherType() { - return _enricherType; - } - - public JsonNode getProperties() { - return _properties; - } -} +//public class EnrichmentConfig extends BaseJsonConfig { +// @JsonPropertyDescription("Enricher type") +// private final String _enricherType; +// +// @JsonPropertyDescription("Enricher properties") +// private final JsonNode _properties; +// +// @JsonCreator +// public EnrichmentConfig(@JsonProperty("enricherType") String enricherType, +// @JsonProperty("properties") JsonNode properties) { +// _enricherType = enricherType; +// _properties = properties; +// } +// +// public String getEnricherType() { +// return _enricherType; +// } +// +// public JsonNode getProperties() { +// return _properties; +// } +//} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java index 358cf35a43ac..84e35eb4b2aa 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java @@ -39,8 +39,8 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Config related to filtering records during ingestion") private FilterConfig _filterConfig; - @JsonPropertyDescription("Config related to enriching records during ingestion") - private List _enrichmentConfigs; +// @JsonPropertyDescription("Config related to enriching records during ingestion") +// private List _enrichmentConfigs; @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion") private List _transformConfigs; @@ -69,7 +69,7 @@ public class IngestionConfig extends BaseJsonConfig { @Deprecated public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig, @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig, - @Nullable List enrichmentConfigs, +// @Nullable List enrichmentConfigs, @Nullable List transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig, @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig, @Nullable SchemaConformingTransformerV2Config schemaConformingTransformerV2Config, @@ -77,7 +77,7 @@ public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig, _batchIngestionConfig = batchIngestionConfig; _streamIngestionConfig = streamIngestionConfig; _filterConfig = filterConfig; - _enrichmentConfigs = enrichmentConfigs; +// _enrichmentConfigs = enrichmentConfigs; _transformConfigs = transformConfigs; _complexTypeConfig = complexTypeConfig; _schemaConformingTransformerConfig = schemaConformingTransformerConfig; @@ -103,10 +103,10 @@ public FilterConfig getFilterConfig() { return _filterConfig; } - @Nullable - public List getEnrichmentConfigs() { - return _enrichmentConfigs; - } +// @Nullable +// public List getEnrichmentConfigs() { +// return _enrichmentConfigs; +// } @Nullable public List getTransformConfigs() { @@ -157,9 +157,9 @@ public void setFilterConfig(FilterConfig filterConfig) { _filterConfig = filterConfig; } - public void setEnrichmentConfigs(List enrichmentConfigs) { - _enrichmentConfigs = enrichmentConfigs; - } +// public void setEnrichmentConfigs(List enrichmentConfigs) { +// _enrichmentConfigs = enrichmentConfigs; +// } public void setTransformConfigs(List transformConfigs) { _transformConfigs = transformConfigs; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/TransformConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/TransformConfig.java index 468308d196ce..02e072bad5d1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/TransformConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/TransformConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.pinot.spi.config.BaseJsonConfig; @@ -29,6 +30,12 @@ */ public class TransformConfig extends BaseJsonConfig { + @JsonPropertyDescription("Enricher type") + private final String _enricherType; + + @JsonPropertyDescription("Enricher properties") + private final JsonNode _properties; + @JsonPropertyDescription("Column name") private final String _columnName; @@ -37,9 +44,20 @@ public class TransformConfig extends BaseJsonConfig { @JsonCreator public TransformConfig(@JsonProperty("columnName") String columnName, - @JsonProperty("transformFunction") String transformFunction) { + @JsonProperty("transformFunction") String transformFunction, @JsonProperty("enricherType") String enricherType, + @JsonProperty("properties") JsonNode properties) { _columnName = columnName; _transformFunction = transformFunction; + _enricherType = enricherType; + _properties = properties; + } + + public String getEnricherType() { + return _enricherType; + } + + public JsonNode getProperties() { + return _properties; } public String getColumnName() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java deleted file mode 100644 index 59c1e6fb348d..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.ServiceLoader; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class RecordEnricherRegistry { - private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); - private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); - - private RecordEnricherRegistry() { - } - - public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, - RecordEnricherValidationConfig config) { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - - RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); - } - - public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig) - throws IOException { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .createEnricher(enrichmentConfig.getProperties()); - } - - static { - for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { - LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); - RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); - } - } -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimestampIndexUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimestampIndexUtils.java index f34c7db57011..bd0172562c51 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimestampIndexUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimestampIndexUtils.java @@ -188,7 +188,8 @@ public static void applyTimestampIndex(TableConfig tableConfig, Schema schema) { } if (!tableConfigApplied) { transformConfigs.add( - new TransformConfig(columnWithGranularity, getTransformExpression(timestampColumn, granularity))); + new TransformConfig(columnWithGranularity, getTransformExpression(timestampColumn, granularity), + null, null)); rangeIndexColumns.add(columnWithGranularity); } }