Skip to content

Commit

Permalink
SHOW Catalogs Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Oct 24, 2022
1 parent 30a2d27 commit bfad32c
Show file tree
Hide file tree
Showing 22 changed files with 481 additions and 67 deletions.
28 changes: 23 additions & 5 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -25,6 +26,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -60,6 +62,7 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.SemanticCheckException;
Expand Down Expand Up @@ -89,6 +92,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand Down Expand Up @@ -129,14 +133,24 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
.stream()
.map(Catalog::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierName catalogSchemaIdentifierName
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), catalogService.getCatalogs());
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierName.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table = catalogService
.getStorageEngine(catalogSchemaIdentifierName.getCatalogName())
.getTable(catalogSchemaIdentifierName.getIdentifierName());
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierName.getCatalogName())
.getStorageEngine()
.getTable(tableName);
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

// Put index name or its alias in index namespace on type environment so qualifier
Expand All @@ -163,8 +177,12 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
.stream()
.map(Catalog::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierName catalogSchemaIdentifierName
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), catalogService.getCatalogs());
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), allowedCatalogNames);

FunctionName functionName = FunctionName.of(catalogSchemaIdentifierName.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
Expand Down
29 changes: 22 additions & 7 deletions core/src/main/java/org/opensearch/sql/catalog/CatalogService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,35 @@
package org.opensearch.sql.catalog;

import java.util.Set;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.storage.StorageEngine;

/**
* Catalog Service defines api for
* providing and managing storage engines and execution engines
* for all the catalogs.
* The storage and execution indirectly make connections to the underlying datastore catalog.
* Catalog Service manages catalogs.
*/
public interface CatalogService {

StorageEngine getStorageEngine(String catalog);
/**
* Returns all catalog objects.
*
* @return Catalog Catalogs.
*/
Set<Catalog> getCatalogs();

Set<String> getCatalogs();
/**
* Returns Catalog with corresponding to the catalog name.
*
* @param catalogName Name of the catalog.
* @return Catalog catalog.
*/
Catalog getCatalog(String catalogName);

void registerOpenSearchStorageEngine(StorageEngine storageEngine);
/**
* Default opensearch engine is not defined in catalog.json.
* So the registration of default catalog happens separately.
*
* @param storageEngine StorageEngine.
*/
void registerDefaultOpenSearchCatalog(StorageEngine storageEngine);

}
27 changes: 27 additions & 0 deletions core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.catalog.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.storage.StorageEngine;

@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class Catalog {

private final String name;

private final ConnectorType connectorType;

@EqualsAndHashCode.Exclude
private final StorageEngine storageEngine;

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprCollectionValue;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.LiteralExpression;
import org.opensearch.sql.expression.NamedExpression;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.planner.physical.catalog;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.planner.DefaultImplementor;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;


/**
* Table implementation to handle show catalogs command.
* Since catalog information is not tied to any storage engine, this info
* is handled via Catalog Table.
*
*/
@RequiredArgsConstructor
@EqualsAndHashCode
public class CatalogTable implements Table {

private final CatalogService catalogService;

@Override
public Map<String, ExprType> getFieldTypes() {
return CatalogTableSchema.CATALOG_TABLE_SCHEMA.getMapping();
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
return plan.accept(new CatalogTableDefaultImplementor(catalogService), null);
}

@VisibleForTesting
@RequiredArgsConstructor
public static class CatalogTableDefaultImplementor
extends DefaultImplementor<Object> {

private final CatalogService catalogService;

@Override
public PhysicalPlan visitRelation(LogicalRelation node, Object context) {
return new CatalogTableScan(catalogService);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.planner.physical.catalog;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.storage.TableScanOperator;

/**
* This class handles table scan of catalog table.
* Right now these are derived from catalogService thorough static fields.
* In future this might scan data from underlying datastore if we start
* persisting catalog info somewhere.
*
*/
public class CatalogTableScan extends TableScanOperator {

private final CatalogService catalogService;

private Iterator<ExprValue> iterator;

public CatalogTableScan(CatalogService catalogService) {
this.catalogService = catalogService;
this.iterator = Collections.emptyIterator();
}

@Override
public String explain() {
return "GetCatalogRequestRequest{}";
}

@Override
public void open() {
List<ExprValue> exprValues = new ArrayList<>();
Set<Catalog> catalogs = catalogService.getCatalogs();
for (Catalog catalog : catalogs) {
exprValues.add(
new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of(
"CATALOG_NAME", ExprValueUtils.stringValue(catalog.getName()),
"CONNECTOR_TYPE", ExprValueUtils.stringValue(catalog.getConnectorType().name())))));
}
iterator = exprValues.iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ExprValue next() {
return iterator.next();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical.catalog;

import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.type.ExprType;

/**
* Definition of the system table schema.
*/
@Getter
@RequiredArgsConstructor
public enum CatalogTableSchema {

CATALOG_TABLE_SCHEMA(new LinkedHashMap<>() {
{
put("CATALOG_NAME", STRING);
put("CONNECTOR_TYPE", STRING);
}
}
);
private final Map<String, ExprType> mapping;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class SystemIndexUtils {
*/
public static final String TABLE_INFO = SYS_META_PREFIX + ".ALL";

public static final String CATALOGS_TABLE_NAME = ".CATALOGS";


public static Boolean isSystemIndex(String indexName) {
return indexName.startsWith(SYS_TABLES_PREFIX);
Expand Down
12 changes: 10 additions & 2 deletions core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

package org.opensearch.sql.analysis;

import static java.lang.Boolean.TRUE;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.opensearch.sql.ast.dsl.AstDSL.aggregate;
import static org.opensearch.sql.ast.dsl.AstDSL.alias;
import static org.opensearch.sql.ast.dsl.AstDSL.argument;
Expand All @@ -29,7 +31,6 @@
import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder;
import static org.opensearch.sql.data.model.ExprValueUtils.integerValue;
import static org.opensearch.sql.data.type.ExprCoreType.ARRAY;
import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE;
import static org.opensearch.sql.data.type.ExprCoreType.INTEGER;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
Expand Down Expand Up @@ -67,8 +68,10 @@
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanDSL;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -953,7 +956,6 @@ public void ad_fitRCF_relation() {
);
}


@Test
public void table_function() {
assertAnalyzeEqual(new LogicalRelation("query_range", table),
Expand Down Expand Up @@ -998,5 +1000,11 @@ public void table_function_with_wrong_table_function() {
assertEquals("unsupported function name: queryrange", exception.getMessage());
}

@Test
public void show_catalogs() {
assertAnalyzeEqual(new LogicalRelation(".CATALOGS", new CatalogTable(catalogService)),
AstDSL.relation(qualifiedName(".CATALOGS")));

}

}
Loading

0 comments on commit bfad32c

Please sign in to comment.