Merge RecordEnricher into RecordTransformer #13704

Closed
@@ -284,7 +284,8 @@ public void testSerDe()
new StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType", "kafka"))));
ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
ingestionConfig.setTransformConfigs(
Arrays.asList(new TransformConfig("bar", "func(moo)"), new TransformConfig("zoo", "myfunc()")));
Arrays.asList(new TransformConfig("bar", "func(moo)", null, null),
new TransformConfig("zoo", "myfunc()", null, null)));
ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(Arrays.asList("c1", "c2"), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, Collections.emptyMap()));
ingestionConfig.setAggregationConfigs(
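Most of the test and call-site changes in this PR are mechanical: every existing two-argument TransformConfig constructor call gains two trailing nulls. Below is a minimal sketch of the widened constructor usage, assuming the usual org.apache.pinot.spi.config.table.ingestion package layout; the meaning of the two extra arguments is an inference from the PR title (enricher options merged into the transform config), not a confirmed signature.

import java.util.Arrays;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;

class TransformConfigMigrationSketch {
  static IngestionConfig migrate() {
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setTransformConfigs(Arrays.asList(
        // before this PR: new TransformConfig("bar", "func(moo)")
        // after: the same column/expression pair plus two nullable arguments;
        // passing null keeps the plain column-transform behaviour (the extra
        // arguments are assumed to carry the merged enricher options)
        new TransformConfig("bar", "func(moo)", null, null),
        new TransformConfig("zoo", "myfunc()", null, null)));
    return ingestionConfig;
  }
}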
@@ -42,6 +42,7 @@
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -53,7 +54,6 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -78,7 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
private String _outputDirURI;
private Schema _schema;
private Set<String> _fieldsToRead;
private RecordEnricherPipeline _recordEnricherPipeline;
private RecordTransformerPipeline _recordTransformerPipeline;
private RecordTransformer _recordTransformer;

private File _stagingDir;
@@ -139,7 +139,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

_schema = schema;
_fieldsToRead = _schema.getColumnNames();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
_recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(_tableConfig);
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
_avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
_reusableRecord = new GenericData.Record(_avroSchema);
@@ -175,7 +175,7 @@ private void resetBuffer()
public void collect(GenericRow row)
throws IOException {
long startTime = System.currentTimeMillis();
_recordEnricherPipeline.run(row);
_recordTransformerPipeline.run(row);
GenericRow transform = _recordTransformer.transform(row);
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
_rowCount++;
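The writer changes above are a pure rename: construction still happens once in init(), and each incoming row runs through the pipeline before the default transformer. A minimal sketch of that flow under the new names (class names and factory methods are taken from the diff above; error handling and the Avro conversion in collect() are omitted, so this is illustrative rather than the writer's full logic):

import java.io.IOException;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;

class RowIngestionSketch {
  private final RecordTransformerPipeline _recordTransformerPipeline;
  private final RecordTransformer _recordTransformer;

  RowIngestionSketch(TableConfig tableConfig, Schema schema) {
    // Same factories the writer calls in init(): one pipeline per table config,
    // plus the default composite transformer for the schema.
    _recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
    _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
  }

  GenericRow process(GenericRow row) throws IOException {
    // The pipeline works on the row in place (formerly RecordEnricherPipeline.run),
    // then the composite transformer produces the row that gets written out.
    _recordTransformerPipeline.run(row);
    return _recordTransformer.transform(row);
  }
}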
@@ -58,6 +58,7 @@
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -80,7 +81,6 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -283,7 +283,7 @@ public void deleteSegmentFile() {
private final int _partitionGroupId;
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
final String _clientId;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final RecordTransformerPipeline _recordTransformerPipeline;
private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -603,7 +603,7 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
_numRowsErrored++;
} else {
try {
_recordEnricherPipeline.run(decodedRow.getResult());
_recordTransformerPipeline.run(decodedRow.getResult());
_transformPipeline.processRow(decodedRow.getResult(), reusedResult);
} catch (Exception e) {
_numRowsErrored++;
@@ -1588,7 +1588,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_streamDataDecoder = localStreamDataDecoder.get();

try {
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
@@ -34,6 +34,7 @@
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -43,7 +44,6 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -291,7 +291,7 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
RecordEnricherPipeline.getPassThroughPipeline(),
RecordTransformerPipeline.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
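Call sites that build segments from rows that have already been mapped and transformed keep using a no-op pipeline; only the factory's class name changes. A minimal sketch of that pattern, assuming a generator config and record reader prepared as in the surrounding generateSegment() method:

import java.io.File;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.data.readers.RecordReader;

class PassThroughSegmentBuildSketch {
  // generatorConfig and recordReader stand in for the values produced earlier in
  // generateSegment(); getPassThroughPipeline() replaces the former
  // RecordEnricherPipeline.getPassThroughPipeline().
  static File buildSegment(SegmentGeneratorConfig generatorConfig, RecordReader recordReader)
      throws Exception {
    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
    driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReader),
        RecordTransformerPipeline.getPassThroughPipeline(),
        TransformPipeline.getPassThroughPipeline());
    driver.build();
    return driver.getOutputDirectory();
  }
}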
@@ -42,14 +42,14 @@
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerPipeline;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final RecordTransformerPipeline _recordTransformerPipeline;
private final CompositeTransformer _recordTransformer;
private final ComplexTypeTransformer _complexTypeTransformer;
private final TimeHandler _timeHandler;
@@ -96,7 +96,7 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordTransformerPipeline = RecordTransformerPipeline.fromTableConfig(tableConfig);
_recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
_complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
@@ -172,7 +172,7 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
_recordEnricherPipeline.run(reuse);
_recordTransformerPipeline.run(reuse);

if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
//noinspection unchecked
@@ -96,7 +96,8 @@ public void testCompatibilityWithTableConfig() {
// schema doesn't have destination columns from transformConfigs
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA", "round(colB, 1000)")));
ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("colA",
"round(colB, 1000)", null, null)));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
try {
@@ -90,9 +90,9 @@ public class JsonUnnestIngestionFromAvroQueriesTest extends BaseQueriesTest {
.build();
private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(
new IngestionConfig(null, null, null, null,
List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000"),
new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)")),
new IngestionConfig(null, null, null,
List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000", null, null),
new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)", null, null)),
new ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null), null, null, null)
).build();

@@ -132,9 +132,9 @@ protected void buildSegment()

TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
.setIngestionConfig(new IngestionConfig(null, null, null, null,
.setIngestionConfig(new IngestionConfig(null, null, null,
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)", null, null)),
null, null, null, null))
.build();
Schema schema =
@@ -140,7 +140,7 @@ protected List<FieldConfig> getFieldConfigs() {
@Override
protected IngestionConfig getIngestionConfig() {
List<TransformConfig> transforms = new ArrayList<>();
transforms.add(new TransformConfig("timestampInEpoch", "now()"));
transforms.add(new TransformConfig("timestampInEpoch", "now()", null, null));

IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transforms);
@@ -67,9 +67,9 @@ protected IngestionConfig getIngestionConfig() {
new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)");
ingestionConfig.setFilterConfig(filterConfig);
List<TransformConfig> transformConfigs = Arrays.asList(
new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)"),
new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)"),
new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)", null, null),
new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)", null, null),
new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)", null, null));
ingestionConfig.setTransformConfigs(transformConfigs);
return ingestionConfig;
}
@@ -118,7 +118,7 @@ private void setupTableWithNonDefaultDatabase(List<File> avroFiles)
noDicCols.add(customCol);
tableConfig.getIndexingConfig().setNoDictionaryColumns(noDicCols);
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol)));
ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol, null, null)));
tableConfig.setIngestionConfig(ingestionConfig);
addTableConfig(tableConfig);

@@ -1585,15 +1585,15 @@ private void reloadWithExtraColumns()

TableConfig tableConfig = getOfflineTableConfig();
List<TransformConfig> transformConfigs =
Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"),
new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"),
new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"),
new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"),
new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)"),
new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)"));
Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24", null, null),
new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000", null, null),
new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0", null, null),
new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')", null, null),
new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs", null, null),
new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs", null, null),
new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)", null, null),
new TransformConfig("NewAddedRawDerivedMVIntDimension", "array(ActualElapsedTime)", null, null),
new TransformConfig("NewAddedDerivedMVDoubleDimension", "array(ArrDelayMinutes)", null, null));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
@@ -114,7 +114,7 @@ public void setUp()
TableConfig realtimeTableConfig = createRealtimeTableConfig(avroFiles.get(0));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(
Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)")));
Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)", null, null)));
realtimeTableConfig.setIngestionConfig(ingestionConfig);
FieldConfig tsFieldConfig =
new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null,
@@ -85,10 +85,12 @@ public String getTableName() {
@Override
public TableConfig createOfflineTableConfig() {
List<TransformConfig> transformConfigs = Arrays.asList(
new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')"),
new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')"),
new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k1')",
null, null),
new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + MY_MAP_STR_FIELD_NAME + ", '$.k2')",
null, null),
new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME,
"jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')"));
"jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')", null, null));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig)
@@ -78,8 +78,10 @@ public Schema createSchema() {
@Override
public TableConfig createOfflineTableConfig() {
List<TransformConfig> transformConfigs = Arrays.asList(
new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")"),
new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")"));
new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")",
null, null),
new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")",
null, null));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig)
@@ -102,13 +102,13 @@ public void setUp()
.setSortedColumn(D1).build();
IngestionConfig ingestionConfigEpochHours = new IngestionConfig();
ingestionConfigEpochHours.setTransformConfigs(
Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)")));
Collections.singletonList(new TransformConfig(T_TRX, "toEpochHours(t)", null, null)));
TableConfig tableConfigEpochHours =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
.setSortedColumn(D1).setIngestionConfig(ingestionConfigEpochHours).build();
IngestionConfig ingestionConfigSDF = new IngestionConfig();
ingestionConfigSDF.setTransformConfigs(
Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')")));
Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')", null, null)));
TableConfig tableConfigSDF =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
.setSortedColumn(D1).setIngestionConfig(ingestionConfigSDF).build();