Commit 2aca00f
Merge RecordEnricher into RecordTransformer
aadilkhalifa committed Jul 30, 2024 (1 parent: e80d95f)
Showing 45 changed files with 385 additions and 328 deletions.
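Across the files below the pattern is uniform: every call site swaps RecordEnricherPipeline (org.apache.pinot.spi.recordenricher) for RecordTransformerPipeline (org.apache.pinot.segment.local.recordtransformer), and every TransformConfig constructor call gains two trailing arguments, passed as null in the touched tests. A minimal sketch of the call-site migration, using only class and method names visible in this diff (the IngestionCallSiteSketch wrapper itself is hypothetical, for illustration):

import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;

class IngestionCallSiteSketch {
  // Before: RecordEnricherPipeline.fromTableConfig(tableConfig)
  //         (import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline)
  // After:  the same factory method on the merged transformer pipeline.
  void ingest(TableConfig tableConfig, GenericRow row) {
    RecordTransformerPipeline pipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
    pipeline.run(row); // per-row usage is unchanged at every call site in this commit
  }
}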
----------------------------------------------------------------------
@@ -284,7 +284,8 @@ public void testSerDe()
         new StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType", "kafka"))));
     ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
     ingestionConfig.setTransformConfigs(
-        Arrays.asList(new TransformConfig("bar", "func(moo)"), new TransformConfig("zoo", "myfunc()")));
+        Arrays.asList(new TransformConfig("bar", "func(moo)", null, null),
+            new TransformConfig("zoo", "myfunc()", null, null)));
     ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(Arrays.asList("c1", "c2"), ".",
         ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, Collections.emptyMap()));
     ingestionConfig.setAggregationConfigs(
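The hunk above shows the constructor change repeated throughout the test files below: TransformConfig now takes two extra trailing parameters. Their names and types are not visible in this diff, so this sketch simply passes null for both, exactly as the updated tests do:

import org.apache.pinot.spi.config.table.ingestion.TransformConfig;

class TransformConfigSketch {
  // Old two-argument form:  new TransformConfig("bar", "func(moo)")
  // New four-argument form (the two added parameters are unnamed in this diff):
  TransformConfig config = new TransformConfig("bar", "func(moo)", null, null);
}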
----------------------------------------------------------------------
@@ -42,6 +42,7 @@
 import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -53,7 +54,6 @@
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
-import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -78,7 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private String _outputDirURI;
   private Schema _schema;
   private Set<String> _fieldsToRead;
-  private RecordEnricherPipeline _recordEnricherPipeline;
+  private RecordTransformerPipeline _recordTransformerPipeline;
   private RecordTransformer _recordTransformer;

   private File _stagingDir;
@@ -139,7 +139,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

     _schema = schema;
     _fieldsToRead = _schema.getColumnNames();
-    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
+    _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(_tableConfig);
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
     _reusableRecord = new GenericData.Record(_avroSchema);
@@ -175,7 +175,7 @@ private void resetBuffer()
   public void collect(GenericRow row)
       throws IOException {
     long startTime = System.currentTimeMillis();
-    _recordEnricherPipeline.run(row);
+    _recordTransformerPipeline.run(row);
     GenericRow transform = _recordTransformer.transform(row);
     SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
     _rowCount++;
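The collect() hunk above fixes the order of operations at ingest: the table-config-driven pipeline mutates the row in place first, then the default CompositeTransformer produces the output row. A sketch of that two-stage flow, assuming only the fields and signatures shown in this file:

import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.spi.data.readers.GenericRow;

class CollectOrderSketch {
  private RecordTransformerPipeline _recordTransformerPipeline;
  private RecordTransformer _recordTransformer;

  GenericRow apply(GenericRow row) {
    _recordTransformerPipeline.run(row);       // config-driven step (ex-enricher), in place
    return _recordTransformer.transform(row);  // then the default composite transformation
  }
}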
----------------------------------------------------------------------
@@ -58,6 +58,7 @@
 import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
 import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -80,7 +81,6 @@
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
 import org.apache.pinot.spi.plugin.PluginManager;
-import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
@@ -283,7 +283,7 @@ public void deleteSegmentFile() {
   private final int _partitionGroupId;
   private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
   final String _clientId;
-  private final RecordEnricherPipeline _recordEnricherPipeline;
+  private final RecordTransformerPipeline _recordTransformerPipeline;
   private final TransformPipeline _transformPipeline;
   private PartitionGroupConsumer _partitionGroupConsumer = null;
   private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -603,7 +603,7 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
         _numRowsErrored++;
       } else {
         try {
-          _recordEnricherPipeline.run(decodedRow.getResult());
+          _recordTransformerPipeline.run(decodedRow.getResult());
           _transformPipeline.processRow(decodedRow.getResult(), reusedResult);
         } catch (Exception e) {
           _numRowsErrored++;
@@ -1588,7 +1588,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
     _streamDataDecoder = localStreamDataDecoder.get();

     try {
-      _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
+      _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
     } catch (Exception e) {
       _realtimeTableDataManager.addSegmentError(_segmentNameStr,
           new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
----------------------------------------------------------------------
@@ -34,6 +34,7 @@
 import org.apache.pinot.core.segment.processing.reducer.Reducer;
 import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
 import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -43,7 +44,6 @@
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
-import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -291,7 +291,7 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
       GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
       SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
       driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
-          RecordEnricherPipeline.getPassThroughPipeline(),
+          RecordTransformerPipeline.getPassThroughPipeline(),
           TransformPipeline.getPassThroughPipeline());
       driver.build();
       outputSegmentDirs.add(driver.getOutputDirectory());
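Both pipeline types expose a no-op factory for callers whose rows need no further config-driven processing; here the rows being re-read were already mapped and reduced, so the driver gets pass-through pipelines. A sketch under that assumption, using only factory names visible in this hunk:

import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;

class PassThroughSketch {
  // getPassThroughPipeline() replaces the old RecordEnricherPipeline factory one-for-one:
  RecordTransformerPipeline recordPipeline = RecordTransformerPipeline.getPassThroughPipeline();
  TransformPipeline transformPipeline = TransformPipeline.getPassThroughPipeline();
}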
----------------------------------------------------------------------
@@ -42,14 +42,14 @@
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
-import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class SegmentMapper {
   private final List<FieldSpec> _fieldSpecs;
   private final boolean _includeNullFields;
   private final int _numSortFields;
-  private final RecordEnricherPipeline _recordEnricherPipeline;
+  private final RecordTransformerPipeline _recordTransformerPipeline;
   private final CompositeTransformer _recordTransformer;
   private final ComplexTypeTransformer _complexTypeTransformer;
   private final TimeHandler _timeHandler;
@@ -96,7 +96,7 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
     _fieldSpecs = pair.getLeft();
     _numSortFields = pair.getRight();
     _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
-    _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
+    _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
     _recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
     _complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
     _timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
Expand Down Expand Up @@ -172,7 +172,7 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow
     observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
     while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
       reuse = recordReader.next(reuse);
-      _recordEnricherPipeline.run(reuse);
+      _recordTransformerPipeline.run(reuse);

       if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
         //noinspection unchecked
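In the mapper's read loop, the pipeline mutates the reused row in place before the MULTIPLE_RECORDS_KEY check that handles rows unnested into multiple child records. The loop body is truncated in the hunk above; the //noinspection unchecked comment suggests a cast roughly like this sketch, which assumes the conventional collection-of-rows payload:

import java.util.Collection;
import org.apache.pinot.spi.data.readers.GenericRow;

class MapLoopSketch {
  void handleUnnestedRows(GenericRow reuse) {
    Object multipleRecords = reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
    if (multipleRecords != null) {
      @SuppressWarnings("unchecked")
      Collection<GenericRow> childRows = (Collection<GenericRow>) multipleRecords;
      for (GenericRow child : childRows) {
        // map each unnested child row individually (assumed handling; not shown in the diff)
      }
    }
  }
}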
----------------------------------------------------------------------
@@ -96,7 +96,8 @@ public void testCompatibilityWithTableConfig() {
     // schema doesn't have destination columns from transformConfigs
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
     IngestionConfig ingestionConfig = new IngestionConfig();
-    ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA", "round(colB, 1000)")));
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA",
+        "round(colB, 1000)", null, null)));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
     try {
----------------------------------------------------------------------
@@ -90,9 +90,9 @@ public class JsonUnnestIngestionFromAvroQueriesTest extends BaseQueriesTest {
           .build();
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(
-          new IngestionConfig(null, null, null, null,
-              List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000"),
-                  new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)")),
+          new IngestionConfig(null, null, null,
+              List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000", null, null),
+                  new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)", null, null)),
               new ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null), null, null, null)
       ).build();

----------------------------------------------------------------------
@@ -132,9 +132,9 @@ protected void buildSegment()

     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
-            .setIngestionConfig(new IngestionConfig(null, null, null, null,
+            .setIngestionConfig(new IngestionConfig(null, null, null,
                 Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
-                    + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
+                    + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)", null, null)),
                 null, null, null, null))
             .build();
     Schema schema =
----------------------------------------------------------------------
@@ -140,7 +140,7 @@ protected List<FieldConfig> getFieldConfigs() {
   @Override
   protected IngestionConfig getIngestionConfig() {
     List<TransformConfig> transforms = new ArrayList<>();
-    transforms.add(new TransformConfig("timestampInEpoch", "now()"));
+    transforms.add(new TransformConfig("timestampInEpoch", "now()", null, null));

     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transforms);
----------------------------------------------------------------------
@@ -67,9 +67,9 @@ protected IngestionConfig getIngestionConfig() {
         new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)");
     ingestionConfig.setFilterConfig(filterConfig);
     List<TransformConfig> transformConfigs = Arrays.asList(
-        new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)"),
-        new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)"),
-        new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
+        new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)", null, null),
+        new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)", null, null),
+        new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)", null, null));
     ingestionConfig.setTransformConfigs(transformConfigs);
     return ingestionConfig;
   }
----------------------------------------------------------------------
@@ -118,7 +118,7 @@ private void setupTableWithNonDefaultDatabase(List<File> avroFiles)
     noDicCols.add(customCol);
     tableConfig.getIndexingConfig().setNoDictionaryColumns(noDicCols);
     IngestionConfig ingestionConfig = new IngestionConfig();
-    ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol)));
+    ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol, null, null)));
     tableConfig.setIngestionConfig(ingestionConfig);
     addTableConfig(tableConfig);

----------------------------------------------------------------------
@@ -1585,15 +1585,15 @@ private void reloadWithExtraColumns()

     TableConfig tableConfig = getOfflineTableConfig();
     List<TransformConfig> transformConfigs =
-        Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
-            new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
-            new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
-            new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"),
-            new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"),
-            new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"),
-            new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"),
-            new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)"),
-            new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)"));
+        Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24", null, null),
+            new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000", null, null),
+            new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0", null, null),
+            new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')", null, null),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs", null, null),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs", null, null),
+            new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)", null, null),
+            new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)", null, null),
+            new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)", null, null));
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transformConfigs);
     tableConfig.setIngestionConfig(ingestionConfig);
----------------------------------------------------------------------
@@ -114,7 +114,7 @@ public void setUp()
     TableConfig realtimeTableConfig = createRealtimeTableConfig(avroFiles.get(0));
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(
-        Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)")));
+        Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)", null, null)));
     realtimeTableConfig.setIngestionConfig(ingestionConfig);
     FieldConfig tsFieldConfig =
         new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null,
----------------------------------------------------------------------
@@ -85,10 +85,12 @@ public String getTableName() {
   @Override
   public TableConfig createOfflineTableConfig() {
     List<TransformConfig> transformConfigs = Arrays.asList(
-        new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')"),
-        new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')"),
+        new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')",
+            null, null),
+        new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')",
+            null, null),
         new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME,
-            "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')"));
+            "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')", null, null));
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transformConfigs);
     return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig)
----------------------------------------------------------------------
@@ -78,8 +78,10 @@ public Schema createSchema() {
   @Override
   public TableConfig createOfflineTableConfig() {
     List<TransformConfig> transformConfigs = Arrays.asList(
-        new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")"),
-        new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")"));
+        new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")",
+            null, null),
+        new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")",
+            null, null));
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transformConfigs);
     return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig)
----------------------------------------------------------------------
@@ -102,13 +102,13 @@ public void setUp()
             .setSortedColumn(D1).build();
     IngestionConfig ingestionConfigEpochHours = new IngestionConfig();
     ingestionConfigEpochHours.setTransformConfigs(
-        Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)")));
+        Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)", null, null)));
     TableConfig tableConfigEpochHours =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
             .setSortedColumn(D1).setIngestionConfig(ingestionConfigEpochHours).build();
     IngestionConfig ingestionConfigSDF = new IngestionConfig();
     ingestionConfigSDF.setTransformConfigs(
-        Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')")));
+        Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')", null, null)));
     TableConfig tableConfigSDF =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
             .setSortedColumn(D1).setIngestionConfig(ingestionConfigSDF).build();