diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java index 0a59696b10e1..701dc59773ad 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java @@ -62,12 +62,11 @@ public class TableConfigUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class); + private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing"; private TableConfigUtils() { } - private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing"; - public static TableConfig fromZNRecord(ZNRecord znRecord) throws IOException { Map simpleFields = znRecord.getSimpleFields(); @@ -80,8 +79,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY); String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY); - Preconditions - .checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.VALIDATION_CONFIG_KEY); + Preconditions.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, + TableConfig.VALIDATION_CONFIG_KEY); SegmentsValidationAndRetentionConfig validationConfig = JsonUtils.stringToObject(validationConfigString, SegmentsValidationAndRetentionConfig.class); @@ -90,8 +89,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class); String indexingConfigString = simpleFields.get(TableConfig.INDEXING_CONFIG_KEY); - Preconditions - .checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.INDEXING_CONFIG_KEY); + Preconditions.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, + TableConfig.INDEXING_CONFIG_KEY); IndexingConfig indexingConfig = JsonUtils.stringToObject(indexingConfigString, IndexingConfig.class); String customConfigString = simpleFields.get(TableConfig.CUSTOM_CONFIG_KEY); @@ -180,14 +179,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY); if (instancePartitionsMapString != null) { instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString, - new TypeReference>() { }); + new TypeReference>() { + }); } Map segmentAssignmentConfigMap = null; String segmentAssignmentConfigMapString = simpleFields.get(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY); if (segmentAssignmentConfigMapString != null) { segmentAssignmentConfigMap = JsonUtils.stringToObject(segmentAssignmentConfigMapString, - new TypeReference>() { }); + new TypeReference>() { + }); } return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, @@ -228,8 +229,8 @@ public static ZNRecord toZNRecord(TableConfig tableConfig) } Map instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); if (instanceAssignmentConfigMap != null) { - simpleFields - .put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap)); + simpleFields.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, + JsonUtils.objectToString(instanceAssignmentConfigMap)); } List fieldConfigList = tableConfig.getFieldConfigList(); if (fieldConfigList != null) { @@ -263,11 +264,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig) simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY, JsonUtils.objectToString(tableConfig.getInstancePartitionsMap())); } - Map segmentAssignmentConfigMap = - tableConfig.getSegmentAssignmentConfigMap(); + Map segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap(); if (segmentAssignmentConfigMap != null) { - simpleFields - .put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(segmentAssignmentConfigMap)); + simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, + JsonUtils.objectToString(segmentAssignmentConfigMap)); } ZNRecord znRecord = new ZNRecord(tableConfig.getTableName()); @@ -443,8 +443,8 @@ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig */ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { - return hasPreConfiguredInstancePartitions(tableConfig) - && tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType); + return hasPreConfiguredInstancePartitions(tableConfig) && tableConfig.getInstancePartitionsMap() + .containsKey(instancePartitionsType); } /** diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java index e9bd29701b0b..304e4485840e 100644 --- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java +++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java @@ -53,7 +53,6 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public class FlinkSegmentWriter implements SegmentWriter { private String _outputDirURI; private Schema _schema; private Set _fieldsToRead; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private RecordTransformer _recordTransformer; private File _stagingDir; @@ -139,7 +138,7 @@ public void init(TableConfig tableConfig, Schema schema, Map bat _schema = schema; _fieldsToRead = _schema.getColumnNames(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(_tableConfig); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema); _reusableRecord = new GenericData.Record(_avroSchema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 3290c5e4f3c0..0efece511011 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -55,6 +55,7 @@ import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable; import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; @@ -76,7 +77,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; @@ -275,7 +275,7 @@ public void deleteSegmentFile() { private final int _partitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformer _recordEnricherPipeline; private final TransformPipeline _transformPipeline; private PartitionGroupConsumer _partitionGroupConsumer = null; private StreamMetadataProvider _partitionMetadataProvider = null; @@ -1514,10 +1514,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } try { - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig); } catch (Exception e) { _realtimeTableDataManager.addSegmentError(_segmentNameStr, - new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e)); + new SegmentErrorInfo(now(), "Failed to initialize the Record Transformer pipeline", e)); throw e; } _transformPipeline = new TransformPipeline(tableConfig, schema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index def4f75b290e..b2770a003658 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -43,7 +43,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -290,7 +289,7 @@ private List generateSegment(Map partitionT GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange), - RecordEnricherPipeline.getPassThroughPipeline(), + RecordTransformer.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); driver.build(); outputSegmentDirs.add(driver.getOutputDirectory()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 77b651d943aa..df0989c04b2d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -48,7 +48,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +68,7 @@ public class SegmentMapper { private final List _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformer _recordEnricherPipeline; private final CompositeTransformer _recordTransformer; private final TimeHandler _timeHandler; private final Partitioner[] _partitioners; @@ -94,7 +93,7 @@ public SegmentMapper(List recordReaderFileConfigs, _fieldSpecs = pair.getLeft(); _numSortFields = pair.getRight(); _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig); _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema); _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig); List partitionerConfigs = processorConfig.getPartitionerConfigs(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java similarity index 50% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index 59c1e6fb348d..8cbe959de355 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -16,45 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RecordEnricherRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); - private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); + private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); private RecordEnricherRegistry() { } - public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, - RecordEnricherValidationConfig config) { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - - RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); - } - - public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig) - throws IOException { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .createEnricher(enrichmentConfig.getProperties()); + public static Map getRecordEnricherFactoryMap() { + return RECORD_ENRICHER_FACTORY_MAP; } static { - for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { + for (RecordTransformer recordEnricherFactory : ServiceLoader.load(RecordTransformer.class)) { LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java index 790015d8af31..637ad2498357 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java @@ -24,8 +24,8 @@ import com.yscope.clp.compressorfrontend.MessageEncoder; import java.io.IOException; import java.util.List; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.rewriter.ClpRewriter; import org.slf4j.Logger; @@ -39,11 +39,12 @@ * 2. 'x_dictVars' - The dictionary variables of the encoded message * 3. 'x_encodedVars' - The encoded variables of the encoded message */ -public class CLPEncodingEnricher implements RecordEnricher { +public class CLPEncodingEnricher implements RecordTransformer { private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class); private final ClpEnricherConfig _config; private final EncodedMessage _clpEncodedMessage; private final MessageEncoder _clpMessageEncoder; + private static final String ENRICHER_TYPE = "clpEnricher"; public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException { _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class); @@ -58,7 +59,7 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { try { for (String field : _config.getFields()) { Object value = record.getValue(field); @@ -69,6 +70,7 @@ public void enrich(GenericRow record) { } catch (Exception e) { LOGGER.error("Failed to enrich record: {}", record); } + return record; } private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) { @@ -97,4 +99,24 @@ private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars); to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars); } + + @Override + public String getEnricherType() { + return ENRICHER_TYPE; + } + + @Override + public RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return new CLPEncodingEnricher(enricherProps); + } + + @Override + public void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + try { + ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse clp enricher config", e); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java deleted file mode 100644 index cacc4fdbc9b4..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher.clp; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.auto.service.AutoService; -import java.io.IOException; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; -import org.apache.pinot.spi.utils.JsonUtils; - -@AutoService(RecordEnricherFactory.class) -public class CLPEncodingEnricherFactory implements RecordEnricherFactory { - private static final String ENRICHER_TYPE = "clpEnricher"; - @Override - public String getEnricherType() { - return ENRICHER_TYPE; - } - - @Override - public RecordEnricher createEnricher(JsonNode enricherProps) - throws IOException { - return new CLPEncodingEnricher(enricherProps); - } - - @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { - try { - ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse clp enricher config", e); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java index 92cf565220d9..fbcbea860bd1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java @@ -27,17 +27,22 @@ import java.util.Map; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; /** * Enriches the record with custom functions. */ -public class CustomFunctionEnricher implements RecordEnricher { +public class CustomFunctionEnricher implements RecordTransformer { private final Map _fieldToFunctionEvaluator; private final List _fieldsToExtract; + private static final String TYPE = "generateColumn"; + @Override + public String getEnricherType() { + return TYPE; + } public CustomFunctionEnricher(JsonNode enricherProps) throws IOException { CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); @@ -58,9 +63,34 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { _fieldToFunctionEvaluator.forEach((field, evaluator) -> { record.putValue(field, evaluator.evaluate(record)); }); + return record; + } + + @Override + public RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return new CustomFunctionEnricher(enricherProps); + } + + @Override + public void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + CustomFunctionEnricherConfig config; + try { + config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); + if (!validationConfig) { + return; + } + for (String function : config.getFieldToFunctionMap().values()) { + if (FunctionEvaluatorFactory.isGroovyExpression(function)) { + throw new IllegalArgumentException("Groovy expression is not allowed for enrichment"); + } + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse custom function enricher config", e); + } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java deleted file mode 100644 index f77308190359..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher.function; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.auto.service.AutoService; -import java.io.IOException; -import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; -import org.apache.pinot.spi.utils.JsonUtils; - -@AutoService(RecordEnricherFactory.class) -public class CustomFunctionEnricherFactory implements RecordEnricherFactory { - private static final String TYPE = "generateColumn"; - @Override - public String getEnricherType() { - return TYPE; - } - - @Override - public RecordEnricher createEnricher(JsonNode enricherProps) - throws IOException { - return new CustomFunctionEnricher(enricherProps); - } - - @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { - CustomFunctionEnricherConfig config; - try { - config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); - if (!validationConfig.isGroovyDisabled()) { - return; - } - for (String function : config.getFieldToFunctionMap().values()) { - if (FunctionEvaluatorFactory.isGroovyExpression(function)) { - throw new IllegalArgumentException("Groovy expression is not allowed for enrichment"); - } - } - } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse custom function enricher config", e); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 0bf8fe571f18..a79f05dcca72 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; @@ -42,7 +43,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; public class RealtimeSegmentConverter { @@ -74,13 +74,27 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro _nullHandlingEnabled = nullHandlingEnabled; if (_tableConfig.getIngestionConfig() != null && _tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { - _enableColumnMajor = _tableConfig.getIngestionConfig() - .getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled(); + _enableColumnMajor = + _tableConfig.getIngestionConfig().getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled(); } else { _enableColumnMajor = _tableConfig.getIndexingConfig().isColumnMajorSegmentBuilderEnabled(); } } + /** + * Returns a new schema containing only physical columns + */ + @VisibleForTesting + public static Schema getUpdatedSchema(Schema original) { + Schema newSchema = new Schema(); + for (FieldSpec fieldSpec : original.getAllFieldSpecs()) { + if (!fieldSpec.isVirtualColumn()) { + newSchema.addField(fieldSpec); + } + } + return newSchema; + } + public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics) throws Exception { SegmentGeneratorConfig genConfig = new SegmentGeneratorConfig(_tableConfig, _dataSchema); @@ -129,7 +143,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM recordReader.init(_realtimeSegmentImpl, sortedDocIds); RealtimeSegmentSegmentCreationDataSource dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader); - driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(), + driver.init(genConfig, dataSource, RecordTransformer.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); if (!_enableColumnMajor) { @@ -159,20 +173,6 @@ private void addIndexOrDefault(SegmentGeneratorConfig ge } } - /** - * Returns a new schema containing only physical columns - */ - @VisibleForTesting - public static Schema getUpdatedSchema(Schema original) { - Schema newSchema = new Schema(); - for (FieldSpec fieldSpec : original.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn()) { - newSchema.addField(fieldSpec); - } - } - return newSchema; - } - public boolean isColumnMajorEnabled() { return _enableColumnMajor; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 72065132ae49..d3c906009e39 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -18,15 +18,79 @@ */ package org.apache.pinot.segment.local.recordtransformer; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.plugin.record.enricher.RecordEnricherRegistry; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. */ public interface RecordTransformer extends Serializable { + final boolean GROOVY_DISABLED = false; + + static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformer.class); + Map RECORD_ENRICHER_FACTORY_MAP = RecordEnricherRegistry.getRecordEnricherFactoryMap(); + final List ENRICHERS = new ArrayList<>(); + final Set COLUMNS_TO_EXTRACT = new HashSet<>(); + static final String NONE_TYPE = ""; + + static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, boolean config) { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { + throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); + } + RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) + .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); + } + + static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig) + throws IOException { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { + throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); + } + return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) + .createEnricher(enrichmentConfig.getProperties()); + } + + static RecordTransformer getPassThroughPipeline() { + return new RecordTransformer() { + }; + } + + static RecordTransformer fromIngestionConfig(IngestionConfig ingestionConfig) { + RecordTransformer pipeline = new RecordTransformer() { + }; + if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { + return pipeline; + } + List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); + for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { + try { + RecordTransformer enricher = RecordTransformer.createRecordEnricher(enrichmentConfig); + pipeline.add(enricher); + } catch (IOException e) { + throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); + } + } + return pipeline; + } + + static RecordTransformer fromTableConfig(TableConfig tableConfig) { + return fromIngestionConfig(tableConfig.getIngestionConfig()); + } /** * Returns {@code true} if the transformer is no-op (can be skipped), {@code false} otherwise. @@ -42,5 +106,46 @@ default boolean isNoOp() { * @return Transformed record, or {@code null} if the record does not follow certain rules. */ @Nullable - GenericRow transform(GenericRow record); + default GenericRow transform(GenericRow record) { + return null; + } + + /** + * Returns the list of input columns required for enriching the record. + * This is used to make sure the required input fields are extracted. + */ + default List getInputColumns() { + return new ArrayList<>(); + } + + default String getEnricherType() { + return NONE_TYPE; + } + + default RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return null; + } + + default void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + } + + default boolean isGroovyDisabled() { + return GROOVY_DISABLED; + } + + default Set getColumnsToExtract() { + return COLUMNS_TO_EXTRACT; + } + + default void add(RecordTransformer enricher) { + ENRICHERS.add(enricher); + COLUMNS_TO_EXTRACT.addAll(enricher.getInputColumns()); + } + + default void run(GenericRow record) { + for (RecordTransformer enricher : ENRICHERS) { + enricher.transform(record); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index 219a207d94e9..c6efdd0d3a80 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -19,13 +19,13 @@ package org.apache.pinot.segment.local.segment.creator; import org.apache.pinot.common.Utils; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl; import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,14 +39,14 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class); private final RecordReader _recordReader; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private TransformPipeline _transformPipeline; public RecordReaderSegmentCreationDataSource(RecordReader recordReader) { _recordReader = recordReader; } - public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) { + public void setRecordEnricherPipeline(RecordTransformer recordEnricherPipeline) { _recordEnricherPipeline = recordEnricherPipeline; } @@ -57,8 +57,8 @@ public void setTransformPipeline(TransformPipeline transformPipeline) { @Override public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) { try { - RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline - : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig()); + RecordTransformer recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline + : RecordTransformer.fromTableConfig(statsCollectorConfig.getTableConfig()); TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index ecfea58ca788..0c228dba7f19 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -80,7 +80,6 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; @@ -102,7 +101,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private SegmentCreator _indexCreator; private SegmentIndexCreationInfo _segmentIndexCreationInfo; private Schema _dataSchema; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private TransformPipeline _transformPipeline; private IngestionSchemaValidator _ingestionSchemaValidator; private int _totalDocs = 0; @@ -113,6 +112,29 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private long _totalStatsCollectorTime = 0; private boolean _continueOnError; + public static void persistCreationMeta(File indexDir, long crc, long creationTime) + throws IOException { + File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(indexDir); + File creationMetaFile = new File(segmentDir, V1Constants.SEGMENT_CREATION_META); + try (DataOutputStream output = new DataOutputStream(new FileOutputStream(creationMetaFile))) { + output.writeLong(crc); + output.writeLong(creationTime); + } + } + + /** + * Uses config and column properties like storedType and length of elements to determine if + * varLengthDictionary should be used for a column + * @deprecated Use + * {@link DictionaryIndexType#shouldUseVarLengthDictionary(String, Set, DataType, ColumnStatistics)} instead. + */ + @Deprecated + public static boolean shouldUseVarLengthDictionary(String columnName, Set varLengthDictColumns, + DataType columnStoredType, ColumnStatistics columnProfile) { + return DictionaryIndexType.shouldUseVarLengthDictionary(columnName, varLengthDictColumns, columnStoredType, + columnProfile); + } + @Override public void init(SegmentGeneratorConfig config) throws Exception { @@ -160,7 +182,7 @@ public RecordReader getRecordReader() { public void init(SegmentGeneratorConfig config, RecordReader recordReader) throws Exception { SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader); - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformer.fromTableConfig(config.getTableConfig()), new TransformPipeline(config.getTableConfig(), config.getSchema())); } @@ -168,13 +190,12 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader) public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer) throws Exception { - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformer.fromTableConfig(config.getTableConfig()), new TransformPipeline(recordTransformer, complexTypeTransformer)); } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordEnricherPipeline enricherPipeline, - TransformPipeline transformPipeline) + RecordTransformer enricherPipeline, TransformPipeline transformPipeline) throws Exception { _config = config; _recordReader = dataSource.getRecordReader(); @@ -520,16 +541,6 @@ public ColumnStatistics getColumnStatisticsCollector(final String columnName) return _segmentStats.getColumnProfileFor(columnName); } - public static void persistCreationMeta(File indexDir, long crc, long creationTime) - throws IOException { - File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(indexDir); - File creationMetaFile = new File(segmentDir, V1Constants.SEGMENT_CREATION_META); - try (DataOutputStream output = new DataOutputStream(new FileOutputStream(creationMetaFile))) { - output.writeLong(crc); - output.writeLong(creationTime); - } - } - /** * Complete the stats gathering process and store the stats information in indexCreationInfoMap. */ @@ -562,19 +573,6 @@ void buildIndexCreationInfo() _segmentIndexCreationInfo.setTotalDocs(_totalDocs); } - /** - * Uses config and column properties like storedType and length of elements to determine if - * varLengthDictionary should be used for a column - * @deprecated Use - * {@link DictionaryIndexType#shouldUseVarLengthDictionary(String, Set, DataType, ColumnStatistics)} instead. - */ - @Deprecated - public static boolean shouldUseVarLengthDictionary(String columnName, Set varLengthDictColumns, - DataType columnStoredType, ColumnStatistics columnProfile) { - return DictionaryIndexType.shouldUseVarLengthDictionary(columnName, varLengthDictColumns, columnStoredType, - columnProfile); - } - /** * Returns the name of the segment associated with this index creation driver. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 556b0258558f..fb2a28c7094b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; @@ -65,7 +66,6 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; @@ -109,10 +109,9 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig BatchIngestionConfig batchIngestionConfig) throws ClassNotFoundException, IOException { Preconditions.checkState(batchIngestionConfig != null && batchIngestionConfig.getBatchConfigMaps() != null - && batchIngestionConfig.getBatchConfigMaps().size() == 1, + && batchIngestionConfig.getBatchConfigMaps().size() == 1, "Must provide batchIngestionConfig and contains exactly 1 batchConfigMap for table: %s, " - + "for generating SegmentGeneratorConfig", - tableConfig.getTableName()); + + "for generating SegmentGeneratorConfig", tableConfig.getTableName()); // apply config override provided by user. BatchConfig batchConfig = @@ -125,8 +124,8 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI()); // Reader configs - segmentGeneratorConfig - .setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); + segmentGeneratorConfig.setRecordReaderPath( + RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); Map recordReaderProps = batchConfig.getRecordReaderProps(); segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(), IngestionConfigUtils.getRecordReaderProps(recordReaderProps))); @@ -170,8 +169,8 @@ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig batchCon return new SimpleSegmentNameGenerator(rawTableName, batchConfig.getSegmentNamePostfix(), batchConfig.isAppendUUIDToSegmentName(), batchConfig.isExcludeTimeInSegmentName()); default: - throw new IllegalStateException(String - .format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, + throw new IllegalStateException( + String.format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, tableConfig.getTableName())); } } @@ -199,8 +198,8 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf @Nullable AuthProvider authProvider) throws Exception { - SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, - authProvider); + SegmentGenerationJobSpec segmentUploadSpec = + generateSegmentUploadSpec(tableNameWithType, batchConfig, authProvider); List segmentTarURIStrs = segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()); String pushMode = batchConfig.getPushMode(); @@ -209,8 +208,8 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf try { SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, segmentTarURIStrs); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", segmentTarURIStrs), e); } break; @@ -229,8 +228,9 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf } SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", segmentUris), e); + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", + segmentUris), e); } break; case METADATA: @@ -240,12 +240,13 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); } PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); - Map segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, - segmentUploadSpec.getPushJobSpec(), segmentTarURIStrs.toArray(new String[0])); + Map segmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, segmentUploadSpec.getPushJobSpec(), + segmentTarURIStrs.toArray(new String[0])); SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", segmentTarURIStrs), e); } break; @@ -382,7 +383,7 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i } } - fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); + fields.addAll(RecordTransformer.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { for (TransformConfig transformConfig : transformConfigs) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 6729f1b027c6..a845e24105dc 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -45,6 +45,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; @@ -85,8 +86,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; @@ -106,9 +105,11 @@ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done */ public final class TableConfigUtils { - private TableConfigUtils() { - } - + public final static EnumSet AVAILABLE_CORE_VALUE_AGGREGATORS = + EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, + DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, + SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, + DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL); private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class); private static final String SCHEDULE_KEY = "schedule"; private static final String STAR_TREE_CONFIG_NAME = "StarTreeIndex Config"; @@ -127,6 +128,9 @@ private TableConfigUtils() { ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE); + private TableConfigUtils() { + } + /** * @see TableConfigUtils#validate(TableConfig, Schema, String, boolean) */ @@ -507,12 +511,11 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); if (enrichmentConfigs != null) { for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig, - new RecordEnricherValidationConfig(disableGroovy)); + RecordTransformer.validateEnrichmentConfig(enrichmentConfig, + disableGroovy); } } - // Transform configs List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { @@ -626,12 +629,6 @@ static void validateStreamConfig(StreamConfig streamConfig) { } } - public final static EnumSet AVAILABLE_CORE_VALUE_AGGREGATORS = - EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, - DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, - SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, - DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL); - @VisibleForTesting static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); @@ -764,9 +761,9 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()), "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing"); Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig() == null || ( - tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null - && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() - == null), "Invalid tenant tag override used for Upsert/Dedup table"); + tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null + && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() == null), + "Invalid tenant tag override used for Upsert/Dedup table"); // specifically for upsert UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); @@ -793,15 +790,13 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("The deleteRecordColumn - %s must be a single-valued column", deleteRecordColumn)); DataType dataType = fieldSpec.getDataType(); - Preconditions.checkState( - dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), + Preconditions.checkState(dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), String.format("The deleteRecordColumn - %s must be of type: String / Boolean / Numeric", deleteRecordColumn)); } String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn(); - Preconditions.checkState( - outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), + Preconditions.checkState(outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table"); if (outOfOrderRecordColumn != null) { @@ -837,8 +832,8 @@ static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) { String comparisonColumn = comparisonColumns.get(0); DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType(); Preconditions.checkState(comparisonColumnDataType.isNumeric(), - "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", - comparisonColumn, comparisonColumnDataType); + "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", comparisonColumn, + comparisonColumnDataType); } if (upsertConfig.getMetadataTTL() > 0) { @@ -869,14 +864,11 @@ static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) { tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()); if (instanceAssignmentConfig.getPartitionSelector() == InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR) { - Preconditions.checkState( - tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap needed for %s, as " - + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", - instancePartitionsType)); + + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", instancePartitionsType)); } else { - Preconditions.checkState( - !tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(!tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", instancePartitionsType)); } @@ -1332,9 +1324,8 @@ private static void validateForwardIndexDisabledIndexCompatibility(String column } Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(), - String.format( - "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" - + " not supported with forward index for column: %s, disabled", columnName)); + String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" + + " not supported with forward index for column: %s, disabled", columnName)); boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY; boolean hasInvertedIndex = @@ -1458,11 +1449,6 @@ public static void verifyHybridTableConfigs(String rawTableName, TableConfig off } } - // enum of all the skip-able validation types. - public enum ValidationType { - ALL, TASK, UPSERT - } - /** * needsEmptySegmentPruner checks if EmptySegmentPruner is needed for a TableConfig. * @param tableConfig Input table config. @@ -1539,4 +1525,9 @@ public static TableConfig createTableConfigFromOldFormat(TableConfig tableConfig } return clone; } + + // enum of all the skip-able validation types. + public enum ValidationType { + ALL, TASK, UPSERT + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java deleted file mode 100644 index 8e48ed71023a..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import java.util.List; -import org.apache.pinot.spi.data.readers.GenericRow; - - -/** - * Interface for enriching records. - * If a column with the same name as the input column already exists in the record, it will be overwritten. - */ -public interface RecordEnricher { - /** - * Returns the list of input columns required for enriching the record. - * This is used to make sure the required input fields are extracted. - */ - List getInputColumns(); - - /** - * Enriches the given record, by adding new columns to the same record. - */ - void enrich(GenericRow record); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java deleted file mode 100644 index adac697bdb6d..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -public interface RecordEnricherConfig { - void parse(); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java deleted file mode 100644 index 4b55d0426013..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; - - -public interface RecordEnricherFactory { - String getEnricherType(); - RecordEnricher createEnricher(JsonNode enricherProps) throws IOException; - void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java deleted file mode 100644 index 5a50d685cafe..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; -import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; -import org.apache.pinot.spi.data.readers.GenericRow; - - -public class RecordEnricherPipeline { - private final List _enrichers = new ArrayList<>(); - private final Set _columnsToExtract = new HashSet<>(); - - public static RecordEnricherPipeline getPassThroughPipeline() { - return new RecordEnricherPipeline(); - } - - public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) { - RecordEnricherPipeline pipeline = new RecordEnricherPipeline(); - if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { - return pipeline; - } - List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); - for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - try { - RecordEnricher enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); - pipeline.add(enricher); - } catch (IOException e) { - throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); - } - } - return pipeline; - } - - public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) { - return fromIngestionConfig(tableConfig.getIngestionConfig()); - } - - public Set getColumnsToExtract() { - return _columnsToExtract; - } - - public void add(RecordEnricher enricher) { - _enrichers.add(enricher); - _columnsToExtract.addAll(enricher.getInputColumns()); - } - - public void run(GenericRow record) { - for (RecordEnricher enricher : _enrichers) { - enricher.enrich(record); - } - } -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java deleted file mode 100644 index 86fb74155e59..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.pinot.spi.recordenricher; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Interface for cluster constrains, which can be used to validate the record enricher configs - */ -public class RecordEnricherValidationConfig { - private final boolean _groovyDisabled; - - public RecordEnricherValidationConfig(boolean groovyDisabled) { - _groovyDisabled = groovyDisabled; - } - - public boolean isGroovyDisabled() { - return _groovyDisabled; - } -}