From eeaf1f081107f37dca546b31e8f5378b38f28537 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Mon, 22 Jan 2024 15:47:13 -0800 Subject: [PATCH] [feature] allow dim table config to detect/disallow duplicate PK (#12290) * added dim table PK duplicate config key * use false as the default and rename the config key to: error-on-duplicate-pk; --------- Co-authored-by: Rong Rong --- .../offline/DimensionTableDataManager.java | 9 ++- .../DimensionTableDataManagerTest.java | 75 +++++++++++++++--- .../SegmentProcessorFrameworkTest.java | 2 +- .../test/resources/data/dimBaseballTeams.avro | Bin 2214 -> 0 bytes .../test/resources/data/dimBaseballTeams.avsc | 8 -- .../test/resources/data/dimBaseballTeams.csv | 1 + .../data/dimBaseballTeams_config.json | 18 +++++ .../config/table/DimensionTableConfig.java | 11 ++- 8 files changed, 102 insertions(+), 22 deletions(-) delete mode 100644 pinot-core/src/test/resources/data/dimBaseballTeams.avro delete mode 100644 pinot-core/src/test/resources/data/dimBaseballTeams.avsc create mode 100644 pinot-core/src/test/resources/data/dimBaseballTeams_config.json diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index b78874a8e39c..72b0111c64f6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -87,6 +87,7 @@ public static DimensionTableDataManager getInstanceByTableName(String tableNameW private final AtomicInteger _loadToken = new AtomicInteger(); private boolean _disablePreload = false; + private boolean _errorOnDuplicatePrimaryKey = false; @Override protected void doInit() { @@ -103,6 +104,7 @@ protected void doInit() { DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig(); if
(dimensionTableConfig != null) { _disablePreload = dimensionTableConfig.isDisablePreload(); + _errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey(); } } @@ -211,7 +213,12 @@ private DimensionTable createFastLookupDimensionTable() { } GenericRow row = new GenericRow(); recordReader.getRecord(i, row); - lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row); + GenericRow previousRow = lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row); + if (_errorOnDuplicatePrimaryKey && previousRow != null) { + throw new IllegalStateException( + "Caught exception while reading records from segment: " + indexSegment.getSegmentName() + + "primary key already exist for: " + row.getPrimaryKey(primaryKeyColumns)); + } } } catch (Exception e) { throw new RuntimeException( diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 6a8c1b83c5ea..8c865ba925f8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -19,6 +19,9 @@ package org.apache.pinot.core.data.manager.offline; import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.util.Collections; import java.util.List; @@ -47,10 +50,13 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -60,6 +66,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; @SuppressWarnings("unchecked") @@ -67,7 +74,9 @@ public class DimensionTableDataManagerTest { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), LoaderTest.class.getName()); private static final String RAW_TABLE_NAME = "dimBaseballTeams"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); - private static final String AVRO_DATA_PATH = "data/dimBaseballTeams.avro"; + private static final String CSV_DATA_PATH = "data/dimBaseballTeams.csv"; + private static final String SCHEMA_PATH = "data/dimBaseballTeams_schema.json"; + private static final String TABLE_CONFIG_PATH = "data/dimBaseballTeams_config.json"; private File _indexDir; private IndexLoadingConfig _indexLoadingConfig; @@ -80,14 +89,22 @@ public void setUp() ServerMetrics.register(mock(ServerMetrics.class)); // prepare segment data - URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH); - assertNotNull(resourceUrl); - File avroFile = new File(resourceUrl.getFile()); + URL dataPathUrl = getClass().getClassLoader().getResource(CSV_DATA_PATH); + URL schemaPathUrl = getClass().getClassLoader().getResource(SCHEMA_PATH); + URL configPathUrl = getClass().getClassLoader().getResource(TABLE_CONFIG_PATH); + assertNotNull(dataPathUrl); + assertNotNull(schemaPathUrl); + assertNotNull(configPathUrl); + File csvFile = new File(dataPathUrl.getFile()); + Schema schema = createSchema(new File(schemaPathUrl.getFile())); + TableConfig tableConfig = createTableConfig(new File(configPathUrl.getFile())); // create segment File tableDataDir = 
new File(TEMP_DIR, OFFLINE_TABLE_NAME); + SegmentGeneratorConfig segmentGeneratorConfig = - SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, tableDataDir, RAW_TABLE_NAME); + SegmentTestUtils.getSegmentGeneratorConfig(csvFile, FileFormat.CSV, tableDataDir, RAW_TABLE_NAME, tableConfig, + schema); SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null); driver.init(segmentGeneratorConfig); driver.build(); @@ -111,8 +128,8 @@ private Schema getSchema() { .setPrimaryKeyColumns(Collections.singletonList("teamID")).build(); } - private TableConfig getTableConfig(boolean disablePreload) { - DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload); + private TableConfig getTableConfig(boolean disablePreload, boolean errorOnDuplicatePrimaryKey) { + DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload, errorOnDuplicatePrimaryKey); return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams") .setDimensionTableConfig(dimensionTableConfig).build(); } @@ -127,7 +144,7 @@ private Schema getSchemaWithExtraColumn() { private DimensionTableDataManager makeTableDataManager(HelixManager helixManager) { InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableConfig tableConfig = getTableConfig(false); + TableConfig tableConfig = getTableConfig(false, false); DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME); tableDataManager.init(instanceDataManagerConfig, tableConfig, helixManager, null, null); @@ -257,8 +274,8 @@ public void testLookupWithoutPreLoad() ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( SchemaUtils.toZNRecord(getSchema())); - 
when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( - TableConfigUtils.toZNRecord(getTableConfig(true))); + when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn( + TableConfigUtils.toZNRecord(getTableConfig(true, false))); when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager); @@ -294,4 +311,42 @@ public void testLookupWithoutPreLoad() resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"})); assertNull(resp, "Response should be null if no segment is loaded"); } + + @Test + public void testLookupErrorOnDuplicatePrimaryKey() + throws Exception { + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( + SchemaUtils.toZNRecord(getSchema())); + when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn( + TableConfigUtils.toZNRecord(getTableConfig(false, true))); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager); + + // try fetching data BEFORE loading segment + GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"})); + assertNull(resp, "Response should be null if no segment is loaded"); + + try { + tableDataManager.addSegment(_indexDir, _indexLoadingConfig); + fail("Should error out when ErrorOnDuplicatePrimaryKey is configured to true"); + } catch (Exception e) { + // expected; + } + } + + protected static Schema createSchema(File schemaFile) + throws IOException { + InputStream inputStream = new FileInputStream(schemaFile); + Assert.assertNotNull(inputStream); + return 
JsonUtils.inputStreamToObject(inputStream, Schema.class); + } + + protected static TableConfig createTableConfig(File tableConfigFile) + throws IOException { + InputStream inputStream = new FileInputStream(tableConfigFile); + Assert.assertNotNull(inputStream); + return JsonUtils.inputStreamToObject(inputStream, TableConfig.class); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java index 43e5e9648a6f..8b3e150a1b09 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java @@ -212,7 +212,7 @@ public void testRecordReaderFileConfigInit() throws Exception { assertEquals(outputSegments.size(), 1); ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap); SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); - assertEquals(segmentMetadata.getTotalDocs(), 51); + assertEquals(segmentMetadata.getTotalDocs(), 52); } @Test diff --git a/pinot-core/src/test/resources/data/dimBaseballTeams.avro b/pinot-core/src/test/resources/data/dimBaseballTeams.avro deleted file mode 100644 index b2bb5fc172624621820704dbdbedece9f5e66e3d..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2214 zcmah~%Wfk@6vZQ}g;=5yiztdpULYG~(zeHsBtl5-b`pEie&Fs=2#Ar&-DP*-a+Ot8 zPU1n4SRo++8#e6t2v%(P1jLRN8@>SkfLq;e+Y=(OYP;^E?&F+OgYg%+{8M}*#Zf3C z628rsjJ=oqS7A)%__rrt7W8UC3a6+bNhoAgI79`G=gG+!D>BB61q9EtbG5+Xe~jwL zI-DlgxN=wS8SpsvFhJ@XA;rN(I7V%X=YmIL9L~THc>9iKm}`t$65kMI zwH)tt3o}jULJ~BPR4}4YQAj$a9J6Pra#F7R_01%O}T`GL^}b#xI@KF#^)xZ^Woa=%!#BS z=8$GvNYl8qzRoV)y=&{#*u5rg7OFD6RZc3^q?CHqBx={zceMo!B)O-Vm@8#t9vwd{ 
z74A6RiGO*RIV_cCNyl(Q1!6DzP@@!L##S7)MKlGYJrZNemF+JVL;|+#mZTAxg4bNx z7ug4|l$|b$hRSjsui#=vCqibrUnSmDxhJL z#d5u~*k$B~0RAG>5n1uER9)iMz#F0QFQxqJw z&w~9n(Na*20%|5}oqNu~Ik?dXqaKzF#%y&4FUuVPive&tz(Kgg^oWjXb(Oz#zs+4j z5m+IY>J@Ty%!r1G8xYj%$&MS%1V_7WBeBA4XWWgGlI1zhN1jmWZU99o6M;t)_?eK? z__W{O^;sh#?>V17a-7tV;;c~tsx?eK)p8ja*XufaJ>WInwAt%=zvg)IvvuH_=^LUqfeXEE|DXr^B}#yP1h;0sJy4XM;93&Q(AnKW zsC@tW`&^&gqH7^%=tAfi8)U<>b%Lbh=gC~l)ufz_<2B|`c{=&5>yp*KzBt;rq&e($ zO0f0zHntwo3Qn%k=471^4m-uoVTVp*13v?ofpb|H;8}vk&E~CP$2lCvl;Mc5MQqrX zfdWij{f5c>VE{(^<0Zh=NL}SPYp-n7(X4C%5C+XGLHO|1qv2-LYmKc{A*_}n&uHtg z)9SntV;F*7fIOI>S~|*_aqI@CL#nkJFXc2wLn=XhK|>fL)0ps;%KL5PV~`egKQZnQJyrB$w)JWe&#TwOe8Bj0l??|^P)+2E-`Rkg#xA9v3#%ffv=B{ zCzezJ5XG(0ZpSNu#cgdZE|!sbV3J14Ydr?t@&hk^#LFHMIU%7oB9Y`wKU;ap2d%>Z z&lm8p;T0;j$qi*CG=fQ4!PzH!0l5Q~k1#xtu=-#yDg**VM+>x>sCDIce$QQD6@%y* zz3>T0nJ{b3XteVhHsBc}cMDJtD!4gsq_1SNlHhtLO&|0SUgE7{0Ez3Rhfn#xY#HTK diff --git a/pinot-core/src/test/resources/data/dimBaseballTeams.avsc b/pinot-core/src/test/resources/data/dimBaseballTeams.avsc deleted file mode 100644 index ee527e5af853..000000000000 --- a/pinot-core/src/test/resources/data/dimBaseballTeams.avsc +++ /dev/null @@ -1,8 +0,0 @@ -{"namespace": "baseballTeams.avro", - "type": "record", - "name": "baseballTeam", - "fields": [ - {"name": "teamID", "type": "string"}, - {"name": "teamName", "type": "string"} - ] -} diff --git a/pinot-core/src/test/resources/data/dimBaseballTeams.csv b/pinot-core/src/test/resources/data/dimBaseballTeams.csv index 4f5e6b927af7..bd7c50bfbd54 100644 --- a/pinot-core/src/test/resources/data/dimBaseballTeams.csv +++ b/pinot-core/src/test/resources/data/dimBaseballTeams.csv @@ -1,5 +1,6 @@ teamID,teamName "ANA","Anaheim Angels" +"ANA","Anaheim Angels" "ARI","Arizona Diamondbacks" "ATL","Atlanta Braves" "BAL","Baltimore Orioles (original- 1901–1902 current- since 1954)" diff --git a/pinot-core/src/test/resources/data/dimBaseballTeams_config.json b/pinot-core/src/test/resources/data/dimBaseballTeams_config.json new 
file mode 100644 index 000000000000..8e6e6a769934 --- /dev/null +++ b/pinot-core/src/test/resources/data/dimBaseballTeams_config.json @@ -0,0 +1,18 @@ +{ + "tableName": "dimBaseballTeams", + "tableType": "OFFLINE", + "isDimTable": true, + "segmentsConfig": { + "segmentPushType": "REFRESH", + "replication": "1" + }, + "tenants": { + }, + "tableIndexConfig": { + "loadMode": "MMAP" + }, + "metadata": { + "customConfigs": { + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DimensionTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DimensionTableConfig.java index b1f7a2bda02e..625a4f925ed7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DimensionTableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DimensionTableConfig.java @@ -25,13 +25,20 @@ public class DimensionTableConfig extends BaseJsonConfig { private final boolean _disablePreload; + private final boolean _errorOnDuplicatePrimaryKey; @JsonCreator - public DimensionTableConfig(@JsonProperty(value = "disablePreload", required = true) boolean disablePreload) { - _disablePreload = disablePreload; + public DimensionTableConfig(@JsonProperty(value = "disablePreload") Boolean disablePreload, + @JsonProperty(value = "errorOnDuplicatePrimaryKey") Boolean errorOnDuplicatePrimaryKey) { + _disablePreload = disablePreload != null && disablePreload; + _errorOnDuplicatePrimaryKey = errorOnDuplicatePrimaryKey != null && errorOnDuplicatePrimaryKey; } public boolean isDisablePreload() { return _disablePreload; } + + public boolean isErrorOnDuplicatePrimaryKey() { + return _errorOnDuplicatePrimaryKey; + } }