From d066872e0fbfd69ab55be75aa4aaf9fc7b343c83 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Mon, 6 May 2024 00:41:47 -0400 Subject: [PATCH 01/14] #13049 Refactor RecordTransformer to include RecordEnricher. Removed RecordEnricher as it serves similar purpose as RecordTransformer. --- .../flink/sink/FlinkSegmentWriter.java | 2 +- .../realtime/RealtimeSegmentDataManager.java | 2 +- .../framework/SegmentProcessorFramework.java | 2 +- .../processing/mapper/SegmentMapper.java | 2 +- .../enricher}/RecordEnricherConfig.java | 2 +- .../enricher}/RecordEnricherFactory.java | 5 ++- .../enricher}/RecordEnricherPipeline.java | 13 +++--- .../enricher}/RecordEnricherRegistry.java | 5 ++- .../RecordEnricherValidationConfig.java | 2 +- .../enricher/clp/CLPEncodingEnricher.java | 7 ++-- .../clp/CLPEncodingEnricherFactory.java | 8 ++-- .../function/CustomFunctionEnricher.java | 7 ++-- .../CustomFunctionEnricherFactory.java | 8 ++-- .../converter/RealtimeSegmentConverter.java | 2 +- .../recordtransformer/RecordTransformer.java | 7 ++++ ...RecordReaderSegmentCreationDataSource.java | 2 +- .../impl/SegmentIndexCreationDriverImpl.java | 2 +- .../segment/local/utils/IngestionUtils.java | 2 +- .../segment/local/utils/TableConfigUtils.java | 4 +- .../spi/recordenricher/RecordEnricher.java | 40 ------------------- 20 files changed, 48 insertions(+), 76 deletions(-) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher}/RecordEnricherConfig.java (94%) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher}/RecordEnricherFactory.java (83%) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher}/RecordEnricherPipeline.java (85%) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher}/RecordEnricherRegistry.java (92%) rename {pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher => pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher}/RecordEnricherValidationConfig.java (96%) delete mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java index e9bd29701b0b..cc7961e2502e 100644 --- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java +++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java @@ -53,7 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 3290c5e4f3c0..c02e59b89cc1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -76,7 +76,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index def4f75b290e..c2c6a44fbbe8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -43,7 +43,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 77b651d943aa..8833f8bbd9ef 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -48,7 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java similarity index 94% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java index adac697bdb6d..fd4cbe5f9e78 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; public interface RecordEnricherConfig { void parse(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java similarity index 83% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java index 4b55d0426013..2bc6cd08c417 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java @@ -16,14 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; public interface RecordEnricherFactory { String getEnricherType(); - RecordEnricher createEnricher(JsonNode enricherProps) throws IOException; + RecordTransformer createEnricher(JsonNode enricherProps) throws IOException; void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java similarity index 85% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java index 5a50d685cafe..96c769e28f8b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java @@ -16,13 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; @@ -30,7 +31,7 @@ public class RecordEnricherPipeline { - private final List _enrichers = new ArrayList<>(); + private final List _enrichers = new ArrayList<>(); private final Set _columnsToExtract = new HashSet<>(); public static RecordEnricherPipeline getPassThroughPipeline() { @@ -45,7 +46,7 @@ public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingesti List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { try { - RecordEnricher enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); + RecordTransformer enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); pipeline.add(enricher); } catch (IOException e) { throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); @@ -62,14 +63,14 @@ public Set getColumnsToExtract() { return _columnsToExtract; } - public void add(RecordEnricher enricher) { + public void add(RecordTransformer enricher) { _enrichers.add(enricher); _columnsToExtract.addAll(enricher.getInputColumns()); } public void run(GenericRow record) { - for (RecordEnricher enricher : _enrichers) { - enricher.enrich(record); + for (RecordTransformer enricher : _enrichers) { + enricher.transform(record); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java similarity index 92% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index 59c1e6fb348d..0b749e4bf5af 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); } - public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig) + public static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig) throws IOException { if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java similarity index 96% rename from pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java index 86fb74155e59..890a10dff54b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java @@ -1,4 +1,4 @@ -package org.apache.pinot.spi.recordenricher; +package org.apache.pinot.plugin.record.enricher; /** * Licensed to the Apache Software Foundation (ASF) under one diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java index 790015d8af31..b42a79f6a4ec 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java @@ -24,8 +24,8 @@ import com.yscope.clp.compressorfrontend.MessageEncoder; import java.io.IOException; import java.util.List; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.rewriter.ClpRewriter; import org.slf4j.Logger; @@ -39,7 +39,7 @@ * 2. 'x_dictVars' - The dictionary variables of the encoded message * 3. 'x_encodedVars' - The encoded variables of the encoded message */ -public class CLPEncodingEnricher implements RecordEnricher { +public class CLPEncodingEnricher implements RecordTransformer { private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class); private final ClpEnricherConfig _config; private final EncodedMessage _clpEncodedMessage; @@ -58,7 +58,7 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { try { for (String field : _config.getFields()) { Object value = record.getValue(field); @@ -69,6 +69,7 @@ public void enrich(GenericRow record) { } catch (Exception e) { LOGGER.error("Failed to enrich record: {}", record); } + return record; } private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java index cacc4fdbc9b4..64e0c5cccb25 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory; +import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.JsonUtils; @AutoService(RecordEnricherFactory.class) @@ -35,7 +35,7 @@ public String getEnricherType() { } @Override - public RecordEnricher createEnricher(JsonNode enricherProps) + public RecordTransformer createEnricher(JsonNode enricherProps) throws IOException { return new CLPEncodingEnricher(enricherProps); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java index 92cf565220d9..e423a356fcb9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java @@ -27,15 +27,15 @@ import java.util.Map; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.recordenricher.RecordEnricher; import org.apache.pinot.spi.utils.JsonUtils; /** * Enriches the record with custom functions. */ -public class CustomFunctionEnricher implements RecordEnricher { +public class CustomFunctionEnricher implements RecordTransformer { private final Map _fieldToFunctionEvaluator; private final List _fieldsToExtract; @@ -58,9 +58,10 @@ public List getInputColumns() { } @Override - public void enrich(GenericRow record) { + public GenericRow transform(GenericRow record) { _fieldToFunctionEvaluator.forEach((field, evaluator) -> { record.putValue(field, evaluator.evaluate(record)); }); + return record; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java index f77308190359..cbedbfa2e500 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java @@ -22,9 +22,9 @@ import com.google.auto.service.AutoService; import java.io.IOException; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricher; -import org.apache.pinot.spi.recordenricher.RecordEnricherFactory; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory; +import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.JsonUtils; @AutoService(RecordEnricherFactory.class) @@ -36,7 +36,7 @@ public String getEnricherType() { } @Override - public RecordEnricher createEnricher(JsonNode enricherProps) + public RecordTransformer createEnricher(JsonNode enricherProps) throws IOException { return new CustomFunctionEnricher(enricherProps); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 0bf8fe571f18..6641836e284d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -42,7 +42,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; public class RealtimeSegmentConverter { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 72065132ae49..f09af7632011 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.recordtransformer; import java.io.Serializable; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; @@ -43,4 +44,10 @@ default boolean isNoOp() { */ @Nullable GenericRow transform(GenericRow record); + + /** + * Returns the list of input columns required for enriching the record. + * This is used to make sure the required input fields are extracted. + */ + List getInputColumns(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index 219a207d94e9..33dddbee982b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -25,7 +25,7 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index ecfea58ca788..880b663fdb18 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -80,7 +80,7 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 556b0258558f..4512c0b04e40 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -65,7 +65,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; -import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 6729f1b027c6..1c79f0b1debc 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -85,8 +85,8 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry; -import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig; +import org.apache.pinot.plugin.record.enricher.RecordEnricherRegistry; +import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java deleted file mode 100644 index 8e48ed71023a..000000000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.recordenricher; - -import java.util.List; -import org.apache.pinot.spi.data.readers.GenericRow; - - -/** - * Interface for enriching records. - * If a column with the same name as the input column already exists in the record, it will be overwritten. - */ -public interface RecordEnricher { - /** - * Returns the list of input columns required for enriching the record. - * This is used to make sure the required input fields are extracted. - */ - List getInputColumns(); - - /** - * Enriches the given record, by adding new columns to the same record. - */ - void enrich(GenericRow record); -} From ee427766bbbfb4cd9e348630c83a06980544828b Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Mon, 6 May 2024 00:47:57 -0400 Subject: [PATCH 02/14] tranform RecordTransformer method --- .../segment/local/recordtransformer/RecordTransformer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index f09af7632011..3119f0394762 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.recordtransformer; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; @@ -49,5 +50,7 @@ default boolean isNoOp() { * Returns the list of input columns required for enriching the record. * This is used to make sure the required input fields are extracted. */ - List getInputColumns(); + default List getInputColumns() { + return new ArrayList<>(); + } } From d4f3cd608fee9f37b2ff2adb68740a25e4eb8064 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Mon, 6 May 2024 01:04:52 -0400 Subject: [PATCH 03/14] Formatting issues in build --- .../CustomFunctionEnricherFactory.java | 2 + .../converter/RealtimeSegmentConverter.java | 32 +++++----- .../impl/SegmentIndexCreationDriverImpl.java | 49 ++++++++-------- .../segment/local/utils/IngestionUtils.java | 35 +++++------ .../segment/local/utils/TableConfigUtils.java | 58 ++++++++----------- 5 files changed, 85 insertions(+), 91 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java index cbedbfa2e500..10dded790242 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java @@ -27,9 +27,11 @@ import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.JsonUtils; + @AutoService(RecordEnricherFactory.class) public class CustomFunctionEnricherFactory implements RecordEnricherFactory { private static final String TYPE = "generateColumn"; + @Override public String getEnricherType() { return TYPE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 6641836e284d..36c7379bd146 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -74,13 +74,27 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro _nullHandlingEnabled = nullHandlingEnabled; if (_tableConfig.getIngestionConfig() != null && _tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) { - _enableColumnMajor = _tableConfig.getIngestionConfig() - .getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled(); + _enableColumnMajor = + _tableConfig.getIngestionConfig().getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled(); } else { _enableColumnMajor = _tableConfig.getIndexingConfig().isColumnMajorSegmentBuilderEnabled(); } } + /** + * Returns a new schema containing only physical columns + */ + @VisibleForTesting + public static Schema getUpdatedSchema(Schema original) { + Schema newSchema = new Schema(); + for (FieldSpec fieldSpec : original.getAllFieldSpecs()) { + if (!fieldSpec.isVirtualColumn()) { + newSchema.addField(fieldSpec); + } + } + return newSchema; + } + public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics) throws Exception { SegmentGeneratorConfig genConfig = new SegmentGeneratorConfig(_tableConfig, _dataSchema); @@ -159,20 +173,6 @@ private void addIndexOrDefault(SegmentGeneratorConfig ge } } - /** - * Returns a new schema containing only physical columns - */ - @VisibleForTesting - public static Schema getUpdatedSchema(Schema original) { - Schema newSchema = new Schema(); - for (FieldSpec fieldSpec : original.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn()) { - newSchema.addField(fieldSpec); - } - } - return newSchema; - } - public boolean isColumnMajorEnabled() { return _enableColumnMajor; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 880b663fdb18..95d2c127f78e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -113,6 +113,29 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private long _totalStatsCollectorTime = 0; private boolean _continueOnError; + public static void persistCreationMeta(File indexDir, long crc, long creationTime) + throws IOException { + File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(indexDir); + File creationMetaFile = new File(segmentDir, V1Constants.SEGMENT_CREATION_META); + try (DataOutputStream output = new DataOutputStream(new FileOutputStream(creationMetaFile))) { + output.writeLong(crc); + output.writeLong(creationTime); + } + } + + /** + * Uses config and column properties like storedType and length of elements to determine if + * varLengthDictionary should be used for a column + * @deprecated Use + * {@link DictionaryIndexType#shouldUseVarLengthDictionary(String, Set, DataType, ColumnStatistics)} instead. + */ + @Deprecated + public static boolean shouldUseVarLengthDictionary(String columnName, Set varLengthDictColumns, + DataType columnStoredType, ColumnStatistics columnProfile) { + return DictionaryIndexType.shouldUseVarLengthDictionary(columnName, varLengthDictColumns, columnStoredType, + columnProfile); + } + @Override public void init(SegmentGeneratorConfig config) throws Exception { @@ -173,8 +196,7 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordEnricherPipeline enricherPipeline, - TransformPipeline transformPipeline) + RecordEnricherPipeline enricherPipeline, TransformPipeline transformPipeline) throws Exception { _config = config; _recordReader = dataSource.getRecordReader(); @@ -520,16 +542,6 @@ public ColumnStatistics getColumnStatisticsCollector(final String columnName) return _segmentStats.getColumnProfileFor(columnName); } - public static void persistCreationMeta(File indexDir, long crc, long creationTime) - throws IOException { - File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(indexDir); - File creationMetaFile = new File(segmentDir, V1Constants.SEGMENT_CREATION_META); - try (DataOutputStream output = new DataOutputStream(new FileOutputStream(creationMetaFile))) { - output.writeLong(crc); - output.writeLong(creationTime); - } - } - /** * Complete the stats gathering process and store the stats information in indexCreationInfoMap. */ @@ -562,19 +574,6 @@ void buildIndexCreationInfo() _segmentIndexCreationInfo.setTotalDocs(_totalDocs); } - /** - * Uses config and column properties like storedType and length of elements to determine if - * varLengthDictionary should be used for a column - * @deprecated Use - * {@link DictionaryIndexType#shouldUseVarLengthDictionary(String, Set, DataType, ColumnStatistics)} instead. - */ - @Deprecated - public static boolean shouldUseVarLengthDictionary(String columnName, Set varLengthDictColumns, - DataType columnStoredType, ColumnStatistics columnProfile) { - return DictionaryIndexType.shouldUseVarLengthDictionary(columnName, varLengthDictColumns, columnStoredType, - columnProfile); - } - /** * Returns the name of the segment associated with this index creation driver. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 4512c0b04e40..6d791b652e8f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -109,10 +109,9 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig BatchIngestionConfig batchIngestionConfig) throws ClassNotFoundException, IOException { Preconditions.checkState(batchIngestionConfig != null && batchIngestionConfig.getBatchConfigMaps() != null - && batchIngestionConfig.getBatchConfigMaps().size() == 1, + && batchIngestionConfig.getBatchConfigMaps().size() == 1, "Must provide batchIngestionConfig and contains exactly 1 batchConfigMap for table: %s, " - + "for generating SegmentGeneratorConfig", - tableConfig.getTableName()); + + "for generating SegmentGeneratorConfig", tableConfig.getTableName()); // apply config override provided by user. BatchConfig batchConfig = @@ -125,8 +124,8 @@ public static SegmentGeneratorConfig generateSegmentGeneratorConfig(TableConfig segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI()); // Reader configs - segmentGeneratorConfig - .setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); + segmentGeneratorConfig.setRecordReaderPath( + RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString())); Map recordReaderProps = batchConfig.getRecordReaderProps(); segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(), IngestionConfigUtils.getRecordReaderProps(recordReaderProps))); @@ -170,8 +169,8 @@ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig batchCon return new SimpleSegmentNameGenerator(rawTableName, batchConfig.getSegmentNamePostfix(), batchConfig.isAppendUUIDToSegmentName(), batchConfig.isExcludeTimeInSegmentName()); default: - throw new IllegalStateException(String - .format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, + throw new IllegalStateException( + String.format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType, tableConfig.getTableName())); } } @@ -199,8 +198,8 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf @Nullable AuthProvider authProvider) throws Exception { - SegmentGenerationJobSpec segmentUploadSpec = generateSegmentUploadSpec(tableNameWithType, batchConfig, - authProvider); + SegmentGenerationJobSpec segmentUploadSpec = + generateSegmentUploadSpec(tableNameWithType, batchConfig, authProvider); List segmentTarURIStrs = segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()); String pushMode = batchConfig.getPushMode(); @@ -209,8 +208,8 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf try { SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS, segmentTarURIStrs); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: TAR, segment tars: [%s]", segmentTarURIStrs), e); } break; @@ -229,8 +228,9 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf } SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", segmentUris), e); + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: URI, segment URIs: [%s]", + segmentUris), e); } break; case METADATA: @@ -240,12 +240,13 @@ public static void uploadSegment(String tableNameWithType, BatchConfig batchConf outputSegmentDirURI = URI.create(batchConfig.getOutputSegmentDirURI()); } PinotFS outputFileFS = getOutputPinotFS(batchConfig, outputSegmentDirURI); - Map segmentUriToTarPathMap = SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, - segmentUploadSpec.getPushJobSpec(), segmentTarURIStrs.toArray(new String[0])); + Map segmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, segmentUploadSpec.getPushJobSpec(), + segmentTarURIStrs.toArray(new String[0])); SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec, outputFileFS, segmentUriToTarPathMap); } catch (RetriableOperationException | AttemptsExceededException e) { - throw new RuntimeException(String - .format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", + throw new RuntimeException( + String.format("Caught exception while uploading segments. Push mode: METADATA, segment URIs: [%s]", segmentTarURIStrs), e); } break; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 1c79f0b1debc..0618842b61d2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -106,9 +106,11 @@ * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done */ public final class TableConfigUtils { - private TableConfigUtils() { - } - + public final static EnumSet AVAILABLE_CORE_VALUE_AGGREGATORS = + EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, + DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, + SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, + DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL); private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class); private static final String SCHEDULE_KEY = "schedule"; private static final String STAR_TREE_CONFIG_NAME = "StarTreeIndex Config"; @@ -127,6 +129,9 @@ private TableConfigUtils() { ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE); + private TableConfigUtils() { + } + /** * @see TableConfigUtils#validate(TableConfig, Schema, String, boolean) */ @@ -512,7 +517,6 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - // Transform configs List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { @@ -626,12 +630,6 @@ static void validateStreamConfig(StreamConfig streamConfig) { } } - public final static EnumSet AVAILABLE_CORE_VALUE_AGGREGATORS = - EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, - DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, - SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, - DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL); - @VisibleForTesting static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); @@ -764,9 +762,9 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()), "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing"); Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig() == null || ( - tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null - && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() - == null), "Invalid tenant tag override used for Upsert/Dedup table"); + tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null + && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() == null), + "Invalid tenant tag override used for Upsert/Dedup table"); // specifically for upsert UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); @@ -793,15 +791,13 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("The deleteRecordColumn - %s must be a single-valued column", deleteRecordColumn)); DataType dataType = fieldSpec.getDataType(); - Preconditions.checkState( - dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), + Preconditions.checkState(dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), String.format("The deleteRecordColumn - %s must be of type: String / Boolean / Numeric", deleteRecordColumn)); } String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn(); - Preconditions.checkState( - outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), + Preconditions.checkState(outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table"); if (outOfOrderRecordColumn != null) { @@ -837,8 +833,8 @@ static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) { String comparisonColumn = comparisonColumns.get(0); DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType(); Preconditions.checkState(comparisonColumnDataType.isNumeric(), - "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", - comparisonColumn, comparisonColumnDataType); + "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", comparisonColumn, + comparisonColumnDataType); } if (upsertConfig.getMetadataTTL() > 0) { @@ -869,14 +865,11 @@ static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) { tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()); if (instanceAssignmentConfig.getPartitionSelector() == InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR) { - Preconditions.checkState( - tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap needed for %s, as " - + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", - instancePartitionsType)); + + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", instancePartitionsType)); } else { - Preconditions.checkState( - !tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(!tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", instancePartitionsType)); } @@ -1332,9 +1325,8 @@ private static void validateForwardIndexDisabledIndexCompatibility(String column } Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(), - String.format( - "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" - + " not supported with forward index for column: %s, disabled", columnName)); + String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" + + " not supported with forward index for column: %s, disabled", columnName)); boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY; boolean hasInvertedIndex = @@ -1458,11 +1450,6 @@ public static void verifyHybridTableConfigs(String rawTableName, TableConfig off } } - // enum of all the skip-able validation types. - public enum ValidationType { - ALL, TASK, UPSERT - } - /** * needsEmptySegmentPruner checks if EmptySegmentPruner is needed for a TableConfig. * @param tableConfig Input table config. @@ -1539,4 +1526,9 @@ public static TableConfig createTableConfigFromOldFormat(TableConfig tableConfig } return clone; } + + // enum of all the skip-able validation types. + public enum ValidationType { + ALL, TASK, UPSERT + } } From b4eb21be5bb1ca68a7cffc9e5ed08e5a9b0bbf0a Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Mon, 6 May 2024 01:24:43 -0400 Subject: [PATCH 04/14] Formatting of code in Pinot --- .../common/utils/config/TableConfigUtils.java | 32 +++++++++---------- .../CustomFunctionEnricherFactory.java | 2 +- .../converter/RealtimeSegmentConverter.java | 4 ++- ...RecordReaderSegmentCreationDataSource.java | 2 +- .../impl/SegmentIndexCreationDriverImpl.java | 2 +- 5 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java index 0a59696b10e1..701dc59773ad 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java @@ -62,12 +62,11 @@ public class TableConfigUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class); + private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing"; private TableConfigUtils() { } - private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing"; - public static TableConfig fromZNRecord(ZNRecord znRecord) throws IOException { Map simpleFields = znRecord.getSimpleFields(); @@ -80,8 +79,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY); String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY); - Preconditions - .checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.VALIDATION_CONFIG_KEY); + Preconditions.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, + TableConfig.VALIDATION_CONFIG_KEY); SegmentsValidationAndRetentionConfig validationConfig = JsonUtils.stringToObject(validationConfigString, SegmentsValidationAndRetentionConfig.class); @@ -90,8 +89,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class); String indexingConfigString = simpleFields.get(TableConfig.INDEXING_CONFIG_KEY); - Preconditions - .checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.INDEXING_CONFIG_KEY); + Preconditions.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, + TableConfig.INDEXING_CONFIG_KEY); IndexingConfig indexingConfig = JsonUtils.stringToObject(indexingConfigString, IndexingConfig.class); String customConfigString = simpleFields.get(TableConfig.CUSTOM_CONFIG_KEY); @@ -180,14 +179,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY); if (instancePartitionsMapString != null) { instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString, - new TypeReference>() { }); + new TypeReference>() { + }); } Map segmentAssignmentConfigMap = null; String segmentAssignmentConfigMapString = simpleFields.get(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY); if (segmentAssignmentConfigMapString != null) { segmentAssignmentConfigMap = JsonUtils.stringToObject(segmentAssignmentConfigMapString, - new TypeReference>() { }); + new TypeReference>() { + }); } return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, @@ -228,8 +229,8 @@ public static ZNRecord toZNRecord(TableConfig tableConfig) } Map instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); if (instanceAssignmentConfigMap != null) { - simpleFields - .put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap)); + simpleFields.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, + JsonUtils.objectToString(instanceAssignmentConfigMap)); } List fieldConfigList = tableConfig.getFieldConfigList(); if (fieldConfigList != null) { @@ -263,11 +264,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig) simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY, JsonUtils.objectToString(tableConfig.getInstancePartitionsMap())); } - Map segmentAssignmentConfigMap = - tableConfig.getSegmentAssignmentConfigMap(); + Map segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap(); if (segmentAssignmentConfigMap != null) { - simpleFields - .put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(segmentAssignmentConfigMap)); + simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, + JsonUtils.objectToString(segmentAssignmentConfigMap)); } ZNRecord znRecord = new ZNRecord(tableConfig.getTableName()); @@ -443,8 +443,8 @@ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig */ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { - return hasPreConfiguredInstancePartitions(tableConfig) - && tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType); + return hasPreConfiguredInstancePartitions(tableConfig) && tableConfig.getInstancePartitionsMap() + .containsKey(instancePartitionsType); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java index 10dded790242..20b489213c48 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.service.AutoService; import java.io.IOException; -import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory; import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; +import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.JsonUtils; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 36c7379bd146..b3e6d9479fec 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -24,14 +24,17 @@ import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; + import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentVersion; + import org.apache.pinot.segment.spi.index.FstIndexConfig; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; @@ -42,7 +45,6 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; public class RealtimeSegmentConverter { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index 33dddbee982b..c0dfdcfad0cb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -19,13 +19,13 @@ package org.apache.pinot.segment.local.segment.creator; import org.apache.pinot.common.Utils; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.segment.local.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl; import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 95d2c127f78e..a8596fccbb3b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; @@ -80,7 +81,6 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; From 0e022932fe67c482a452ca81bd307b190372c748 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Sat, 11 May 2024 00:17:07 -0400 Subject: [PATCH 05/14] Remove unused interface RecordEnricherConfig --- .../record/enricher/RecordEnricherConfig.java | 23 ------------------- 1 file changed, 23 deletions(-) delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java deleted file mode 100644 index fd4cbe5f9e78..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher; - -public interface RecordEnricherConfig { - void parse(); -} From 12e9a87eb7716964c72a4f2d698a6cb9a26963a6 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:05:08 -0400 Subject: [PATCH 06/14] Merge Pipeline, Factory and ValidationConfig into RecordTransformer from RecordEnricher package --- .../flink/sink/FlinkSegmentWriter.java | 5 +- .../realtime/RealtimeSegmentDataManager.java | 8 +- .../framework/SegmentProcessorFramework.java | 3 +- .../processing/mapper/SegmentMapper.java | 5 +- .../enricher/RecordEnricherPipeline.java | 76 --------------- .../enricher/RecordEnricherRegistry.java | 63 ------------ .../RecordEnricherValidationConfig.java | 35 ------- ...tory.java => RecordTransformerLoader.java} | 25 +++-- .../enricher/clp/CLPEncodingEnricher.java | 21 ++++ .../clp/CLPEncodingEnricherFactory.java | 51 ---------- .../function/CustomFunctionEnricher.java | 29 ++++++ .../CustomFunctionEnricherFactory.java | 63 ------------ .../converter/RealtimeSegmentConverter.java | 4 +- .../recordtransformer/RecordTransformer.java | 97 ++++++++++++++++++- ...RecordReaderSegmentCreationDataSource.java | 10 +- .../impl/SegmentIndexCreationDriverImpl.java | 9 +- .../segment/local/utils/IngestionUtils.java | 4 +- .../segment/local/utils/TableConfigUtils.java | 7 +- 18 files changed, 190 insertions(+), 325 deletions(-) delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java rename pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/{RecordEnricherFactory.java => RecordTransformerLoader.java} (53%) delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java index cc7961e2502e..304e4485840e 100644 --- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java +++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java @@ -53,7 +53,6 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public class FlinkSegmentWriter implements SegmentWriter { private String _outputDirURI; private Schema _schema; private Set _fieldsToRead; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private RecordTransformer _recordTransformer; private File _stagingDir; @@ -139,7 +138,7 @@ public void init(TableConfig tableConfig, Schema schema, Map bat _schema = schema; _fieldsToRead = _schema.getColumnNames(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(_tableConfig); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema); _reusableRecord = new GenericData.Record(_avroSchema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index c02e59b89cc1..0efece511011 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -55,6 +55,7 @@ import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable; import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; @@ -76,7 +77,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.plugin.PluginManager; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; @@ -275,7 +275,7 @@ public void deleteSegmentFile() { private final int _partitionGroupId; private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; final String _clientId; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformer _recordEnricherPipeline; private final TransformPipeline _transformPipeline; private PartitionGroupConsumer _partitionGroupConsumer = null; private StreamMetadataProvider _partitionMetadataProvider = null; @@ -1514,10 +1514,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } try { - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig); } catch (Exception e) { _realtimeTableDataManager.addSegmentError(_segmentNameStr, - new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e)); + new SegmentErrorInfo(now(), "Failed to initialize the Record Transformer pipeline", e)); throw e; } _transformPipeline = new TransformPipeline(tableConfig, schema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index c2c6a44fbbe8..b2770a003658 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -43,7 +43,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -290,7 +289,7 @@ private List generateSegment(Map partitionT GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange), - RecordEnricherPipeline.getPassThroughPipeline(), + RecordTransformer.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); driver.build(); outputSegmentDirs.add(driver.getOutputDirectory()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 8833f8bbd9ef..df0989c04b2d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -48,7 +48,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +68,7 @@ public class SegmentMapper { private final List _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final RecordEnricherPipeline _recordEnricherPipeline; + private final RecordTransformer _recordEnricherPipeline; private final CompositeTransformer _recordTransformer; private final TimeHandler _timeHandler; private final Partitioner[] _partitioners; @@ -94,7 +93,7 @@ public SegmentMapper(List recordReaderFileConfigs, _fieldSpecs = pair.getLeft(); _numSortFields = pair.getRight(); _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled(); - _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); + _recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig); _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema); _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig); List partitionerConfigs = processorConfig.getPartitionerConfigs(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java deleted file mode 100644 index 96c769e28f8b..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherPipeline.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; -import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; -import org.apache.pinot.spi.data.readers.GenericRow; - - -public class RecordEnricherPipeline { - private final List _enrichers = new ArrayList<>(); - private final Set _columnsToExtract = new HashSet<>(); - - public static RecordEnricherPipeline getPassThroughPipeline() { - return new RecordEnricherPipeline(); - } - - public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) { - RecordEnricherPipeline pipeline = new RecordEnricherPipeline(); - if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { - return pipeline; - } - List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); - for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - try { - RecordTransformer enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig); - pipeline.add(enricher); - } catch (IOException e) { - throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); - } - } - return pipeline; - } - - public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) { - return fromIngestionConfig(tableConfig.getIngestionConfig()); - } - - public Set getColumnsToExtract() { - return _columnsToExtract; - } - - public void add(RecordTransformer enricher) { - _enrichers.add(enricher); - _columnsToExtract.addAll(enricher.getInputColumns()); - } - - public void run(GenericRow record) { - for (RecordTransformer enricher : _enrichers) { - enricher.transform(record); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java deleted file mode 100644 index 0b749e4bf5af..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.ServiceLoader; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class RecordEnricherRegistry { - private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); - private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); - - private RecordEnricherRegistry() { - } - - public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, - RecordEnricherValidationConfig config) { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - - RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); - } - - public static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig) - throws IOException { - if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { - throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); - } - return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) - .createEnricher(enrichmentConfig.getProperties()); - } - - static { - for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { - LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); - RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java deleted file mode 100644 index 890a10dff54b..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherValidationConfig.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.pinot.plugin.record.enricher; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Interface for cluster constrains, which can be used to validate the record enricher configs - */ -public class RecordEnricherValidationConfig { - private final boolean _groovyDisabled; - - public RecordEnricherValidationConfig(boolean groovyDisabled) { - _groovyDisabled = groovyDisabled; - } - - public boolean isGroovyDisabled() { - return _groovyDisabled; - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java similarity index 53% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java index 2bc6cd08c417..f26b2581be23 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java @@ -18,13 +18,26 @@ */ package org.apache.pinot.plugin.record.enricher; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public interface RecordEnricherFactory { - String getEnricherType(); - RecordTransformer createEnricher(JsonNode enricherProps) throws IOException; - void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig); +public class RecordTransformerLoader { + private static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformerLoader.class); + private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); + + static { + for (RecordTransformer recordEnricherFactory : ServiceLoader.load(RecordTransformer.class)) { + LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); + RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); + } + } + + public static Map getRecordEnricherFactoryMap() { + return RECORD_ENRICHER_FACTORY_MAP; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java index b42a79f6a4ec..637ad2498357 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java @@ -44,6 +44,7 @@ public class CLPEncodingEnricher implements RecordTransformer { private final ClpEnricherConfig _config; private final EncodedMessage _clpEncodedMessage; private final MessageEncoder _clpMessageEncoder; + private static final String ENRICHER_TYPE = "clpEnricher"; public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException { _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class); @@ -98,4 +99,24 @@ private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars); to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars); } + + @Override + public String getEnricherType() { + return ENRICHER_TYPE; + } + + @Override + public RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return new CLPEncodingEnricher(enricherProps); + } + + @Override + public void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + try { + ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse clp enricher config", e); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java deleted file mode 100644 index 64e0c5cccb25..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher.clp; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.auto.service.AutoService; -import java.io.IOException; -import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory; -import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; -import org.apache.pinot.spi.utils.JsonUtils; - -@AutoService(RecordEnricherFactory.class) -public class CLPEncodingEnricherFactory implements RecordEnricherFactory { - private static final String ENRICHER_TYPE = "clpEnricher"; - @Override - public String getEnricherType() { - return ENRICHER_TYPE; - } - - @Override - public RecordTransformer createEnricher(JsonNode enricherProps) - throws IOException { - return new CLPEncodingEnricher(enricherProps); - } - - @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { - try { - ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse clp enricher config", e); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java index e423a356fcb9..fbcbea860bd1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java @@ -38,6 +38,11 @@ public class CustomFunctionEnricher implements RecordTransformer { private final Map _fieldToFunctionEvaluator; private final List _fieldsToExtract; + private static final String TYPE = "generateColumn"; + @Override + public String getEnricherType() { + return TYPE; + } public CustomFunctionEnricher(JsonNode enricherProps) throws IOException { CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); @@ -64,4 +69,28 @@ public GenericRow transform(GenericRow record) { }); return record; } + + @Override + public RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return new CustomFunctionEnricher(enricherProps); + } + + @Override + public void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + CustomFunctionEnricherConfig config; + try { + config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); + if (!validationConfig) { + return; + } + for (String function : config.getFieldToFunctionMap().values()) { + if (FunctionEvaluatorFactory.isGroovyExpression(function)) { + throw new IllegalArgumentException("Groovy expression is not allowed for enrichment"); + } + } + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse custom function enricher config", e); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java deleted file mode 100644 index 20b489213c48..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.record.enricher.function; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.auto.service.AutoService; -import java.io.IOException; -import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory; -import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; -import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; -import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; -import org.apache.pinot.spi.utils.JsonUtils; - - -@AutoService(RecordEnricherFactory.class) -public class CustomFunctionEnricherFactory implements RecordEnricherFactory { - private static final String TYPE = "generateColumn"; - - @Override - public String getEnricherType() { - return TYPE; - } - - @Override - public RecordTransformer createEnricher(JsonNode enricherProps) - throws IOException { - return new CustomFunctionEnricher(enricherProps); - } - - @Override - public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) { - CustomFunctionEnricherConfig config; - try { - config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class); - if (!validationConfig.isGroovyDisabled()) { - return; - } - for (String function : config.getFieldToFunctionMap().values()) { - if (FunctionEvaluatorFactory.isGroovyExpression(function)) { - throw new IllegalArgumentException("Groovy expression is not allowed for enrichment"); - } - } - } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse custom function enricher config", e); - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index b3e6d9479fec..e6341f877cd2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -24,9 +24,9 @@ import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; @@ -145,7 +145,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM recordReader.init(_realtimeSegmentImpl, sortedDocIds); RealtimeSegmentSegmentCreationDataSource dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader); - driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(), + driver.init(genConfig, dataSource, RecordTransformer.getPassThroughPipeline(), TransformPipeline.getPassThroughPipeline()); if (!_enableColumnMajor) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 3119f0394762..21edaa257ca1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -18,18 +18,37 @@ */ package org.apache.pinot.segment.local.recordtransformer; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.plugin.record.enricher.RecordTransformerLoader; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. */ public interface RecordTransformer extends Serializable { + final boolean _groovyDisabled = false; + static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformer.class); + Map RECORD_ENRICHER_FACTORY_MAP = RecordTransformerLoader.getRecordEnricherFactoryMap(); + final List _enrichers = new ArrayList<>(); + final Set _columnsToExtract = new HashSet<>(); + static final String NONE_TYPE = ""; /** * Returns {@code true} if the transformer is no-op (can be skipped), {@code false} otherwise. */ @@ -44,7 +63,7 @@ default boolean isNoOp() { * @return Transformed record, or {@code null} if the record does not follow certain rules. */ @Nullable - GenericRow transform(GenericRow record); + default GenericRow transform(GenericRow record) {return null;} /** * Returns the list of input columns required for enriching the record. @@ -53,4 +72,80 @@ default boolean isNoOp() { default List getInputColumns() { return new ArrayList<>(); } + + default String getEnricherType() { + return NONE_TYPE; + } + + default RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return null; + } + + default void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + } + + default boolean isGroovyDisabled() { + return _groovyDisabled; + } + + static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, + boolean config) { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { + throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); + } + + RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) + .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); + } + + static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig) + throws IOException { + if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { + throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); + } + return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) + .createEnricher(enrichmentConfig.getProperties()); + } + + static RecordTransformer getPassThroughPipeline() { + return null; + } + + static RecordTransformer fromIngestionConfig(IngestionConfig ingestionConfig) { + RecordTransformer pipeline = new RecordTransformer() { + }; + if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) { + return pipeline; + } + List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); + for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { + try { + RecordTransformer enricher = RecordTransformer.createRecordEnricher(enrichmentConfig); + pipeline.add(enricher); + } catch (IOException e) { + throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e); + } + } + return pipeline; + } + + static RecordTransformer fromTableConfig(TableConfig tableConfig) { + return fromIngestionConfig(tableConfig.getIngestionConfig()); + } + + default Set getColumnsToExtract() { + return _columnsToExtract; + } + + default void add(RecordTransformer enricher) { + _enrichers.add(enricher); + _columnsToExtract.addAll(enricher.getInputColumns()); + } + + default void run(GenericRow record) { + for (RecordTransformer enricher : _enrichers) { + enricher.transform(record); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java index c0dfdcfad0cb..c6efdd0d3a80 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -19,7 +19,7 @@ package org.apache.pinot.segment.local.segment.creator; import org.apache.pinot.common.Utils; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl; import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector; @@ -39,14 +39,14 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class); private final RecordReader _recordReader; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private TransformPipeline _transformPipeline; public RecordReaderSegmentCreationDataSource(RecordReader recordReader) { _recordReader = recordReader; } - public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) { + public void setRecordEnricherPipeline(RecordTransformer recordEnricherPipeline) { _recordEnricherPipeline = recordEnricherPipeline; } @@ -57,8 +57,8 @@ public void setTransformPipeline(TransformPipeline transformPipeline) { @Override public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) { try { - RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline - : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig()); + RecordTransformer recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline + : RecordTransformer.fromTableConfig(statsCollectorConfig.getTableConfig()); TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index a8596fccbb3b..0c228dba7f19 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -36,7 +36,6 @@ import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; @@ -102,7 +101,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive private SegmentCreator _indexCreator; private SegmentIndexCreationInfo _segmentIndexCreationInfo; private Schema _dataSchema; - private RecordEnricherPipeline _recordEnricherPipeline; + private RecordTransformer _recordEnricherPipeline; private TransformPipeline _transformPipeline; private IngestionSchemaValidator _ingestionSchemaValidator; private int _totalDocs = 0; @@ -183,7 +182,7 @@ public RecordReader getRecordReader() { public void init(SegmentGeneratorConfig config, RecordReader recordReader) throws Exception { SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader); - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformer.fromTableConfig(config.getTableConfig()), new TransformPipeline(config.getTableConfig(), config.getSchema())); } @@ -191,12 +190,12 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader) public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer) throws Exception { - init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()), + init(config, dataSource, RecordTransformer.fromTableConfig(config.getTableConfig()), new TransformPipeline(recordTransformer, complexTypeTransformer)); } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, - RecordEnricherPipeline enricherPipeline, TransformPipeline transformPipeline) + RecordTransformer enricherPipeline, TransformPipeline transformPipeline) throws Exception { _config = config; _recordReader = dataSource.getRecordReader(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 6d791b652e8f..fb2a28c7094b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; @@ -65,7 +66,6 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; -import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; @@ -383,7 +383,7 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i } } - fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); + fields.addAll(RecordTransformer.fromIngestionConfig(ingestionConfig).getColumnsToExtract()); List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { for (TransformConfig transformConfig : transformConfigs) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 0618842b61d2..a845e24105dc 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -45,6 +45,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; @@ -85,8 +86,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; -import org.apache.pinot.plugin.record.enricher.RecordEnricherRegistry; -import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; @@ -512,8 +511,8 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc List enrichmentConfigs = ingestionConfig.getEnrichmentConfigs(); if (enrichmentConfigs != null) { for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) { - RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig, - new RecordEnricherValidationConfig(disableGroovy)); + RecordTransformer.validateEnrichmentConfig(enrichmentConfig, + disableGroovy); } } From f4424daf1ffa5d8cf979702fedc7eb12f83b9000 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:14:56 -0400 Subject: [PATCH 07/14] Checkstyle changes for the PR --- .../recordtransformer/RecordTransformer.java | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 21edaa257ca1..cbbe1453b222 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -22,11 +22,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.ServiceLoader; import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.plugin.record.enricher.RecordTransformerLoader; @@ -42,55 +40,15 @@ * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. */ public interface RecordTransformer extends Serializable { - final boolean _groovyDisabled = false; + final boolean groovyDisabled = false; static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformer.class); Map RECORD_ENRICHER_FACTORY_MAP = RecordTransformerLoader.getRecordEnricherFactoryMap(); - final List _enrichers = new ArrayList<>(); - final Set _columnsToExtract = new HashSet<>(); + final List enrichers = new ArrayList<>(); + final Set columnsToExtract = new HashSet<>(); static final String NONE_TYPE = ""; - /** - * Returns {@code true} if the transformer is no-op (can be skipped), {@code false} otherwise. - */ - default boolean isNoOp() { - return false; - } - - /** - * Transforms a record based on some custom rules. - * - * @param record Record to transform - * @return Transformed record, or {@code null} if the record does not follow certain rules. - */ - @Nullable - default GenericRow transform(GenericRow record) {return null;} - /** - * Returns the list of input columns required for enriching the record. - * This is used to make sure the required input fields are extracted. - */ - default List getInputColumns() { - return new ArrayList<>(); - } - - default String getEnricherType() { - return NONE_TYPE; - } - - default RecordTransformer createEnricher(JsonNode enricherProps) - throws IOException { - return null; - } - - default void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { - } - - default boolean isGroovyDisabled() { - return _groovyDisabled; - } - - static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, - boolean config) { + static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, boolean config) { if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); } @@ -134,17 +92,59 @@ static RecordTransformer fromTableConfig(TableConfig tableConfig) { return fromIngestionConfig(tableConfig.getIngestionConfig()); } + /** + * Returns {@code true} if the transformer is no-op (can be skipped), {@code false} otherwise. + */ + default boolean isNoOp() { + return false; + } + + /** + * Transforms a record based on some custom rules. + * + * @param record Record to transform + * @return Transformed record, or {@code null} if the record does not follow certain rules. + */ + @Nullable + default GenericRow transform(GenericRow record) { + return null; + } + + /** + * Returns the list of input columns required for enriching the record. + * This is used to make sure the required input fields are extracted. + */ + default List getInputColumns() { + return new ArrayList<>(); + } + + default String getEnricherType() { + return NONE_TYPE; + } + + default RecordTransformer createEnricher(JsonNode enricherProps) + throws IOException { + return null; + } + + default void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) { + } + + default boolean isGroovyDisabled() { + return groovyDisabled; + } + default Set getColumnsToExtract() { - return _columnsToExtract; + return columnsToExtract; } default void add(RecordTransformer enricher) { - _enrichers.add(enricher); - _columnsToExtract.addAll(enricher.getInputColumns()); + enrichers.add(enricher); + columnsToExtract.addAll(enricher.getInputColumns()); } default void run(GenericRow record) { - for (RecordTransformer enricher : _enrichers) { + for (RecordTransformer enricher : enrichers) { enricher.transform(record); } } From e261dae6571a641ba5a69fdfff9bdcbad0c8049e Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:32:48 -0400 Subject: [PATCH 08/14] Checkstyle exceptions in the PR --- ...oader.java => RecordEnricherRegistry.java} | 6 ++++-- .../recordtransformer/RecordTransformer.java | 20 +++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) rename pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/{RecordTransformerLoader.java => RecordEnricherRegistry.java} (91%) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java similarity index 91% rename from pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java rename to pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index f26b2581be23..76af3c93866e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordTransformerLoader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -18,16 +18,18 @@ */ package org.apache.pinot.plugin.record.enricher; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RecordTransformerLoader { - private static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformerLoader.class); +public class RecordEnricherRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); static { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index cbbe1453b222..a04c1a5dcc6e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; -import org.apache.pinot.plugin.record.enricher.RecordTransformerLoader; +import org.apache.pinot.plugin.record.enricher.RecordEnricherRegistry; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; @@ -40,12 +40,12 @@ * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules. */ public interface RecordTransformer extends Serializable { - final boolean groovyDisabled = false; + final boolean GROOVY_DISABLED = false; static final Logger LOGGER = LoggerFactory.getLogger(RecordTransformer.class); - Map RECORD_ENRICHER_FACTORY_MAP = RecordTransformerLoader.getRecordEnricherFactoryMap(); - final List enrichers = new ArrayList<>(); - final Set columnsToExtract = new HashSet<>(); + Map RECORD_ENRICHER_FACTORY_MAP = RecordEnricherRegistry.getRecordEnricherFactoryMap(); + final List ENRICHERS = new ArrayList<>(); + final Set COLUMNS_TO_EXTRACT = new HashSet<>(); static final String NONE_TYPE = ""; static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, boolean config) { @@ -131,20 +131,20 @@ default void validateEnrichmentConfig(JsonNode enricherProps, boolean validation } default boolean isGroovyDisabled() { - return groovyDisabled; + return GROOVY_DISABLED; } default Set getColumnsToExtract() { - return columnsToExtract; + return COLUMNS_TO_EXTRACT; } default void add(RecordTransformer enricher) { - enrichers.add(enricher); - columnsToExtract.addAll(enricher.getInputColumns()); + ENRICHERS.add(enricher); + COLUMNS_TO_EXTRACT.addAll(enricher.getInputColumns()); } default void run(GenericRow record) { - for (RecordTransformer enricher : enrichers) { + for (RecordTransformer enricher : ENRICHERS) { enricher.transform(record); } } From 4a18a04c85a346697f845ca095c8c58fde8d6404 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:33:24 -0400 Subject: [PATCH 09/14] Checkstyle exception --- .../plugin/record/enricher/RecordEnricherRegistry.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index 76af3c93866e..54205c91c2e6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -32,14 +32,14 @@ public class RecordEnricherRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); + public static Map getRecordEnricherFactoryMap() { + return RECORD_ENRICHER_FACTORY_MAP; + } + static { for (RecordTransformer recordEnricherFactory : ServiceLoader.load(RecordTransformer.class)) { LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); } } - - public static Map getRecordEnricherFactoryMap() { - return RECORD_ENRICHER_FACTORY_MAP; - } } From b328fecd01b33acd924295455b9a9cd0d4384c98 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:42:52 -0400 Subject: [PATCH 10/14] Create a private constructer for not allowing instantiation --- .../plugin/record/enricher/RecordEnricherRegistry.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index 54205c91c2e6..79987c65ce5a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -18,12 +18,10 @@ */ package org.apache.pinot.plugin.record.enricher; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; -import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +30,10 @@ public class RecordEnricherRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); + private RecordEnricherRegistry() { + + } + public static Map getRecordEnricherFactoryMap() { return RECORD_ENRICHER_FACTORY_MAP; } From cfdf2c97db98b1578c6911e50bbcfcba5da74c4c Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:45:15 -0400 Subject: [PATCH 11/14] checkstyle exception --- .../pinot/plugin/record/enricher/RecordEnricherRegistry.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java index 79987c65ce5a..8cbe959de355 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/RecordEnricherRegistry.java @@ -31,7 +31,6 @@ public class RecordEnricherRegistry { private static final Map RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); private RecordEnricherRegistry() { - } public static Map getRecordEnricherFactoryMap() { From 420273630ca766a28c926e6813ee8c8bbecdbdac Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 00:54:36 -0400 Subject: [PATCH 12/14] checkstyle violation - formatting --- .../local/realtime/converter/RealtimeSegmentConverter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index e6341f877cd2..a79f05dcca72 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -31,10 +31,8 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; - import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentVersion; - import org.apache.pinot.segment.spi.index.FstIndexConfig; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; From 437c840925aa41e5c211989e27c042854ded3f88 Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 01:37:22 -0400 Subject: [PATCH 13/14] Failing test - RecordTransformer --- .../segment/local/recordtransformer/RecordTransformer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index a04c1a5dcc6e..8fd546fcdc8b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -67,7 +67,8 @@ static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig) } static RecordTransformer getPassThroughPipeline() { - return null; + return new RecordTransformer() { + }; } static RecordTransformer fromIngestionConfig(IngestionConfig ingestionConfig) { From 7c7f3cd32b091733bbe647cc5f9a8e933ba03bda Mon Sep 17 00:00:00 2001 From: deepthi912 Date: Thu, 30 May 2024 08:48:58 -0400 Subject: [PATCH 14/14] Checkstyle change - Pinot test skipped --- .../pinot/segment/local/recordtransformer/RecordTransformer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java index 8fd546fcdc8b..d3c906009e39 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformer.java @@ -52,7 +52,6 @@ static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, boolean if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); } - RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) .validateEnrichmentConfig(enrichmentConfig.getProperties(), config); }