Skip to content

Commit

Permalink
[feature] allow dim table config to detect/disallow duplicate PK (apa…
Browse files Browse the repository at this point in the history
…che#12290)

* added dim table PK duplicate config key
* use false as default and change it to named as: error-on-duplicate-pk;

---------

Co-authored-by: Rong Rong <rongr@startree.ai>
  • Loading branch information
walterddr and Rong Rong authored Jan 22, 2024
1 parent f1fec06 commit eeaf1f0
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public static DimensionTableDataManager getInstanceByTableName(String tableNameW
private final AtomicInteger _loadToken = new AtomicInteger();

private boolean _disablePreload = false;
private boolean _errorOnDuplicatePrimaryKey = false;

@Override
protected void doInit() {
Expand All @@ -103,6 +104,7 @@ protected void doInit() {
DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig();
if (dimensionTableConfig != null) {
_disablePreload = dimensionTableConfig.isDisablePreload();
_errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
}
}

Expand Down Expand Up @@ -211,7 +213,12 @@ private DimensionTable createFastLookupDimensionTable() {
}
GenericRow row = new GenericRow();
recordReader.getRecord(i, row);
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
GenericRow previousRow = lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
if (_errorOnDuplicatePrimaryKey && previousRow != null) {
throw new IllegalStateException(
"Caught exception while reading records from segment: " + indexSegment.getSegmentName()
+ "primary key already exist for: " + row.getPrimaryKey(primaryKeyColumns));
}
}
} catch (Exception e) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pinot.core.data.manager.offline;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -47,10 +50,13 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -60,14 +66,17 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;


@SuppressWarnings("unchecked")
public class DimensionTableDataManagerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), LoaderTest.class.getName());
private static final String RAW_TABLE_NAME = "dimBaseballTeams";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final String AVRO_DATA_PATH = "data/dimBaseballTeams.avro";
private static final String CSV_DATA_PATH = "data/dimBaseballTeams.csv";
private static final String SCHEMA_PATH = "data/dimBaseballTeams_schema.json";
private static final String TABLE_CONFIG_PATH = "data/dimBaseballTeams_config.json";

private File _indexDir;
private IndexLoadingConfig _indexLoadingConfig;
Expand All @@ -80,14 +89,22 @@ public void setUp()
ServerMetrics.register(mock(ServerMetrics.class));

// prepare segment data
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
URL dataPathUrl = getClass().getClassLoader().getResource(CSV_DATA_PATH);
URL schemaPathUrl = getClass().getClassLoader().getResource(SCHEMA_PATH);
URL configPathUrl = getClass().getClassLoader().getResource(TABLE_CONFIG_PATH);
assertNotNull(dataPathUrl);
assertNotNull(schemaPathUrl);
assertNotNull(configPathUrl);
File csvFile = new File(dataPathUrl.getFile());
Schema schema = createSchema(new File(schemaPathUrl.getFile()));
TableConfig tableConfig = createTableConfig(new File(configPathUrl.getFile()));

// create segment
File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);

SegmentGeneratorConfig segmentGeneratorConfig =
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, tableDataDir, RAW_TABLE_NAME);
SegmentTestUtils.getSegmentGeneratorConfig(csvFile, FileFormat.CSV, tableDataDir, RAW_TABLE_NAME, tableConfig,
schema);
SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(segmentGeneratorConfig);
driver.build();
Expand All @@ -111,8 +128,8 @@ private Schema getSchema() {
.setPrimaryKeyColumns(Collections.singletonList("teamID")).build();
}

private TableConfig getTableConfig(boolean disablePreload) {
DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload);
private TableConfig getTableConfig(boolean disablePreload, boolean errorOnDuplicatePrimaryKey) {
DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload, errorOnDuplicatePrimaryKey);
return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams")
.setDimensionTableConfig(dimensionTableConfig).build();
}
Expand All @@ -127,7 +144,7 @@ private Schema getSchemaWithExtraColumn() {
private DimensionTableDataManager makeTableDataManager(HelixManager helixManager) {
InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableConfig tableConfig = getTableConfig(false);
TableConfig tableConfig = getTableConfig(false, false);
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
tableDataManager.init(instanceDataManagerConfig, tableConfig, helixManager, null, null);
Expand Down Expand Up @@ -257,8 +274,8 @@ public void testLookupWithoutPreLoad()
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
SchemaUtils.toZNRecord(getSchema()));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(true)));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(true, false)));
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

Expand Down Expand Up @@ -294,4 +311,42 @@ public void testLookupWithoutPreLoad()
resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");
}

@Test
public void testLookupErrorOnDuplicatePrimaryKey()
throws Exception {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn(
SchemaUtils.toZNRecord(getSchema()));
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn(
TableConfigUtils.toZNRecord(getTableConfig(false, true)));
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager);

// try fetching data BEFORE loading segment
GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
assertNull(resp, "Response should be null if no segment is loaded");

try {
tableDataManager.addSegment(_indexDir, _indexLoadingConfig);
fail("Should error out when ErrorOnDuplicatePrimaryKey is configured to true");
} catch (Exception e) {
// expected;
}
}

protected static Schema createSchema(File schemaFile)
throws IOException {
InputStream inputStream = new FileInputStream(schemaFile);
Assert.assertNotNull(inputStream);
return JsonUtils.inputStreamToObject(inputStream, Schema.class);
}

protected static TableConfig createTableConfig(File tableConfigFile)
throws IOException {
InputStream inputStream = new FileInputStream(tableConfigFile);
Assert.assertNotNull(inputStream);
return JsonUtils.inputStreamToObject(inputStream, TableConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testRecordReaderFileConfigInit() throws Exception {
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 51);
assertEquals(segmentMetadata.getTotalDocs(), 52);
}

@Test
Expand Down
Binary file not shown.
8 changes: 0 additions & 8 deletions pinot-core/src/test/resources/data/dimBaseballTeams.avsc

This file was deleted.

1 change: 1 addition & 0 deletions pinot-core/src/test/resources/data/dimBaseballTeams.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
teamID,teamName
"ANA","Anaheim Angels"
"ANA","Anaheim Angels"
"ARI","Arizona Diamondbacks"
"ATL","Atlanta Braves"
"BAL","Baltimore Orioles (original- 1901–1902 current- since 1954)"
Expand Down
18 changes: 18 additions & 0 deletions pinot-core/src/test/resources/data/dimBaseballTeams_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"tableName": "dimBaseballTeams",
"tableType": "OFFLINE",
"isDimTable": true,
"segmentsConfig": {
"segmentPushType": "REFRESH",
"replication": "1"
},
"tenants": {
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"metadata": {
"customConfigs": {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@

public class DimensionTableConfig extends BaseJsonConfig {
private final boolean _disablePreload;
private final boolean _errorOnDuplicatePrimaryKey;

@JsonCreator
public DimensionTableConfig(@JsonProperty(value = "disablePreload", required = true) boolean disablePreload) {
_disablePreload = disablePreload;
public DimensionTableConfig(@JsonProperty(value = "disablePreload") Boolean disablePreload,
@JsonProperty(value = "errorOnDuplicatePrimaryKey") Boolean errorOnDuplicatePrimaryKey) {
_disablePreload = disablePreload != null && disablePreload;
_errorOnDuplicatePrimaryKey = errorOnDuplicatePrimaryKey != null && errorOnDuplicatePrimaryKey;
}

public boolean isDisablePreload() {
return _disablePreload;
}

public boolean isErrorOnDuplicatePrimaryKey() {
return _errorOnDuplicatePrimaryKey;
}
}

0 comments on commit eeaf1f0

Please sign in to comment.