Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#13049 Refactored RecordTransformer & merged RecordEnricher #13086

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@

public class TableConfigUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class);
private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";

private TableConfigUtils() {
}

private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";

public static TableConfig fromZNRecord(ZNRecord znRecord)
throws IOException {
Map<String, String> simpleFields = znRecord.getSimpleFields();
Expand All @@ -80,8 +79,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);

String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
Preconditions
.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.VALIDATION_CONFIG_KEY);
Preconditions.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE,
TableConfig.VALIDATION_CONFIG_KEY);
SegmentsValidationAndRetentionConfig validationConfig =
JsonUtils.stringToObject(validationConfigString, SegmentsValidationAndRetentionConfig.class);

Expand All @@ -90,8 +89,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class);

String indexingConfigString = simpleFields.get(TableConfig.INDEXING_CONFIG_KEY);
Preconditions
.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.INDEXING_CONFIG_KEY);
Preconditions.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE,
TableConfig.INDEXING_CONFIG_KEY);
IndexingConfig indexingConfig = JsonUtils.stringToObject(indexingConfigString, IndexingConfig.class);

String customConfigString = simpleFields.get(TableConfig.CUSTOM_CONFIG_KEY);
Expand Down Expand Up @@ -180,14 +179,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY);
if (instancePartitionsMapString != null) {
instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString,
new TypeReference<Map<InstancePartitionsType, String>>() { });
new TypeReference<Map<InstancePartitionsType, String>>() {
});
}

Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = null;
String segmentAssignmentConfigMapString = simpleFields.get(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY);
if (segmentAssignmentConfigMapString != null) {
segmentAssignmentConfigMap = JsonUtils.stringToObject(segmentAssignmentConfigMapString,
new TypeReference<Map<String, SegmentAssignmentConfig>>() { });
new TypeReference<Map<String, SegmentAssignmentConfig>>() {
});
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
Expand Down Expand Up @@ -228,8 +229,8 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
}
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap));
simpleFields.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(instanceAssignmentConfigMap));
}
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList != null) {
Expand Down Expand Up @@ -263,11 +264,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY,
JsonUtils.objectToString(tableConfig.getInstancePartitionsMap()));
}
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap =
tableConfig.getSegmentAssignmentConfigMap();
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
if (segmentAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(segmentAssignmentConfigMap));
simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(segmentAssignmentConfigMap));
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
Expand Down Expand Up @@ -443,8 +443,8 @@ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig
*/
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
// True only when the table-level check passes AND the instance partitions map has an entry for this type.
// The table-level check runs first; the map is only dereferenced when that check returns true.
// NOTE(review): presumably the single-argument overload guarantees the map is non-null — confirm.
return hasPreConfiguredInstancePartitions(tableConfig)
&& tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType);
return hasPreConfiguredInstancePartitions(tableConfig) && tableConfig.getInstancePartitionsMap()
.containsKey(instancePartitionsType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.plugin.record.enricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

public interface RecordEnricherConfig {
void parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;


/**
 * Factory for one kind of record enricher. An implementation names the enricher type it handles, builds
 * enricher instances from JSON properties, and validates those properties.
 */
public interface RecordEnricherFactory {
/**
 * Returns the enricher type name handled by this factory.
 */
String getEnricherType();
/**
 * Builds an enricher from the given JSON properties.
 *
 * @param enricherProps JSON properties describing the enricher to create
 * @return the newly created enricher
 * @throws IOException if the enricher cannot be instantiated from the properties
 */
RecordEnricher createEnricher(JsonNode enricherProps) throws IOException;
RecordTransformer createEnricher(JsonNode enricherProps) throws IOException;
/**
 * Validates the JSON properties of an enrichment config.
 *
 * @param enricherProps JSON properties to validate
 * @param validationConfig settings that control the validation
 */
// NOTE(review): how a validation failure is reported (exception type, if any) is not visible here — confirm.
void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.readers.GenericRow;


public class RecordEnricherPipeline {
private final List<RecordEnricher> _enrichers = new ArrayList<>();
private final List<RecordTransformer> _enrichers = new ArrayList<>();
private final Set<String> _columnsToExtract = new HashSet<>();

public static RecordEnricherPipeline getPassThroughPipeline() {
Expand All @@ -45,7 +46,7 @@ public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingesti
List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
try {
RecordEnricher enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig);
RecordTransformer enricher = RecordEnricherRegistry.createRecordEnricher(enrichmentConfig);
pipeline.add(enricher);
} catch (IOException e) {
throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e);
Expand All @@ -62,14 +63,14 @@ public Set<String> getColumnsToExtract() {
return _columnsToExtract;
}

/**
 * Appends an enricher to the end of the pipeline and adds its input columns to the set of columns to extract.
 *
 * @param enricher the enricher to register
 */
public void add(RecordEnricher enricher) {
public void add(RecordTransformer enricher) {
_enrichers.add(enricher);
// Collect the enricher's input columns so getColumnsToExtract() reports them.
_columnsToExtract.addAll(enricher.getInputColumns());
}

/**
 * Applies every registered enricher to the given record, in the order the enrichers were added.
 *
 * @param record the row to enrich
 */
public void run(GenericRow record) {
for (RecordEnricher enricher : _enrichers) {
enricher.enrich(record);
for (RecordTransformer enricher : _enrichers) {
// NOTE(review): the value returned by transform(...) is discarded, so this relies on every enricher
// mutating the passed-in record in place and returning that same instance — confirm for all implementations.
enricher.transform(record);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,7 +45,7 @@ public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig,
.validateEnrichmentConfig(enrichmentConfig.getProperties(), config);
}

public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig)
public static RecordTransformer createRecordEnricher(EnrichmentConfig enrichmentConfig)
throws IOException {
if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) {
throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

/**
* Licensed to the Apache Software Foundation (ASF) under one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.yscope.clp.compressorfrontend.MessageEncoder;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
Expand All @@ -39,7 +39,7 @@
* 2. 'x_dictVars' - The dictionary variables of the encoded message
* 3. 'x_encodedVars' - The encoded variables of the encoded message
*/
public class CLPEncodingEnricher implements RecordEnricher {
public class CLPEncodingEnricher implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
private final ClpEnricherConfig _config;
private final EncodedMessage _clpEncodedMessage;
Expand All @@ -58,7 +58,7 @@ public List<String> getInputColumns() {
}

@Override
public void enrich(GenericRow record) {
public GenericRow transform(GenericRow record) {
try {
for (String field : _config.getFields()) {
Object value = record.getValue(field);
Expand All @@ -69,6 +69,7 @@ public void enrich(GenericRow record) {
} catch (Exception e) {
LOGGER.error("Failed to enrich record: {}", record);
}
return record;
}

private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auto.service.AutoService;
import java.io.IOException;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory;
import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.JsonUtils;

@AutoService(RecordEnricherFactory.class)
Expand All @@ -35,7 +35,7 @@ public String getEnricherType() {
}

/**
 * Creates a {@link CLPEncodingEnricher} from the given JSON properties.
 *
 * @param enricherProps JSON properties passed straight to the enricher's constructor
 * @throws IOException propagated from the enricher's constructor
 */
@Override
public RecordEnricher createEnricher(JsonNode enricherProps)
public RecordTransformer createEnricher(JsonNode enricherProps)
throws IOException {
return new CLPEncodingEnricher(enricherProps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import java.util.Map;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;


/**
* Enriches the record with custom functions.
*/
public class CustomFunctionEnricher implements RecordEnricher {
public class CustomFunctionEnricher implements RecordTransformer {
private final Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
private final List<String> _fieldsToExtract;

Expand All @@ -58,9 +58,10 @@ public List<String> getInputColumns() {
}

/**
 * Evaluates each configured function against the record and stores the result under the mapped field name.
 *
 * @param record the row to enrich; it is modified in place
 * @return the same {@code record} instance that was passed in
 */
@Override
public void enrich(GenericRow record) {
public GenericRow transform(GenericRow record) {
// Each evaluator reads from, and writes back into, the same record, so an evaluator sees values stored by
// evaluators that ran before it. NOTE(review): execution order is the map's iteration order — confirm the
// map type preserves the configured order if fields depend on each other.
_fieldToFunctionEvaluator.forEach((field, evaluator) -> {
record.putValue(field, evaluator.evaluate(record));
});
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auto.service.AutoService;
import java.io.IOException;
import org.apache.pinot.plugin.record.enricher.RecordEnricherFactory;
import org.apache.pinot.plugin.record.enricher.RecordEnricherValidationConfig;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.JsonUtils;


@AutoService(RecordEnricherFactory.class)
public class CustomFunctionEnricherFactory implements RecordEnricherFactory {
private static final String TYPE = "generateColumn";

/**
 * Returns the enricher type name handled by this factory, i.e. the {@code TYPE} constant ("generateColumn").
 */
@Override
public String getEnricherType() {
return TYPE;
}

/**
 * Creates a {@link CustomFunctionEnricher} from the given JSON properties.
 *
 * @param enricherProps JSON properties passed straight to the enricher's constructor
 * @throws IOException propagated from the enricher's constructor
 */
@Override
public RecordEnricher createEnricher(JsonNode enricherProps)
public RecordTransformer createEnricher(JsonNode enricherProps)
throws IOException {
return new CustomFunctionEnricher(enricherProps);
}
Expand Down
Loading
Loading