Skip to content

Commit

Permalink
[#2697] improvement(core): Support different entity with the same nam…
Browse files Browse the repository at this point in the history
…e under a namespace (#2700)

### What changes were proposed in this pull request?

Support store entities with the same name under a namespace. 

### Why are the changes needed?

It's a necessary feature for Gravitino. 

Fix: #2697 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

UTs
  • Loading branch information
yuqi1129 authored Apr 3, 2024
1 parent 1544303 commit d2cde88
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 159 deletions.
31 changes: 31 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/Entity.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
*/
package com.datastrato.gravitino;

import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import lombok.Getter;

Expand Down Expand Up @@ -43,6 +45,35 @@ public static EntityType fromShortName(String shortName) {
}
throw new IllegalArgumentException("Unknown entity type: " + shortName);
}

/**
* Returns the parent entity types of the given entity type. The parent entity types are the
* entity types that are higher in the hierarchy than the given entity type. For example, the
* parent entity types of a table are metalake, catalog, and schema. (Sequence: root to leaf)
*
* @param entityType The entity type for which to get the parent entity types.
* @return The parent entity types of the given entity type.
*/
public static List<EntityType> getParentEntityTypes(EntityType entityType) {
switch (entityType) {
case METALAKE:
return ImmutableList.of();
case CATALOG:
return ImmutableList.of(METALAKE);
case SCHEMA:
return ImmutableList.of(METALAKE, CATALOG);
case TABLE:
case FILESET:
case TOPIC:
case USER:
case GROUP:
return ImmutableList.of(METALAKE, CATALOG, SCHEMA);
case COLUMN:
return ImmutableList.of(METALAKE, CATALOG, SCHEMA, TABLE);
default:
throw new IllegalArgumentException("Unknown entity type: " + entityType);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2023 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.kv;

import static com.datastrato.gravitino.Entity.EntityType.CATALOG;
import static com.datastrato.gravitino.Entity.EntityType.FILESET;
import static com.datastrato.gravitino.Entity.EntityType.METALAKE;
import static com.datastrato.gravitino.Entity.EntityType.SCHEMA;
import static com.datastrato.gravitino.Entity.EntityType.TABLE;
import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.LOG;
import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.NAMESPACE_SEPARATOR;
import static com.datastrato.gravitino.storage.kv.BinaryEntityKeyEncoder.TYPE_AND_NAME_SEPARATOR;

import com.datastrato.gravitino.Entity.EntityType;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.storage.NameMappingService;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

public class BinaryEntityEncoderUtil {

// The entity types in version 0.4.x, entities in this set do not have the prefix in the name-id
// mapping. Why do we introduce it? We need to make it backward compatible.
public static final Set<EntityType> VERSION_0_4_COMPATIBLE_ENTITY_TYPES =
ImmutableSet.of(METALAKE, CATALOG, SCHEMA, TABLE);

private BinaryEntityEncoderUtil() {}

/**
* Generate the key for name to id mapping. Currently, the mapping is as following.
*
* <pre>
* Assume we have the following entities:
* metalake: a1 ---- 1
* catalog : a1.b1 ---- 2
* schema : a1.b1.c ---- 3
*
* metalake: a2 ---- 4
* catalog : a2.b2 ---- 5
* schema : a2.b2.c ---- 6
* schema : a2.b2.c1 ---- 7
*
* metalake: a1 ---- 1 means the name of metalake is a1 and the corresponding id is 1
* </pre>
*
* Then we will store the name to id mapping as follows
*
* <pre>
* a1 -- 1
* 1/b1 -- 2
* 1/2/c -- 3
* a2 -- 4
* 4/b2 -- 5
* 4/5/c -- 6
* 4/5/c1 -- 7
* </pre>
*
* @param nameIdentifier name of a specific entity
* @return key that maps to the id of a specific entity. See above, The key maybe like '4/5/c1'
* @throws IOException if error occurs
*/
public static String generateKeyForMapping(
NameIdentifier nameIdentifier, EntityType entityType, NameMappingService nameMappingService)
throws IOException {
Namespace namespace = nameIdentifier.namespace();
String name = nameIdentifier.name();

List<EntityType> parentTypes = EntityType.getParentEntityTypes(entityType);
long[] ids = new long[namespace.length()];
for (int i = 0; i < ids.length; i++) {
ids[i] =
nameMappingService.getIdByName(
concatIdAndName(
ArrayUtils.subarray(ids, 0, i), namespace.level(i), parentTypes.get(i)));
}

return concatIdAndName(ids, name, entityType);
}

/**
* Concatenate the namespace ids and the name of the entity. Assuming the namespace ids are [1, 2]
* and the name is 'schema', the result will be '1/2/sc_schema'.
*
* <p>Attention, in order to make this change backward compatible, if the entity type is TABLE, we
* will not add a prefix to the name. If the entity type is not TABLE, we will add the prefix to
* the name.
*
* @param namespaceIds namespace ids, which are the ids of the parent entities
* @param name name of the entity
* @param type type of the entity
* @return concatenated string that used in the id-name mapping.
*/
public static String concatIdAndName(long[] namespaceIds, String name, EntityType type) {
String context =
Joiner.on(NAMESPACE_SEPARATOR)
.join(
Arrays.stream(namespaceIds).mapToObj(String::valueOf).collect(Collectors.toList()));
// We need to make it backward compatible, so we need to check if the name is already prefixed.
String mappingName =
VERSION_0_4_COMPATIBLE_ENTITY_TYPES.contains(type)
? name
: type.getShortName() + TYPE_AND_NAME_SEPARATOR + name;
return StringUtils.isBlank(context) ? mappingName : context + NAMESPACE_SEPARATOR + mappingName;
}

/**
* Get key prefix of all sub-entities under a specific entities. For example, as a metalake will
* start with `ml/{metalake_id}`, sub-entities under this metalake will have the prefix
*
* <pre>
* catalog: ca/{metalake_id}
* schema: sc/{metalake_id}
* table: ta/{metalake_id}
* </pre>
*
* Why the sub-entities under this metalake start with those prefixes, please see {@link
* KvEntityStore} java class doc.
*
* @param ident identifier of an entity
* @param type type of entity
* @return list of sub-entities prefix
* @throws IOException if error occurs
*/
public static List<byte[]> getSubEntitiesPrefix(
NameIdentifier ident, EntityType type, BinaryEntityKeyEncoder entityKeyEncoder)
throws IOException {
List<byte[]> prefixes = Lists.newArrayList();
byte[] encode = entityKeyEncoder.encode(ident, type, true);
switch (type) {
case METALAKE:
prefixes.add(replacePrefixTypeInfo(encode, CATALOG.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName()));
break;
case CATALOG:
prefixes.add(replacePrefixTypeInfo(encode, SCHEMA.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName()));
break;
case SCHEMA:
prefixes.add(replacePrefixTypeInfo(encode, TABLE.getShortName()));
prefixes.add(replacePrefixTypeInfo(encode, FILESET.getShortName()));
break;
case TABLE:
case FILESET:
break;
default:
LOG.warn("Currently unknown type: {}, please check it", type);
}
Collections.reverse(prefixes);
return prefixes;
}

/**
* Replace the prefix type info with the new type info.
*
* @param encode the encoded byte array
* @param subTypePrefix the new type prefix
* @return the new byte array
*/
public static byte[] replacePrefixTypeInfo(byte[] encode, String subTypePrefix) {
byte[] result = new byte[encode.length];
System.arraycopy(encode, 0, result, 0, encode.length);
byte[] bytes = subTypePrefix.getBytes(StandardCharsets.UTF_8);
result[0] = bytes[0];
result[1] = bytes[1];

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import com.datastrato.gravitino.utils.ByteUtils;
import com.datastrato.gravitino.utils.Bytes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,26 +37,28 @@
*
* <pre>
* Key Value
* ml_{ml_id} ----- metalake info
* ml_{ml_id} ----- metalake info
* ca_{ml_id}_{ca_id} ----- catalog_info
* ca_{ml_id}_{ca_id} ----- catalog_info
* sc_{ml_id}_{ca_id}_{sc_id} --- schema_info
* sc_{ml_id}_{ca_id}_{sc_id} --- schema_info
* br_{ml_id}_{ca_id}_{br_id} --- broker_info
* br_{ml_id}_{ca_id}_{br_id} --- broker_info
* ml/{ml_id} ----- metalake info
* ml/{ml_id} ----- metalake info
* ca/{ml_id}/{ca_id} ----- catalog_info
* ca/{ml_id}/{ca_id} ----- catalog_info
* sc/{ml_id}/{ca_id}/{sc_id} --- schema_info
* sc/{ml_id}/{ca_id}/{sc_id} --- schema_info
* br/{ml_id}/{ca_id}/{br_id} --- broker_info
* br/{ml_id}/{ca_id}/{br_id} --- broker_info
*
* ta_{ml_id}_{ca_id}_{sc_id}_{table_id} ----- table_info
* ta_{ml_id}_{ca_id}_{sc_id}_{table_id} ----- table_info
* to_{ml_id}_{ca_id}_{br_id}_{to_id} ----- topic_info
* to_{ml_id}_{ca_id}_{br_id}_{to_id} ----- topic_info
* ta/{ml_id}/{ca_id}/{sc_id}/{table_id} ----- table_info
* ta/{ml_id}/{ca_id}/{sc_id}/{table_id} ----- table_info
* to/{ml_id}/{ca_id}/{br_id}/{to_id} ----- topic_info
* to/{ml_id}/{ca_id}/{br_id}/{to_id} ----- topic_info
* </pre>
*/
public class BinaryEntityKeyEncoder implements EntityKeyEncoder<byte[]> {
public static final Logger LOG = LoggerFactory.getLogger(BinaryEntityKeyEncoder.class);

public static final String NAMESPACE_SEPARATOR = "/";

public static final String TYPE_AND_NAME_SEPARATOR = "_";

@VisibleForTesting
static final byte[] BYTABLE_NAMESPACE_SEPARATOR =
NAMESPACE_SEPARATOR.getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -92,14 +91,6 @@ public BinaryEntityKeyEncoder(NameMappingService nameMappingService) {
this.nameMappingService = nameMappingService;
}

private String generateMappingKey(long[] namespaceIds, String name) {
String context =
Joiner.on(NAMESPACE_SEPARATOR)
.join(
Arrays.stream(namespaceIds).mapToObj(String::valueOf).collect(Collectors.toList()));
return StringUtils.isBlank(context) ? name : context + NAMESPACE_SEPARATOR + name;
}

/**
* Encode entity key for KV backend, e.g., RocksDB. The key is used to store the entity in the
* backend.
Expand All @@ -114,8 +105,11 @@ private byte[] encodeEntity(
NameIdentifier identifier, EntityType entityType, boolean nullIfMissing) throws IOException {
String[] nameSpace = identifier.namespace().levels();
long[] namespaceIds = new long[nameSpace.length];
List<EntityType> parentEntityTypes = EntityType.getParentEntityTypes(entityType);
for (int i = 0; i < nameSpace.length; i++) {
String nameKey = generateMappingKey(ArrayUtils.subarray(namespaceIds, 0, i), nameSpace[i]);
String nameKey =
BinaryEntityEncoderUtil.concatIdAndName(
ArrayUtils.subarray(namespaceIds, 0, i), nameSpace[i], parentEntityTypes.get(i));
if (nullIfMissing && null == nameMappingService.getIdByName(nameKey)) {
return null;
}
Expand All @@ -135,7 +129,8 @@ private byte[] encodeEntity(
// This is for point query and need to use specific name
long[] namespaceAndNameIds = new long[namespaceIds.length + 1];
System.arraycopy(namespaceIds, 0, namespaceAndNameIds, 0, namespaceIds.length);
String nameKey = generateMappingKey(namespaceIds, identifier.name());
String nameKey =
BinaryEntityEncoderUtil.concatIdAndName(namespaceIds, identifier.name(), entityType);
if (nullIfMissing && null == nameMappingService.getIdByName(nameKey)) {
return null;
}
Expand Down Expand Up @@ -240,10 +235,18 @@ public Pair<NameIdentifier, EntityType> decode(byte[] key) throws IOException {
// Please review the id-name mapping content in KvNameMappingService.java and
// method generateMappingKey in this class.
String[] names = new String[ids.length];
List<EntityType> parents = EntityType.getParentEntityTypes(entityType);
for (int i = 0; i < ids.length; i++) {
// The format of name is like '{metalake_id}/{catalog_id}/schema_name'
// The format of name is like '{metalake_id}/{catalog_id}/sc_schema_name'
String name = nameMappingService.getNameById(ids[i]);
names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i];
// extract the real name from the name mapping service
// The name for table is 'table' NOT 'ta_table' to make it backward compatible.
EntityType currentEntityType = i < parents.size() ? parents.get(i) : entityType;
if (BinaryEntityEncoderUtil.VERSION_0_4_COMPATIBLE_ENTITY_TYPES.contains(currentEntityType)) {
names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i];
} else {
names[i] = name.split(NAMESPACE_SEPARATOR, i + 1)[i].substring(3);
}
}

NameIdentifier nameIdentifier = NameIdentifier.of(names);
Expand Down
Loading

0 comments on commit d2cde88

Please sign in to comment.