Skip to content

Commit

Permalink
[#5794] feat(core): Add ModelOperationDispatcher logic (#5908)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds the ModelOperationDispatcher logic in core.

### Why are the changes needed?

This is a part of work to support model management.

Fix: #5794 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs to test.
  • Loading branch information
jerryshao authored Dec 23, 2024
1 parent 441e6ed commit 1a22afe
Show file tree
Hide file tree
Showing 17 changed files with 1,000 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.model.ModelCatalog;
import org.apache.gravitino.rel.SupportsPartitions;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
Expand Down Expand Up @@ -178,6 +179,16 @@ public <R> R doWithTopicOps(ThrowableFunction<TopicCatalog, R> fn) throws Except
});
}

public <R> R doWithModelOps(ThrowableFunction<ModelCatalog, R> fn) throws Exception {
return classLoader.withClassLoader(
cl -> {
if (asModels() == null) {
throw new UnsupportedOperationException("Catalog does not support model operations");
}
return fn.apply(asModels());
});
}

public <R> R doWithCatalogOps(ThrowableFunction<CatalogOperations, R> fn) throws Exception {
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
}
Expand Down Expand Up @@ -236,6 +247,10 @@ private FilesetCatalog asFilesets() {
private TopicCatalog asTopics() {
return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) catalog.ops() : null;
}

private ModelCatalog asModels() {
return catalog.ops() instanceof ModelCatalog ? (ModelCatalog) catalog.ops() : null;
}
}

private final Config config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static EntityCombinedFileset of(Fileset fileset) {
return new EntityCombinedFileset(fileset, null);
}

public EntityCombinedFileset withHiddenPropertiesSet(Set<String> hiddenProperties) {
public EntityCombinedFileset withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gravitino.Audit;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.model.Model;

public final class EntityCombinedModel implements Model {

private final Model model;

private final ModelEntity modelEntity;

private Set<String> hiddenProperties = Collections.emptySet();

private EntityCombinedModel(Model model, ModelEntity modelEntity) {
this.model = model;
this.modelEntity = modelEntity;
}

public static EntityCombinedModel of(Model model, ModelEntity modelEntity) {
return new EntityCombinedModel(model, modelEntity);
}

public static EntityCombinedModel of(Model model) {
return new EntityCombinedModel(model, null);
}

public EntityCombinedModel withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}

@Override
public String name() {
return model.name();
}

@Override
public String comment() {
return model.comment();
}

@Override
public Map<String, String> properties() {
return model.properties() == null
? null
: model.properties().entrySet().stream()
.filter(e -> !hiddenProperties.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public int latestVersion() {
return model.latestVersion();
}

@Override
public Audit auditInfo() {
AuditInfo mergedAudit =
AuditInfo.builder()
.withCreator(model.auditInfo().creator())
.withCreateTime(model.auditInfo().createTime())
.withLastModifier(model.auditInfo().lastModifier())
.withLastModifiedTime(model.auditInfo().lastModifiedTime())
.build();

return modelEntity == null
? mergedAudit
: mergedAudit.merge(modelEntity.auditInfo(), true /* overwrite */);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gravitino.Audit;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.ModelVersionEntity;
import org.apache.gravitino.model.ModelVersion;

public final class EntityCombinedModelVersion implements ModelVersion {

private final ModelVersion modelVersion;

private final ModelVersionEntity modelVersionEntity;

private Set<String> hiddenProperties = Collections.emptySet();

private EntityCombinedModelVersion(
ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
this.modelVersion = modelVersion;
this.modelVersionEntity = modelVersionEntity;
}

public static EntityCombinedModelVersion of(
ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
return new EntityCombinedModelVersion(modelVersion, modelVersionEntity);
}

public static EntityCombinedModelVersion of(ModelVersion modelVersion) {
return new EntityCombinedModelVersion(modelVersion, null);
}

public EntityCombinedModelVersion withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}

@Override
public int version() {
return modelVersion.version();
}

@Override
public String comment() {
return modelVersion.comment();
}

@Override
public Map<String, String> properties() {
return modelVersion.properties() == null
? null
: modelVersion.properties().entrySet().stream()
.filter(e -> !hiddenProperties.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public String uri() {
return modelVersion.uri();
}

@Override
public String[] aliases() {
return modelVersion.aliases();
}

@Override
public Audit auditInfo() {
AuditInfo mergedAudit =
AuditInfo.builder()
.withCreator(modelVersion.auditInfo().creator())
.withCreateTime(modelVersion.auditInfo().createTime())
.withLastModifier(modelVersion.auditInfo().lastModifier())
.withLastModifiedTime(modelVersion.auditInfo().lastModifiedTime())
.build();

return modelVersionEntity == null
? mergedAudit
: mergedAudit.merge(modelVersionEntity.auditInfo(), true /* overwrite */);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static EntityCombinedSchema of(Schema schema) {
return of(schema, null);
}

public EntityCombinedSchema withHiddenPropertiesSet(Set<String> hiddenProperties) {
public EntityCombinedSchema withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static EntityCombinedTable of(Table table) {
return new EntityCombinedTable(table, null);
}

public EntityCombinedTable withHiddenPropertiesSet(Set<String> hiddenProperties) {
public EntityCombinedTable withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static EntityCombinedTopic of(Topic topic) {
return new EntityCombinedTopic(topic, null);
}

public EntityCombinedTopic withHiddenPropertiesSet(Set<String> hiddenProperties) {
public EntityCombinedTopic withHiddenProperties(Set<String> hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException {

// Currently we only support maintaining the Fileset in the Gravitino's store.
return EntityCombinedFileset.of(fileset)
.withHiddenPropertiesSet(
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
Expand Down Expand Up @@ -137,7 +137,7 @@ public Fileset createFileset(
NoSuchSchemaException.class,
FilesetAlreadyExistsException.class);
return EntityCombinedFileset.of(createdFileset)
.withHiddenPropertiesSet(
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
Expand Down Expand Up @@ -172,7 +172,7 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
NoSuchFilesetException.class,
IllegalArgumentException.class);
return EntityCombinedFileset.of(alteredFileset)
.withHiddenPropertiesSet(
.withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
Expand Down
Loading

0 comments on commit 1a22afe

Please sign in to comment.