From 1a22afee5c54e4752816d26f8b0b4786cc0a520a Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Mon, 23 Dec 2024 12:12:26 +0800 Subject: [PATCH] [#5794] feat(core): Add ModelOperationDispatcher logic (#5908) ### What changes were proposed in this pull request? This PR adds the ModelOperationDispatcher logic in core. ### Why are the changes needed? This is a part of work to support model management. Fix: #5794 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UTs to test. --- .../gravitino/catalog/CatalogManager.java | 15 + .../catalog/EntityCombinedFileset.java | 2 +- .../catalog/EntityCombinedModel.java | 94 +++++++ .../catalog/EntityCombinedModelVersion.java | 101 +++++++ .../catalog/EntityCombinedSchema.java | 2 +- .../catalog/EntityCombinedTable.java | 2 +- .../catalog/EntityCombinedTopic.java | 2 +- .../catalog/FilesetOperationDispatcher.java | 6 +- .../catalog/ModelOperationDispatcher.java | 166 ++++++++++- .../catalog/SchemaOperationDispatcher.java | 18 +- .../catalog/TableOperationDispatcher.java | 16 +- .../catalog/TopicOperationDispatcher.java | 10 +- .../org/apache/gravitino/TestCatalog.java | 5 + .../java/org/apache/gravitino/TestModel.java | 44 +++ .../apache/gravitino/TestModelVersion.java | 45 +++ .../catalog/TestModelOperationDispatcher.java | 264 ++++++++++++++++++ .../connector/TestCatalogOperations.java | 248 +++++++++++++++- 17 files changed, 1000 insertions(+), 40 deletions(-) create mode 100644 core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java create mode 100644 core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java create mode 100644 core/src/test/java/org/apache/gravitino/TestModel.java create mode 100644 core/src/test/java/org/apache/gravitino/TestModelVersion.java create mode 100644 core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java diff --git a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java index 2e77b8e162a..43bc74bb2a1 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java @@ -95,6 +95,7 @@ import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.CatalogEntity; import org.apache.gravitino.meta.SchemaEntity; +import org.apache.gravitino.model.ModelCatalog; import org.apache.gravitino.rel.SupportsPartitions; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableCatalog; @@ -178,6 +179,16 @@ public R doWithTopicOps(ThrowableFunction fn) throws Except }); } + public R doWithModelOps(ThrowableFunction fn) throws Exception { + return classLoader.withClassLoader( + cl -> { + if (asModels() == null) { + throw new UnsupportedOperationException("Catalog does not support model operations"); + } + return fn.apply(asModels()); + }); + } + public R doWithCatalogOps(ThrowableFunction fn) throws Exception { return classLoader.withClassLoader(cl -> fn.apply(catalog.ops())); } @@ -236,6 +247,10 @@ private FilesetCatalog asFilesets() { private TopicCatalog asTopics() { return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) catalog.ops() : null; } + + private ModelCatalog asModels() { + return catalog.ops() instanceof ModelCatalog ? (ModelCatalog) catalog.ops() : null; + } } private final Config config; diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java index 2a6b55a2ddd..c7b847fc9c6 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java @@ -48,7 +48,7 @@ public static EntityCombinedFileset of(Fileset fileset) { return new EntityCombinedFileset(fileset, null); } - public EntityCombinedFileset withHiddenPropertiesSet(Set hiddenProperties) { + public EntityCombinedFileset withHiddenProperties(Set hiddenProperties) { this.hiddenProperties = hiddenProperties; return this; } diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java new file mode 100644 index 00000000000..4aeefa0be59 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.gravitino.Audit; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ModelEntity; +import org.apache.gravitino.model.Model; + +public final class EntityCombinedModel implements Model { + + private final Model model; + + private final ModelEntity modelEntity; + + private Set hiddenProperties = Collections.emptySet(); + + private EntityCombinedModel(Model model, ModelEntity modelEntity) { + this.model = model; + this.modelEntity = modelEntity; + } + + public static EntityCombinedModel of(Model model, ModelEntity modelEntity) { + return new EntityCombinedModel(model, modelEntity); + } + + public static EntityCombinedModel of(Model model) { + return new EntityCombinedModel(model, null); + } + + public EntityCombinedModel withHiddenProperties(Set hiddenProperties) { + this.hiddenProperties = hiddenProperties; + return this; + } + + @Override + public String name() { + return model.name(); + } + + @Override + public String comment() { + return model.comment(); + } + + @Override + public Map properties() { + return model.properties() == null + ? null + : model.properties().entrySet().stream() + .filter(e -> !hiddenProperties.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public int latestVersion() { + return model.latestVersion(); + } + + @Override + public Audit auditInfo() { + AuditInfo mergedAudit = + AuditInfo.builder() + .withCreator(model.auditInfo().creator()) + .withCreateTime(model.auditInfo().createTime()) + .withLastModifier(model.auditInfo().lastModifier()) + .withLastModifiedTime(model.auditInfo().lastModifiedTime()) + .build(); + + return modelEntity == null + ? mergedAudit + : mergedAudit.merge(modelEntity.auditInfo(), true /* overwrite */); + } +} diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java new file mode 100644 index 00000000000..b41e2889de3 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.gravitino.Audit; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ModelVersionEntity; +import org.apache.gravitino.model.ModelVersion; + +public final class EntityCombinedModelVersion implements ModelVersion { + + private final ModelVersion modelVersion; + + private final ModelVersionEntity modelVersionEntity; + + private Set hiddenProperties = Collections.emptySet(); + + private EntityCombinedModelVersion( + ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) { + this.modelVersion = modelVersion; + this.modelVersionEntity = modelVersionEntity; + } + + public static EntityCombinedModelVersion of( + ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) { + return new EntityCombinedModelVersion(modelVersion, modelVersionEntity); + } + + public static EntityCombinedModelVersion of(ModelVersion modelVersion) { + return new EntityCombinedModelVersion(modelVersion, null); + } + + public EntityCombinedModelVersion withHiddenProperties(Set hiddenProperties) { + this.hiddenProperties = hiddenProperties; + return this; + } + + @Override + public int version() { + return modelVersion.version(); + } + + @Override + public String comment() { + return modelVersion.comment(); + } + + @Override + public Map properties() { + return modelVersion.properties() == null + ? null + : modelVersion.properties().entrySet().stream() + .filter(e -> !hiddenProperties.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public String uri() { + return modelVersion.uri(); + } + + @Override + public String[] aliases() { + return modelVersion.aliases(); + } + + @Override + public Audit auditInfo() { + AuditInfo mergedAudit = + AuditInfo.builder() + .withCreator(modelVersion.auditInfo().creator()) + .withCreateTime(modelVersion.auditInfo().createTime()) + .withLastModifier(modelVersion.auditInfo().lastModifier()) + .withLastModifiedTime(modelVersion.auditInfo().lastModifiedTime()) + .build(); + + return modelVersionEntity == null + ? mergedAudit + : mergedAudit.merge(modelVersionEntity.auditInfo(), true /* overwrite */); + } +} diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java index 79a4b12a10c..ce3d0a3be72 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java @@ -61,7 +61,7 @@ public static EntityCombinedSchema of(Schema schema) { return of(schema, null); } - public EntityCombinedSchema withHiddenPropertiesSet(Set hiddenProperties) { + public EntityCombinedSchema withHiddenProperties(Set hiddenProperties) { this.hiddenProperties = hiddenProperties; return this; } diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java index 4b0da1568b9..70cbd0ace4a 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java @@ -67,7 +67,7 @@ public static EntityCombinedTable of(Table table) { return new EntityCombinedTable(table, null); } - public EntityCombinedTable withHiddenPropertiesSet(Set hiddenProperties) { + public EntityCombinedTable withHiddenProperties(Set hiddenProperties) { this.hiddenProperties = hiddenProperties; return this; } diff --git a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java index 2360f31ae74..972df622b3d 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java +++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java @@ -60,7 +60,7 @@ public static EntityCombinedTopic of(Topic topic) { return new EntityCombinedTopic(topic, null); } - public EntityCombinedTopic withHiddenPropertiesSet(Set hiddenProperties) { + public EntityCombinedTopic withHiddenProperties(Set hiddenProperties) { this.hiddenProperties = hiddenProperties; return this; } diff --git a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java index 98c6311bd7c..828e981380a 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java @@ -81,7 +81,7 @@ public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException { // Currently we only support maintaining the Fileset in the Gravitino's store. return EntityCombinedFileset.of(fileset) - .withHiddenPropertiesSet( + .withHiddenProperties( getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::filesetPropertiesMetadata, @@ -137,7 +137,7 @@ public Fileset createFileset( NoSuchSchemaException.class, FilesetAlreadyExistsException.class); return EntityCombinedFileset.of(createdFileset) - .withHiddenPropertiesSet( + .withHiddenProperties( getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::filesetPropertiesMetadata, @@ -172,7 +172,7 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) NoSuchFilesetException.class, IllegalArgumentException.class); return EntityCombinedFileset.of(alteredFileset) - .withHiddenPropertiesSet( + .withHiddenProperties( getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::filesetPropertiesMetadata, diff --git a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java index eb1f17c96da..1c5291d51a2 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java @@ -18,15 +18,23 @@ */ package org.apache.gravitino.catalog; +import static org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate; +import static org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier; + import java.util.Map; +import java.util.function.Supplier; import org.apache.gravitino.EntityStore; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.connector.HasPropertyMetadata; import org.apache.gravitino.exceptions.ModelAlreadyExistsException; import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; import org.apache.gravitino.exceptions.NoSuchModelException; import org.apache.gravitino.exceptions.NoSuchModelVersionException; import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.model.Model; import org.apache.gravitino.model.ModelVersion; import org.apache.gravitino.storage.IdGenerator; @@ -40,40 +48,114 @@ public ModelOperationDispatcher( @Override public NameIdentifier[] listModels(Namespace namespace) throws NoSuchSchemaException { - throw new UnsupportedOperationException("Not implemented"); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(namespace.levels()), + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithModelOps(m -> m.listModels(namespace)), + NoSuchSchemaException.class)); } @Override public Model getModel(NameIdentifier ident) throws NoSuchModelException { - throw new UnsupportedOperationException("Not implemented"); + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Model model = + TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> + doWithCatalog( + catalogIdent, + c -> c.doWithModelOps(m -> m.getModel(ident)), + NoSuchModelException.class)); + + return EntityCombinedModel.of(model) + .withHiddenProperties( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::modelPropertiesMetadata, model.properties())); } @Override public Model registerModel(NameIdentifier ident, String comment, Map properties) throws NoSuchModelException, ModelAlreadyExistsException { - throw new UnsupportedOperationException("Not implemented"); + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Map updatedProperties = checkAndUpdateProperties(catalogIdent, properties); + + Model registeredModel = + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> + doWithCatalog( + catalogIdent, + c -> c.doWithModelOps(m -> m.registerModel(ident, comment, updatedProperties)), + NoSuchSchemaException.class, + ModelAlreadyExistsException.class)); + + return EntityCombinedModel.of(registeredModel) + .withHiddenProperties( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::modelPropertiesMetadata, + registeredModel.properties())); } @Override public boolean deleteModel(NameIdentifier ident) { - throw new UnsupportedOperationException("Not implemented"); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.deleteModel(ident)), + RuntimeException.class)); } @Override public int[] listModelVersions(NameIdentifier ident) throws NoSuchModelException { - throw new UnsupportedOperationException("Not implemented"); + return TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.listModelVersions(ident)), + NoSuchModelException.class)); } @Override public ModelVersion getModelVersion(NameIdentifier ident, int version) throws NoSuchModelVersionException { - throw new UnsupportedOperationException("Not implemented"); + return internalGetModelVersion( + ident, + () -> + TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.getModelVersion(ident, version)), + NoSuchModelVersionException.class))); } @Override public ModelVersion getModelVersion(NameIdentifier ident, String alias) throws NoSuchModelVersionException { - throw new UnsupportedOperationException("Not implemented"); + return internalGetModelVersion( + ident, + () -> + TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.getModelVersion(ident, alias)), + NoSuchModelVersionException.class))); } @Override @@ -84,16 +166,80 @@ public void linkModelVersion( String comment, Map properties) throws NoSuchModelException, ModelVersionAliasesAlreadyExistException { - throw new UnsupportedOperationException("Not implemented"); + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Map updatedProperties = checkAndUpdateProperties(catalogIdent, properties); + + TreeLockUtils.doWithTreeLock( + ident, + LockType.WRITE, + () -> + doWithCatalog( + catalogIdent, + c -> + c.doWithModelOps( + m -> { + m.linkModelVersion(ident, uri, aliases, comment, updatedProperties); + return null; + }), + NoSuchModelException.class, + ModelVersionAliasesAlreadyExistException.class)); } @Override public boolean deleteModelVersion(NameIdentifier ident, int version) { - throw new UnsupportedOperationException("Not implemented"); + return TreeLockUtils.doWithTreeLock( + ident, + LockType.WRITE, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.deleteModelVersion(ident, version)), + RuntimeException.class)); } @Override public boolean deleteModelVersion(NameIdentifier ident, String alias) { - throw new UnsupportedOperationException("Not implemented"); + return TreeLockUtils.doWithTreeLock( + ident, + LockType.WRITE, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithModelOps(m -> m.deleteModelVersion(ident, alias)), + RuntimeException.class)); + } + + private ModelVersion internalGetModelVersion( + NameIdentifier ident, Supplier supplier) { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + + ModelVersion modelVersion = supplier.get(); + return EntityCombinedModelVersion.of(modelVersion) + .withHiddenProperties( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::modelPropertiesMetadata, + modelVersion.properties())); + } + + private Map checkAndUpdateProperties( + NameIdentifier catalogIdent, Map properties) { + TreeLockUtils.doWithTreeLock( + catalogIdent, + LockType.READ, + () -> + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.modelPropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class)); + + long uid = idGenerator.nextId(); + StringIdentifier stringId = StringIdentifier.fromId(uid); + return StringIdentifier.newPropertiesWithId(stringId, properties); } } diff --git a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java index ce870523a14..789e5e47155 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java @@ -125,7 +125,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map { + + private Builder() {} + + @Override + protected TestModel internalBuild() { + TestModel model = new TestModel(); + model.name = name; + model.comment = comment; + model.properties = properties; + model.latestVersion = latestVersion; + model.auditInfo = auditInfo; + return model; + } + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/core/src/test/java/org/apache/gravitino/TestModelVersion.java b/core/src/test/java/org/apache/gravitino/TestModelVersion.java new file mode 100644 index 00000000000..487496c5fb0 --- /dev/null +++ b/core/src/test/java/org/apache/gravitino/TestModelVersion.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino; + +import org.apache.gravitino.connector.BaseModelVersion; + +public class TestModelVersion extends BaseModelVersion { + + public static class Builder extends BaseModelVersionBuilder { + + private Builder() {} + + @Override + protected TestModelVersion internalBuild() { + TestModelVersion modelVersion = new TestModelVersion(); + modelVersion.version = version; + modelVersion.comment = comment; + modelVersion.aliases = aliases; + modelVersion.uri = uri; + modelVersion.properties = properties; + modelVersion.auditInfo = auditInfo; + return modelVersion; + } + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java new file mode 100644 index 00000000000..10bb85a1e11 --- /dev/null +++ b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; +import static org.apache.gravitino.StringIdentifier.ID_KEY; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.gravitino.Config; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.NoSuchModelException; +import org.apache.gravitino.exceptions.NoSuchModelVersionException; +import org.apache.gravitino.lock.LockManager; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestModelOperationDispatcher extends TestOperationDispatcher { + + static ModelOperationDispatcher modelOperationDispatcher; + + static SchemaOperationDispatcher schemaOperationDispatcher; + + @BeforeAll + public static void initialize() throws IOException, IllegalAccessException { + Config config = Mockito.mock(Config.class); + Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY); + Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY); + Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); + FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true); + + modelOperationDispatcher = + new ModelOperationDispatcher(catalogManager, entityStore, idGenerator); + schemaOperationDispatcher = + new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); + } + + @Test + public void testRegisterAndGetModel() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName = randomModelName(); + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName); + + Model model = modelOperationDispatcher.registerModel(modelIdent, "comment", props); + Assertions.assertEquals(modelName, model.name()); + Assertions.assertEquals("comment", model.comment()); + Assertions.assertEquals(props, model.properties()); + Assertions.assertFalse(model.properties().containsKey(ID_KEY)); + + Model registeredModel = modelOperationDispatcher.getModel(modelIdent); + Assertions.assertEquals(modelName, registeredModel.name()); + Assertions.assertEquals("comment", registeredModel.comment()); + Assertions.assertEquals(props, registeredModel.properties()); + Assertions.assertFalse(registeredModel.properties().containsKey(ID_KEY)); + + // Test register model with illegal property + Map illegalProps = ImmutableMap.of("k1", "v1", ID_KEY, "test"); + testPropertyException( + () -> modelOperationDispatcher.registerModel(modelIdent, "comment", illegalProps), + "Properties are reserved and cannot be set", + ID_KEY); + } + + @Test + public void testRegisterAndListModels() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName1 = randomModelName(); + NameIdentifier modelIdent1 = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName1); + modelOperationDispatcher.registerModel(modelIdent1, "comment", props); + + String modelName2 = randomModelName(); + NameIdentifier modelIdent2 = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName2); + modelOperationDispatcher.registerModel(modelIdent2, "comment", props); + + NameIdentifier[] modelIdents = modelOperationDispatcher.listModels(modelIdent1.namespace()); + Assertions.assertEquals(2, modelIdents.length); + Set modelIdentSet = Sets.newHashSet(modelIdents); + Assertions.assertTrue(modelIdentSet.contains(modelIdent1)); + Assertions.assertTrue(modelIdentSet.contains(modelIdent2)); + } + + @Test + public void testRegisterAndDeleteModel() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName = randomModelName(); + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName); + + modelOperationDispatcher.registerModel(modelIdent, "comment", props); + Assertions.assertTrue(modelOperationDispatcher.deleteModel(modelIdent)); + Assertions.assertFalse(modelOperationDispatcher.deleteModel(modelIdent)); + Assertions.assertThrows( + NoSuchModelException.class, () -> modelOperationDispatcher.getModel(modelIdent)); + + // Test delete in-existent model + Assertions.assertFalse( + modelOperationDispatcher.deleteModel(NameIdentifier.of(metalake, catalog, "inexistent"))); + } + + @Test + public void testLinkAndGetModelVersion() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName = randomModelName(); + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName); + + Model model = modelOperationDispatcher.registerModel(modelIdent, "comment", props); + Assertions.assertEquals(0, model.latestVersion()); + + String[] aliases = new String[] {"alias1", "alias2"}; + modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases, "comment", props); + + ModelVersion linkedModelVersion = modelOperationDispatcher.getModelVersion(modelIdent, 0); + Assertions.assertEquals(0, linkedModelVersion.version()); + Assertions.assertEquals("path", linkedModelVersion.uri()); + Assertions.assertArrayEquals(aliases, linkedModelVersion.aliases()); + Assertions.assertEquals("comment", linkedModelVersion.comment()); + Assertions.assertEquals(props, linkedModelVersion.properties()); + Assertions.assertFalse(linkedModelVersion.properties().containsKey(ID_KEY)); + + // Test get model version with alias + ModelVersion linkedModelVersionWithAlias = + modelOperationDispatcher.getModelVersion(modelIdent, "alias1"); + Assertions.assertEquals(0, linkedModelVersionWithAlias.version()); + Assertions.assertEquals("path", linkedModelVersionWithAlias.uri()); + Assertions.assertArrayEquals(aliases, linkedModelVersionWithAlias.aliases()); + Assertions.assertFalse(linkedModelVersionWithAlias.properties().containsKey(ID_KEY)); + + ModelVersion linkedModelVersionWithAlias2 = + modelOperationDispatcher.getModelVersion(modelIdent, "alias2"); + Assertions.assertEquals(0, linkedModelVersionWithAlias2.version()); + Assertions.assertEquals("path", linkedModelVersionWithAlias2.uri()); + Assertions.assertArrayEquals(aliases, linkedModelVersionWithAlias2.aliases()); + Assertions.assertFalse(linkedModelVersionWithAlias2.properties().containsKey(ID_KEY)); + + // Test Link model version with illegal property + Map illegalProps = ImmutableMap.of("k1", "v1", ID_KEY, "test"); + testPropertyException( + () -> + modelOperationDispatcher.linkModelVersion( + modelIdent, "path", aliases, "comment", illegalProps), + "Properties are reserved and cannot be set", + ID_KEY); + } + + @Test + public void testLinkAndListModelVersion() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName = randomModelName(); + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName); + + Model model = modelOperationDispatcher.registerModel(modelIdent, "comment", props); + Assertions.assertEquals(0, model.latestVersion()); + + String[] aliases1 = new String[] {"alias1"}; + String[] aliases2 = new String[] {"alias2"}; + modelOperationDispatcher.linkModelVersion(modelIdent, "path1", aliases1, "comment", props); + modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2, "comment", props); + + int[] versions = modelOperationDispatcher.listModelVersions(modelIdent); + Assertions.assertEquals(2, versions.length); + Set versionSet = Arrays.stream(versions).boxed().collect(Collectors.toSet()); + Assertions.assertTrue(versionSet.contains(0)); + Assertions.assertTrue(versionSet.contains(1)); + } + + @Test + public void testLinkAndDeleteModelVersion() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, schemaName); + schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); + + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + String modelName = randomModelName(); + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName); + + Model model = modelOperationDispatcher.registerModel(modelIdent, "comment", props); + Assertions.assertEquals(0, model.latestVersion()); + + String[] aliases = new String[] {"alias1"}; + modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases, "comment", props); + Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent, 0)); + Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, 0)); + Assertions.assertThrows( + NoSuchModelVersionException.class, + () -> modelOperationDispatcher.getModelVersion(modelIdent, 0)); + + // Test delete in-existent model version + Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, 1)); + + // Tet delete model version with alias + String[] aliases2 = new String[] {"alias2"}; + modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2, "comment", props); + Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent, "alias2")); + Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, "alias2")); + Assertions.assertThrows( + NoSuchModelVersionException.class, + () -> modelOperationDispatcher.getModelVersion(modelIdent, "alias2")); + } + + private String randomSchemaName() { + return "schema_" + UUID.randomUUID().toString().replace("-", ""); + } + + private String randomModelName() { + return "model_" + UUID.randomUUID().toString().replace("-", ""); + } +} diff --git a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java index 4fb98c596b8..f7775ef32e7 100644 --- a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java +++ b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java @@ -26,11 +26,13 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -38,6 +40,8 @@ import org.apache.gravitino.SchemaChange; import org.apache.gravitino.TestColumn; import org.apache.gravitino.TestFileset; +import org.apache.gravitino.TestModel; +import org.apache.gravitino.TestModelVersion; import org.apache.gravitino.TestSchema; import org.apache.gravitino.TestTable; import org.apache.gravitino.TestTopic; @@ -47,8 +51,12 @@ import org.apache.gravitino.exceptions.ConnectionFailedException; import org.apache.gravitino.exceptions.FilesetAlreadyExistsException; import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.exceptions.ModelAlreadyExistsException; +import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.exceptions.NoSuchFilesetException; +import org.apache.gravitino.exceptions.NoSuchModelException; +import org.apache.gravitino.exceptions.NoSuchModelVersionException; import org.apache.gravitino.exceptions.NoSuchSchemaException; import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.exceptions.NoSuchTopicException; @@ -64,6 +72,9 @@ import org.apache.gravitino.messaging.TopicCatalog; import org.apache.gravitino.messaging.TopicChange; import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelCatalog; +import org.apache.gravitino.model.ModelVersion; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableCatalog; @@ -76,7 +87,12 @@ import org.slf4j.LoggerFactory; public class TestCatalogOperations - implements CatalogOperations, TableCatalog, FilesetCatalog, TopicCatalog, SupportsSchemas { + implements CatalogOperations, + TableCatalog, + FilesetCatalog, + TopicCatalog, + ModelCatalog, + SupportsSchemas { private static final Logger LOG = LoggerFactory.getLogger(TestCatalogOperations.class); private final Map tables; @@ -87,6 +103,12 @@ public class TestCatalogOperations private final Map topics; + private final Map models; + + private final Map, TestModelVersion> modelVersions; + + private final Map, Integer> modelAliasToVersion; + public static final String FAIL_CREATE = "fail-create"; public static final String FAIL_TEST = "need-fail"; @@ -98,6 +120,9 @@ public TestCatalogOperations(Map config) { schemas = Maps.newHashMap(); filesets = Maps.newHashMap(); topics = Maps.newHashMap(); + models = Maps.newHashMap(); + modelVersions = Maps.newHashMap(); + modelAliasToVersion = Maps.newHashMap(); } @Override @@ -649,6 +674,227 @@ public void testConnection( } } + @Override + public NameIdentifier[] listModels(Namespace namespace) throws NoSuchSchemaException { + NameIdentifier modelSchemaIdent = NameIdentifier.of(namespace.levels()); + if (!schemas.containsKey(modelSchemaIdent)) { + throw new NoSuchSchemaException("Schema %s does not exist", modelSchemaIdent); + } + + return models.keySet().stream() + .filter(ident -> ident.namespace().equals(namespace)) + .toArray(NameIdentifier[]::new); + } + + @Override + public Model getModel(NameIdentifier ident) throws NoSuchModelException { + if (models.containsKey(ident)) { + return models.get(ident); + } else { + throw new NoSuchModelException("Model %s does not exist", ident); + } + } + + @Override + public Model registerModel(NameIdentifier ident, String comment, Map properties) + throws NoSuchSchemaException, ModelAlreadyExistsException { + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + if (!schemas.containsKey(schemaIdent)) { + throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent); + } + + AuditInfo auditInfo = + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + TestModel model = + TestModel.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(properties) + .withLatestVersion(0) + .withAuditInfo(auditInfo) + .build(); + + if (models.containsKey(ident)) { + throw new ModelAlreadyExistsException("Model %s already exists", ident); + } else { + models.put(ident, model); + } + + return model; + } + + @Override + public boolean deleteModel(NameIdentifier ident) { + if (!models.containsKey(ident)) { + return false; + } + + models.remove(ident); + + List> deletedVersions = + modelVersions.entrySet().stream() + .filter(e -> e.getKey().getLeft().equals(ident)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + deletedVersions.forEach(modelVersions::remove); + + List> deletedAliases = + modelAliasToVersion.entrySet().stream() + .filter(e -> e.getKey().getLeft().equals(ident)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + deletedAliases.forEach(modelAliasToVersion::remove); + + return true; + } + + @Override + public int[] listModelVersions(NameIdentifier ident) throws NoSuchModelException { + if (!models.containsKey(ident)) { + throw new NoSuchModelException("Model %s does not exist", ident); + } + + return modelVersions.entrySet().stream() + .filter(e -> e.getKey().getLeft().equals(ident)) + .mapToInt(e -> e.getValue().version()) + .toArray(); + } + + @Override + public ModelVersion getModelVersion(NameIdentifier ident, int version) + throws NoSuchModelVersionException { + if (!models.containsKey(ident)) { + throw new NoSuchModelVersionException("Model %s does not exist", ident); + } + + Pair versionPair = Pair.of(ident, version); + if (!modelVersions.containsKey(versionPair)) { + throw new NoSuchModelVersionException("Model version %s does not exist", versionPair); + } + + return modelVersions.get(versionPair); + } + + @Override + public ModelVersion getModelVersion(NameIdentifier ident, String alias) + throws NoSuchModelVersionException { + if (!models.containsKey(ident)) { + throw new NoSuchModelVersionException("Model %s does not exist", ident); + } + + Pair aliasPair = Pair.of(ident, alias); + if (!modelAliasToVersion.containsKey(aliasPair)) { + throw new NoSuchModelVersionException("Model version %s does not exist", alias); + } + + int version = modelAliasToVersion.get(aliasPair); + Pair versionPair = Pair.of(ident, version); + if (!modelVersions.containsKey(versionPair)) { + throw new NoSuchModelVersionException("Model version %s does not exist", versionPair); + } + + return modelVersions.get(versionPair); + } + + @Override + public void linkModelVersion( + NameIdentifier ident, + String uri, + String[] aliases, + String comment, + Map properties) + throws NoSuchModelException, ModelVersionAliasesAlreadyExistException { + if (!models.containsKey(ident)) { + throw new NoSuchModelException("Model %s does not exist", ident); + } + + String[] aliasArray = aliases != null ? aliases : new String[0]; + for (String alias : aliasArray) { + Pair aliasPair = Pair.of(ident, alias); + if (modelAliasToVersion.containsKey(aliasPair)) { + throw new ModelVersionAliasesAlreadyExistException( + "Model version alias %s already exists", alias); + } + } + + int version = models.get(ident).latestVersion(); + TestModelVersion modelVersion = + TestModelVersion.builder() + .withVersion(version) + .withAliases(aliases) + .withComment(comment) + .withUri(uri) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build()) + .build(); + Pair versionPair = Pair.of(ident, version); + modelVersions.put(versionPair, modelVersion); + for (String alias : aliasArray) { + Pair aliasPair = Pair.of(ident, alias); + modelAliasToVersion.put(aliasPair, version); + } + + TestModel model = models.get(ident); + TestModel updatedModel = + TestModel.builder() + .withName(model.name()) + .withComment(model.comment()) + .withProperties(model.properties()) + .withLatestVersion(version + 1) + .withAuditInfo(model.auditInfo()) + .build(); + models.put(ident, updatedModel); + } + + @Override + public boolean deleteModelVersion(NameIdentifier ident, int version) { + if (!models.containsKey(ident)) { + return false; + } + + Pair versionPair = Pair.of(ident, version); + if (!modelVersions.containsKey(versionPair)) { + return false; + } + + TestModelVersion modelVersion = modelVersions.remove(versionPair); + if (modelVersion.aliases() != null) { + for (String alias : modelVersion.aliases()) { + Pair aliasPair = Pair.of(ident, alias); + modelAliasToVersion.remove(aliasPair); + } + } + + return true; + } + + @Override + public boolean deleteModelVersion(NameIdentifier ident, String alias) { + if (!models.containsKey(ident)) { + return false; + } + + Pair aliasPair = Pair.of(ident, alias); + if (!modelAliasToVersion.containsKey(aliasPair)) { + return false; + } + + int version = modelAliasToVersion.remove(aliasPair); + Pair versionPair = Pair.of(ident, version); + if (!modelVersions.containsKey(versionPair)) { + return false; + } + + TestModelVersion modelVersion = modelVersions.remove(versionPair); + for (String modelVersionAlias : modelVersion.aliases()) { + Pair modelAliasPair = Pair.of(ident, modelVersionAlias); + modelAliasToVersion.remove(modelAliasPair); + } + + return true; + } + private boolean hasCallerContext() { return CallerContext.CallerContextHolder.get() != null && CallerContext.CallerContextHolder.get().context() != null