From d2cde887b561c276bc1ee9ae0b6ec80da22552e2 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Wed, 3 Apr 2024 11:46:34 +0800 Subject: [PATCH] [#2697] improvement(core): Support different entity with the same name under a namespace (#2700) ### What changes were proposed in this pull request? Support store entities with the same name under a namespace. ### Why are the changes needed? It's a necessary feature for Gravitino. Fix: #2697 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? UTs --- .../java/com/datastrato/gravitino/Entity.java | 31 +++ .../storage/kv/BinaryEntityEncoderUtil.java | 185 ++++++++++++++++++ .../storage/kv/BinaryEntityKeyEncoder.java | 59 +++--- .../gravitino/storage/kv/KvEntityStore.java | 137 +------------ .../gravitino/storage/TestEntityStorage.java | 96 +++++++++ .../storage/kv/TestEntityKeyEncoding.java | 59 ++++++ .../storage/kv/TestKvGarbageCollector.java | 29 ++- 7 files changed, 437 insertions(+), 159 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityEncoderUtil.java diff --git a/core/src/main/java/com/datastrato/gravitino/Entity.java b/core/src/main/java/com/datastrato/gravitino/Entity.java index 754c3641229..64e9632a0bb 100644 --- a/core/src/main/java/com/datastrato/gravitino/Entity.java +++ b/core/src/main/java/com/datastrato/gravitino/Entity.java @@ -4,7 +4,9 @@ */ package com.datastrato.gravitino; +import com.google.common.collect.ImmutableList; import java.io.Serializable; +import java.util.List; import java.util.Map; import lombok.Getter; @@ -43,6 +45,35 @@ public static EntityType fromShortName(String shortName) { } throw new IllegalArgumentException("Unknown entity type: " + shortName); } + + /** + * Returns the parent entity types of the given entity type. The parent entity types are the + * entity types that are higher in the hierarchy than the given entity type. For example, the + * parent entity types of a table are metalake, catalog, and schema. (Sequence: root to leaf) + * + * @param entityType The entity type for which to get the parent entity types. + * @return The parent entity types of the given entity type. + */ + public static List getParentEntityTypes(EntityType entityType) { + switch (entityType) { + case METALAKE: + return ImmutableList.of(); + case CATALOG: + return ImmutableList.of(METALAKE); + case SCHEMA: + return ImmutableList.of(METALAKE, CATALOG); + case TABLE: + case FILESET: + case TOPIC: + case USER: + case GROUP: + return ImmutableList.of(METALAKE, CATALOG, SCHEMA); + case COLUMN: + return ImmutableList.of(METALAKE, CATALOG, SCHEMA, TABLE); + default: + throw new IllegalArgumentException("Unknown entity type: " + entityType); + } + } } /** diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityEncoderUtil.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityEncoderUtil.java new file mode 100644 index 00000000000..ce1488c46be --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityEncoderUtil.java @@ -0,0 +1,185 @@ +/* + * Copyright 2023 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.kv; + +import static com.datastrato.gravitino.Entity.EntityType.CATALOG; +import static com.datastrato.gravitino.Entity.EntityType.FILESET; +import static com.datastrato.gravitino.Entity.EntityType.METALAKE; +import static com.datastrato.gravitino.Entity.EntityType.SCHEMA; +import static com.datastrato.gravitino.Entity.EntityType.TABLE; +import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.LOG; +import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.NAMESPACE_SEPARATOR; +import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.TYPE_AND_NAME_SEPARATOR; + +import com.datastrato.gravitino.Entity.EntityType; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.storage.NameMappingService; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +public class BinaryEntityEncoderUtil { + + // The entity types in version 0.4.x, entities in this set do not have the prefix in the name-id + // mapping. Why do we introduce it? We need to make it backward compatible. + public static final Set VERSION_0_4_COMPATIBLE_ENTITY_TYPES = + ImmutableSet.of(METALAKE, CATALOG, SCHEMA, TABLE); + + private BinaryEntityEncoderUtil() {} + + /** + * Generate the key for name to id mapping. Currently, the mapping is as following. + * + *
+   *   Assume we have the following entities:
+   *   metalake: a1        ---- 1
+   *   catalog : a1.b1     ---- 2
+   *   schema  : a1.b1.c   ---- 3
+   *
+   *   metalake: a2        ---- 4
+   *   catalog : a2.b2     ---- 5
+   *   schema  : a2.b2.c   ---- 6
+   *   schema  : a2.b2.c1  ---- 7
+   *
+   *   metalake: a1        ---- 1 means the name of metalake is a1 and the corresponding id is 1
+   * 
+ * + * Then we will store the name to id mapping as follows + * + *
+   *  a1         -- 1
+   * 	1/b1       -- 2
+   * 	1/2/c      -- 3
+   * 	a2         -- 4
+   * 	4/b2       -- 5
+   * 	4/5/c      -- 6
+   * 	4/5/c1     -- 7
+   * 
+ * + * @param nameIdentifier name of a specific entity + * @return key that maps to the id of a specific entity. See above, The key maybe like '4/5/c1' + * @throws IOException if error occurs + */ + public static String generateKeyForMapping( + NameIdentifier nameIdentifier, EntityType entityType, NameMappingService nameMappingService) + throws IOException { + Namespace namespace = nameIdentifier.namespace(); + String name = nameIdentifier.name(); + + List parentTypes = EntityType.getParentEntityTypes(entityType); + long[] ids = new long[namespace.length()]; + for (int i = 0; i < ids.length; i++) { + ids[i] = + nameMappingService.getIdByName( + concatIdAndName( + ArrayUtils.subarray(ids, 0, i), namespace.level(i), parentTypes.get(i))); + } + + return concatIdAndName(ids, name, entityType); + } + + /** + * Concatenate the namespace ids and the name of the entity. Assuming the namespace ids are [1, 2] + * and the name is 'schema', the result will be '1/2/sc_schema'. + * + *

Attention, in order to make this change backward compatible, if the entity type is TABLE, we + * will not add a prefix to the name. If the entity type is not TABLE, we will add the prefix to + * the name. + * + * @param namespaceIds namespace ids, which are the ids of the parent entities + * @param name name of the entity + * @param type type of the entity + * @return concatenated string that used in the id-name mapping. + */ + public static String concatIdAndName(long[] namespaceIds, String name, EntityType type) { + String context = + Joiner.on(NAMESPACE_SEPARATOR) + .join( + Arrays.stream(namespaceIds).mapToObj(String::valueOf).collect(Collectors.toList())); + // We need to make it backward compatible, so we need to check if the name is already prefixed. + String mappingName = + VERSION_0_4_COMPATIBLE_ENTITY_TYPES.contains(type) + ? name + : type.getShortName() + TYPE_AND_NAME_SEPARATOR + name; + return StringUtils.isBlank(context) ? mappingName : context + NAMESPACE_SEPARATOR + mappingName; + } + + /** + * Get key prefix of all sub-entities under a specific entities. For example, as a metalake will + * start with `ml/{metalake_id}`, sub-entities under this metalake will have the prefix + * + *

+   *   catalog: ca/{metalake_id}
+   *   schema:  sc/{metalake_id}
+   *   table:   ta/{metalake_id}
+   * 
+ * + * Why the sub-entities under this metalake start with those prefixes, please see {@link + * KvEntityStore} java class doc. + * + * @param ident identifier of an entity + * @param type type of entity + * @return list of sub-entities prefix + * @throws IOException if error occurs + */ + public static List getSubEntitiesPrefix( + NameIdentifier ident, EntityType type, BinaryEntityKeyEncoder entityKeyEncoder) + throws IOException { + List prefixes = Lists.newArrayList(); + byte[] encode = entityKeyEncoder.encode(ident, type, true); + switch (type) { + case METALAKE: + prefixes.add(replacePrefixTypeInfo(encode, CATALOG.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); + break; + case CATALOG: + prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); + break; + case SCHEMA: + prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); + prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); + break; + case TABLE: + case FILESET: + break; + default: + LOG.warn("Currently unknown type: {}, please check it", type); + } + Collections.reverse(prefixes); + return prefixes; + } + + /** + * Replace the prefix type info with the new type info. + * + * @param encode the encoded byte array + * @param subTypePrefix the new type prefix + * @return the new byte array + */ + public static byte[] replacePrefixTypeInfo(byte[] encode, String subTypePrefix) { + byte[] result = new byte[encode.length]; + System.arraycopy(encode, 0, result, 0, encode.length); + byte[] bytes = subTypePrefix.getBytes(StandardCharsets.UTF_8); + result[0] = bytes[0]; + result[1] = bytes[1]; + + return result; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java index dfac508507b..773b3769486 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/BinaryEntityKeyEncoder.java @@ -20,16 +20,13 @@ import com.datastrato.gravitino.utils.ByteUtils; import com.datastrato.gravitino.utils.Bytes; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; +import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,19 +37,19 @@ * *
  *     Key                                Value
- * ml_{ml_id}               -----    metalake info
- * ml_{ml_id}               -----    metalake info
- * ca_{ml_id}_{ca_id}       -----    catalog_info
- * ca_{ml_id}_{ca_id}       -----    catalog_info
- * sc_{ml_id}_{ca_id}_{sc_id} ---    schema_info
- * sc_{ml_id}_{ca_id}_{sc_id} ---    schema_info
- * br_{ml_id}_{ca_id}_{br_id} ---    broker_info
- * br_{ml_id}_{ca_id}_{br_id} ---    broker_info
+ * ml/{ml_id}               -----    metalake info
+ * ml/{ml_id}               -----    metalake info
+ * ca/{ml_id}/{ca_id}       -----    catalog_info
+ * ca/{ml_id}/{ca_id}       -----    catalog_info
+ * sc/{ml_id}/{ca_id}/{sc_id} ---    schema_info
+ * sc/{ml_id}/{ca_id}/{sc_id} ---    schema_info
+ * br/{ml_id}/{ca_id}/{br_id} ---    broker_info
+ * br/{ml_id}/{ca_id}/{br_id} ---    broker_info
  *
- * ta_{ml_id}_{ca_id}_{sc_id}_{table_id}    -----    table_info
- * ta_{ml_id}_{ca_id}_{sc_id}_{table_id}    -----    table_info
- * to_{ml_id}_{ca_id}_{br_id}_{to_id}       -----    topic_info
- * to_{ml_id}_{ca_id}_{br_id}_{to_id}       -----    topic_info
+ * ta/{ml_id}/{ca_id}/{sc_id}/{table_id}    -----    table_info
+ * ta/{ml_id}/{ca_id}/{sc_id}/{table_id}    -----    table_info
+ * to/{ml_id}/{ca_id}/{br_id}/{to_id}       -----    topic_info
+ * to/{ml_id}/{ca_id}/{br_id}/{to_id}       -----    topic_info
  * 
*/ public class BinaryEntityKeyEncoder implements EntityKeyEncoder { @@ -60,6 +57,8 @@ public class BinaryEntityKeyEncoder implements EntityKeyEncoder { public static final String NAMESPACE_SEPARATOR = "/"; + public static final String TYPE_AND_NAME_SEPARATOR = "_"; + @VisibleForTesting static final byte[] BYTABLE_NAMESPACE_SEPARATOR = NAMESPACE_SEPARATOR.getBytes(StandardCharsets.UTF_8); @@ -92,14 +91,6 @@ public BinaryEntityKeyEncoder(NameMappingService nameMappingService) { this.nameMappingService = nameMappingService; } - private String generateMappingKey(long[] namespaceIds, String name) { - String context = - Joiner.on(NAMESPACE_SEPARATOR) - .join( - Arrays.stream(namespaceIds).mapToObj(String::valueOf).collect(Collectors.toList())); - return StringUtils.isBlank(context) ? name : context + NAMESPACE_SEPARATOR + name; - } - /** * Encode entity key for KV backend, e.g., RocksDB. The key is used to store the entity in the * backend. @@ -114,8 +105,11 @@ private byte[] encodeEntity( NameIdentifier identifier, EntityType entityType, boolean nullIfMissing) throws IOException { String[] nameSpace = identifier.namespace().levels(); long[] namespaceIds = new long[nameSpace.length]; + List parentEntityTypes = EntityType.getParentEntityTypes(entityType); for (int i = 0; i < nameSpace.length; i++) { - String nameKey = generateMappingKey(ArrayUtils.subarray(namespaceIds, 0, i), nameSpace[i]); + String nameKey = + BinaryEntityEncoderUtil.concatIdAndName( + ArrayUtils.subarray(namespaceIds, 0, i), nameSpace[i], parentEntityTypes.get(i)); if (nullIfMissing && null == nameMappingService.getIdByName(nameKey)) { return null; } @@ -135,7 +129,8 @@ private byte[] encodeEntity( // This is for point query and need to use specific name long[] namespaceAndNameIds = new long[namespaceIds.length + 1]; System.arraycopy(namespaceIds, 0, namespaceAndNameIds, 0, namespaceIds.length); - String nameKey = generateMappingKey(namespaceIds, identifier.name()); + String nameKey = + BinaryEntityEncoderUtil.concatIdAndName(namespaceIds, identifier.name(), entityType); if (nullIfMissing && null == nameMappingService.getIdByName(nameKey)) { return null; } @@ -240,10 +235,18 @@ public Pair decode(byte[] key) throws IOException { // Please review the id-name mapping content in KvNameMappingService.java and // method generateMappingKey in this class. String[] names = new String[ids.length]; + List parents = EntityType.getParentEntityTypes(entityType); for (int i = 0; i < ids.length; i++) { - // The format of name is like '{metalake_id}/{catalog_id}/schema_name' + // The format of name is like '{metalake_id}/{catalog_id}/sc_schema_name' String name = nameMappingService.getNameById(ids[i]); - names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i]; + // extract the real name from the name mapping service + // The name for table is 'table' NOT 'ta_table' to make it backward compatible. + EntityType currentEntityType = i < parents.size() ? parents.get(i) : entityType; + if (BinaryEntityEncoderUtil.VERSION_0_4_COMPATIBLE_ENTITY_TYPES.contains(currentEntityType)) { + names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i]; + } else { + names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i].substring(3); + } } NameIdentifier nameIdentifier = NameIdentifier.of(names); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java index c241211bde0..6ede9954145 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java @@ -6,15 +6,12 @@ package com.datastrato.gravitino.storage.kv; import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE; -import static com.datastrato.gravitino.Entity.EntityType.CATALOG; -import static com.datastrato.gravitino.Entity.EntityType.FILESET; import static com.datastrato.gravitino.Entity.EntityType.GROUP; import static com.datastrato.gravitino.Entity.EntityType.METALAKE; -import static com.datastrato.gravitino.Entity.EntityType.SCHEMA; -import static com.datastrato.gravitino.Entity.EntityType.TABLE; import static com.datastrato.gravitino.Entity.EntityType.USER; -import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.LOG; -import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.NAMESPACE_SEPARATOR; +import static com.datastrato.gravitino.storage.kv.BinaryEntityEncoderUtil.generateKeyForMapping; +import static com.datastrato.gravitino.storage.kv.BinaryEntityEncoderUtil.getSubEntitiesPrefix; +import static com.datastrato.gravitino.storage.kv.BinaryEntityEncoderUtil.replacePrefixTypeInfo; import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Entity; @@ -37,22 +34,16 @@ import com.datastrato.gravitino.utils.Bytes; import com.datastrato.gravitino.utils.Executable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; -import java.util.stream.Collectors; import lombok.Getter; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,7 +193,8 @@ public E update( // Update the name mapping nameMappingService.updateName( - generateKeyForMapping(ident), generateKeyForMapping(updatedE.nameIdentifier())); + generateKeyForMapping(ident, entityType, nameMappingService), + generateKeyForMapping(updatedE.nameIdentifier(), entityType, nameMappingService)); // Update the entity to store transactionalKvBackend.put(key, serDe.serialize(updatedE), true); @@ -210,64 +202,6 @@ public E update( }); } - private String concatIdAndName(long[] namespaceIds, String name) { - String context = - Joiner.on(NAMESPACE_SEPARATOR) - .join( - Arrays.stream(namespaceIds).mapToObj(String::valueOf).collect(Collectors.toList())); - return StringUtils.isBlank(context) ? name : context + NAMESPACE_SEPARATOR + name; - } - - /** - * Generate the key for name to id mapping. Currently, the mapping is as following. - * - *
-   *   Assume we have the following entities:
-   *   metalake: a1        ---- 1
-   *   catalog : a1.b1     ---- 2
-   *   schema  : a1.b1.c   ---- 3
-   *
-   *   metalake: a2        ---- 4
-   *   catalog : a2.b2     ---- 5
-   *   schema  : a2.b2.c   ---- 6
-   *   schema  : a2.b2.c1  ---- 7
-   *
-   *   metalake: a1        ---- 1 means the name of metalake is a1 and the corresponding id is 1
-   * 
- * - * Then we will store the name to id mapping as follows - * - *
-   *  a1         -- 1
-   * 	1/b1       -- 2
-   * 	1/2/c      -- 3
-   * 	a2         -- 4
-   * 	4/b2       -- 5
-   * 	4/5/c      -- 6
-   * 	4/5/c1     -- 7
-   * 
- * - * @param nameIdentifier name of a specific entity - * @return key that maps to the id of a specific entity. See above, The key maybe like '4/5/c1' - * @throws IOException if error occurs - */ - public String generateKeyForMapping(NameIdentifier nameIdentifier) throws IOException { - if (nameIdentifier.namespace().isEmpty()) { - return nameIdentifier.name(); - } - Namespace namespace = nameIdentifier.namespace(); - String name = nameIdentifier.name(); - - long[] ids = new long[namespace.length()]; - for (int i = 0; i < ids.length; i++) { - ids[i] = - nameMappingService.getIdByName( - concatIdAndName(ArrayUtils.subarray(ids, 0, i), namespace.level(i))); - } - - return concatIdAndName(ids, name); - } - @Override public E get( NameIdentifier ident, EntityType entityType, Class e) @@ -287,54 +221,6 @@ public E get( return serDe.deserialize(value, e); } - /** - * Get key prefix of all sub-entities under a specific entities. For example, as a metalake will - * start with `ml_{metalake_id}`, sub-entities under this metalake will have the prefix - * - *
-   *   catalog: ca_{metalake_id}
-   *   schema:  sc_{metalake_id}
-   *   table:   ta_{metalake_id}
-   * 
- * - * Why the sub-entities under this metalake start with those prefixes, please see {@link - * KvEntityStore} java class doc. - * - * @param ident identifier of an entity - * @param type type of entity - * @return list of sub-entities prefix - * @throws IOException if error occurs - */ - private List getSubEntitiesPrefix(NameIdentifier ident, EntityType type) - throws IOException { - List prefixes = Lists.newArrayList(); - byte[] encode = entityKeyEncoder.encode(ident, type, true); - switch (type) { - case METALAKE: - prefixes.add(replacePrefixTypeInfo(encode, CATALOG.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); - break; - case CATALOG: - prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); - break; - case SCHEMA: - prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName())); - prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName())); - break; - case TABLE: - case FILESET: - break; - default: - LOG.warn("Currently unknown type: {}, please check it", type); - } - Collections.reverse(prefixes); - return prefixes; - } - void deleteAuthorizationEntitiesIfNecessary(NameIdentifier ident, EntityType type) throws IOException { if (type != METALAKE) { @@ -354,16 +240,6 @@ void deleteAuthorizationEntitiesIfNecessary(NameIdentifier ident, EntityType typ } } - private byte[] replacePrefixTypeInfo(byte[] encode, String subTypePrefix) { - byte[] result = new byte[encode.length]; - System.arraycopy(encode, 0, result, 0, encode.length); - byte[] bytes = subTypePrefix.getBytes(StandardCharsets.UTF_8); - result[0] = bytes[0]; - result[1] = bytes[1]; - - return result; - } - @Override public boolean delete(NameIdentifier ident, EntityType entityType, boolean cascade) throws IOException { @@ -374,7 +250,8 @@ public boolean delete(NameIdentifier ident, EntityType entityType, boolean casca } byte[] dataKey = entityKeyEncoder.encode(ident, entityType, true); - List subEntityPrefix = getSubEntitiesPrefix(ident, entityType); + List subEntityPrefix = + getSubEntitiesPrefix(ident, entityType, (BinaryEntityKeyEncoder) entityKeyEncoder); if (subEntityPrefix.isEmpty()) { // has no sub-entities return transactionalKvBackend.delete(dataKey); diff --git a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java index e311fc4a0e6..48064870943 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java @@ -568,6 +568,102 @@ void testEntityDelete(String type) throws IOException { } } + @ParameterizedTest + @MethodSource("storageProvider") + void testSameNameUnderANameSpace(String type) throws IOException { + Config config = Mockito.mock(Config.class); + init(type, config); + try (EntityStore store = EntityStoreFactory.createEntityStore(config)) { + store.initialize(config); + if (store instanceof RelationalEntityStore) { + prepareJdbcTable(); + } + + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + BaseMetalake metalake1 = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake1", auditInfo); + CatalogEntity catalog1 = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.of("metalake1"), + "catalog1", + auditInfo); + SchemaEntity schema1 = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.of("metalake1", "catalog1"), + "schema1", + auditInfo); + + Namespace namespace = Namespace.of("metalake1", "catalog1", "schema1"); + TableEntity table1 = + createTableEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, "sameName", auditInfo); + + FilesetEntity filesetEntity1 = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), namespace, "sameName", auditInfo); + + store.put(metalake1); + store.put(catalog1); + store.put(schema1); + store.put(table1); + store.put(filesetEntity1); + + NameIdentifier identifier = NameIdentifier.of("metalake1", "catalog1", "schema1", "sameName"); + + TableEntity loadedTableEntity = + store.get(identifier, Entity.EntityType.TABLE, TableEntity.class); + Assertions.assertEquals(table1.id(), loadedTableEntity.id()); + FilesetEntity loadedFilesetEntity = + store.get(identifier, Entity.EntityType.FILESET, FilesetEntity.class); + Assertions.assertEquals(filesetEntity1.id(), loadedFilesetEntity.id()); + + // Remove anyone will not affect another + store.delete(identifier, Entity.EntityType.TABLE); + store.get(identifier, Entity.EntityType.FILESET, FilesetEntity.class); + + // JDBC use id as the primary key, so we need to change the id of table1 if we want to store + // it again + table1 = + createTableEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, "sameName", auditInfo); + store.put(table1); + store.delete(identifier, Entity.EntityType.FILESET); + store.get(identifier, Entity.EntityType.TABLE, TableEntity.class); + + // Rename anyone will not affect another + filesetEntity1 = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), namespace, "sameName", auditInfo); + store.put(filesetEntity1); + long table1Id = table1.id(); + store.update( + identifier, + TableEntity.class, + Entity.EntityType.TABLE, + e -> createTableEntity(table1Id, namespace, "sameNameChanged", e.auditInfo())); + + NameIdentifier changedNameIdentifier = + NameIdentifier.of("metalake1", "catalog1", "schema1", "sameNameChanged"); + store.get(changedNameIdentifier, Entity.EntityType.TABLE, TableEntity.class); + store.get(identifier, Entity.EntityType.FILESET, FilesetEntity.class); + + table1 = + createTableEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, "sameName", auditInfo); + store.put(table1); + long filesetId = filesetEntity1.id(); + store.update( + identifier, + FilesetEntity.class, + Entity.EntityType.FILESET, + e -> createFilesetEntity(filesetId, namespace, "sameNameChanged", e.auditInfo())); + + store.get(identifier, Entity.EntityType.TABLE, TableEntity.class); + store.get(changedNameIdentifier, Entity.EntityType.FILESET, FilesetEntity.class); + } + } + @ParameterizedTest @MethodSource("storageProvider") void testDeleteAndRename(String type) throws IOException { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestEntityKeyEncoding.java b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestEntityKeyEncoding.java index b23b8b88cbf..60c066284f0 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestEntityKeyEncoding.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestEntityKeyEncoding.java @@ -76,6 +76,65 @@ private IdGenerator getIdGeneratorAndSpy(BinaryEntityKeyEncoder entityKeyEncoder return spyIdGenerator; } + @Test + void testFilesetAndTableWithSameName() + throws IOException, NoSuchFieldException, IllegalAccessException { + Config config = getConfig(); + try (KvEntityStore kvEntityStore = getKvEntityStore(config)) { + BinaryEntityKeyEncoder encoder = (BinaryEntityKeyEncoder) kvEntityStore.entityKeyEncoder; + IdGenerator mockIdGenerator = getIdGeneratorAndSpy(encoder); + Mockito.doReturn(0L).when(mockIdGenerator).nextId(); + NameIdentifier mateLakeIdentifier1 = NameIdentifier.of("metalake"); + encoder.encode(mateLakeIdentifier1, EntityType.METALAKE); + + Mockito.doReturn(1L).when(mockIdGenerator).nextId(); + NameIdentifier catalogIdentifier = NameIdentifier.of("metalake", "catalogs"); + encoder.encode(catalogIdentifier, EntityType.CATALOG); + + Mockito.doReturn(2L).when(mockIdGenerator).nextId(); + NameIdentifier schemaIdentifier = NameIdentifier.of("metalake", "catalogs", "schema"); + encoder.encode(schemaIdentifier, EntityType.SCHEMA); + + Mockito.doReturn(3L).when(mockIdGenerator).nextId(); + NameIdentifier tableIdentifier = + NameIdentifier.of("metalake", "catalogs", "schema", "theSame"); + byte[] tableKey = encoder.encode(tableIdentifier, EntityType.TABLE); + byte[] expectKey = + Bytes.concat( + EntityType.TABLE.getShortName().getBytes(StandardCharsets.UTF_8), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(0L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(1L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(2L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(3L)); + + Assertions.assertArrayEquals(expectKey, tableKey); + + Mockito.doReturn(4L).when(mockIdGenerator).nextId(); + NameIdentifier filesetIdentifier = + NameIdentifier.of("metalake", "catalogs", "schema", "theSame"); + byte[] filesetKey = encoder.encode(filesetIdentifier, EntityType.FILESET); + + // Check the id of table is NOT the same as the id of fileset + Assertions.assertNotEquals(tableKey, filesetKey); + expectKey = + Bytes.concat( + EntityType.FILESET.getShortName().getBytes(StandardCharsets.UTF_8), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(0L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(1L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(2L), + BYTABLE_NAMESPACE_SEPARATOR, + ByteUtils.longToByte(4L)); + Assertions.assertArrayEquals(expectKey, filesetKey); + } + } + @Test public void testIdentifierEncoding() throws IOException, IllegalAccessException, NoSuchFieldException { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java index 815dbfff3f9..fea6ac7bb41 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java @@ -29,8 +29,10 @@ import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.FilesetEntity; +import com.datastrato.gravitino.meta.GroupEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.TableEntity; +import com.datastrato.gravitino.meta.UserEntity; import com.datastrato.gravitino.storage.TransactionIdGenerator; import com.datastrato.gravitino.storage.kv.KvGarbageCollector.LogHelper; import java.io.File; @@ -184,6 +186,20 @@ void testRemoveWithGCCollector1() throws IOException, InterruptedException { kvEntityStore.put(schemaEntity); kvEntityStore.put(tableEntity); kvEntityStore.put(filesetEntity); + kvEntityStore.put( + UserEntity.builder() + .withId(1L) + .withAuditInfo(auditInfo) + .withName("the same") + .withNamespace(Namespace.of("metalake1", "catalog1", "schema1")) + .build()); + kvEntityStore.put( + GroupEntity.builder() + .withId(2L) + .withAuditInfo(auditInfo) + .withName("the same") + .withNamespace(Namespace.of("metalake1", "catalog1", "schema1")) + .build()); // now try to scan raw data from kv store KvBackend kvBackend = kvEntityStore.backend; @@ -196,7 +212,7 @@ void testRemoveWithGCCollector1() throws IOException, InterruptedException { .endInclusive(false) .build()); - Assertions.assertEquals(5, data.size()); + Assertions.assertEquals(7, data.size()); KvGarbageCollector kvGarbageCollector = kvEntityStore.kvGarbageCollector; for (Pair pair : data) { @@ -224,6 +240,17 @@ void testRemoveWithGCCollector1() throws IOException, InterruptedException { NameIdentifier.of("metalake1", "catalog1", "schema1", "fileset1"), helper.identifier); break; + case USER: + Assertions.assertEquals( + NameIdentifier.of("metalake1", "catalog1", "schema1", "the same"), + helper.identifier); + break; + + case GROUP: + Assertions.assertEquals( + NameIdentifier.of("metalake1", "catalog1", "schema1", "the same"), + helper.identifier); + break; default: Assertions.fail(); }