Skip to content

Commit

Permalink
Add SPI support for declaring and analyzing TableFunctions
Browse files Browse the repository at this point in the history
  • Loading branch information
kasiafi committed May 10, 2022
1 parent ee0bb3b commit 4291913
Show file tree
Hide file tree
Showing 14 changed files with 1,003 additions and 0 deletions.
37 changes: 37 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/ptf/Argument.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.ptf;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.trino.spi.expression.ConnectorExpression;

/**
* This class represents the three types of arguments passed to a Table Function:
* scalar arguments, descriptor arguments, and table arguments.
* <p>
* The class itself declares no members. The Jackson annotations below configure
* polymorphic JSON handling: the concrete type is identified by name in the
* {@code @type} property, and {@link DescriptorArgument}, {@link ScalarArgument} and
* {@link TableArgument} are registered under the names {@code descriptor},
* {@code scalar} and {@code table} respectively.
* <p>
* This representation should be considered experimental. Eventually, {@link ConnectorExpression}
* should be extended to include the Table Function arguments.
*/
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = DescriptorArgument.class, name = "descriptor"),
@JsonSubTypes.Type(value = ScalarArgument.class, name = "scalar"),
@JsonSubTypes.Type(value = TableArgument.class, name = "table"),
})
public abstract class Argument
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.ptf;

import javax.annotation.Nullable;

import static io.trino.spi.ptf.ConnectorTableFunction.checkArgument;
import static io.trino.spi.ptf.ConnectorTableFunction.checkNotNullOrEmpty;

/**
 * Abstract class to capture the three supported argument types for a table function:
 * <ul>
 * <li>Table arguments</li>
 * <li>Descriptor arguments</li>
 * <li>SQL scalar arguments</li>
 * </ul>
 * <p>
 * Each argument is named, and either passed positionally or in a {@code arg_name => value} convention.
 * <p>
 * Default values are allowed for all arguments except Table arguments.
 */
public abstract class ArgumentSpecification
{
    private final String name;
    private final boolean required;

    // native representation; null when no default value was supplied
    @Nullable
    private final Object defaultValue;

    /**
     * Creates a specification of a single table function argument.
     *
     * @param name the argument name; must be neither null nor empty
     * @param required whether the argument must be passed explicitly
     * @param defaultValue the default value in its native representation, or null;
     * a required argument must not declare a default value
     * @throws NullPointerException if {@code name} is null
     * @throws IllegalArgumentException if {@code name} is empty, or if the argument
     * is required and {@code defaultValue} is not null
     */
    public ArgumentSpecification(String name, boolean required, @Nullable Object defaultValue)
    {
        this.name = checkNotNullOrEmpty(name, "name");
        // a required argument is always passed by the caller, so a default would never be used
        checkArgument(!required || defaultValue == null, "non-null default value for a required argument");
        this.required = required;
        this.defaultValue = defaultValue;
    }

    /**
     * @return the argument name, never null or empty
     */
    public String getName()
    {
        return name;
    }

    /**
     * @return true if the argument was declared as required
     */
    public boolean isRequired()
    {
        return required;
    }

    /**
     * @return the default value in its native representation, or null if none was supplied;
     * always null for a required argument
     */
    @Nullable
    public Object getDefaultValue()
    {
        return defaultValue;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.ptf;

import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

/**
 * Describes a table function by its schema, name, argument specifications and
 * return type specification. Subclasses are expected to override
 * {@link #analyze(ConnectorSession, ConnectorTransactionHandle, Map)}; the default
 * implementation throws {@link UnsupportedOperationException}.
 */
public abstract class ConnectorTableFunction
{
    private final String schema;
    private final String name;
    private final List<ArgumentSpecification> arguments;
    private final ReturnTypeSpecification returnTypeSpecification;

    /**
     * @param schema the schema of the function; must be neither null nor empty
     * @param name the function name; must be neither null nor empty
     * @param arguments the argument specifications; names must be unique, and at most
     * one table argument may have row semantics
     * @param returnTypeSpecification the declared return type of the function
     * @throws NullPointerException if any parameter, or any element of {@code arguments}, is null
     * @throws IllegalArgumentException if {@code schema} or {@code name} is empty, if two
     * arguments share a name, or if more than one table argument has row semantics
     */
    public ConnectorTableFunction(String schema, String name, List<ArgumentSpecification> arguments, ReturnTypeSpecification returnTypeSpecification)
    {
        this.schema = checkNotNullOrEmpty(schema, "schema");
        this.name = checkNotNullOrEmpty(name, "name");
        requireNonNull(arguments, "arguments is null");
        // Take the immutable snapshot first and validate the snapshot, so that the list
        // which gets stored is exactly the list which was checked, even if the caller
        // modifies the original list concurrently.
        this.arguments = List.copyOf(arguments);
        Set<String> argumentNames = new HashSet<>();
        for (ArgumentSpecification specification : this.arguments) {
            if (!argumentNames.add(specification.getName())) {
                throw new IllegalArgumentException("duplicate argument name: " + specification.getName());
            }
        }
        long tableArgumentsWithRowSemantics = this.arguments.stream()
                .filter(specification -> specification instanceof TableArgumentSpecification)
                .map(TableArgumentSpecification.class::cast)
                .filter(TableArgumentSpecification::isRowSemantics)
                .count();
        checkArgument(tableArgumentsWithRowSemantics <= 1, "more than one table argument with row semantics");
        this.returnTypeSpecification = requireNonNull(returnTypeSpecification, "returnTypeSpecification is null");
    }

    public String getSchema()
    {
        return schema;
    }

    public String getName()
    {
        return name;
    }

    /**
     * @return an unmodifiable snapshot of the argument specifications passed to the constructor
     */
    public List<ArgumentSpecification> getArguments()
    {
        return arguments;
    }

    public ReturnTypeSpecification getReturnTypeSpecification()
    {
        return returnTypeSpecification;
    }

    /**
     * This method is called by the Analyzer. Its main purposes are to:
     * <ol>
     * <li>Determine the resulting relation type of the Table Function in case when the declared return type is GENERIC_TABLE.</li>
     * <li>Declare the dependencies between the input descriptors and the input tables.</li>
     * <li>Perform function-specific validation and pre-processing of the input arguments.</li>
     * </ol>
     * As part of function-specific validation, the Table Function's author might want to:
     * <ul>
     * <li>check if the descriptors which reference input tables contain a correct number of column references</li>
     * <li>check if the referenced input columns have appropriate types to fit the function's logic // TODO return request for coercions to the Analyzer in the Analysis object</li>
     * <li>if there is a descriptor which describes the function's output, check if it matches the shape of the actual function's output</li>
     * <li>for table arguments, check the number and types of ordering columns</li>
     * </ul>
     * <p>
     * The actual argument values, and the pre-processing results can be stored in a {@link ConnectorTableFunctionHandle}
     * object, which will be passed along with the Table Function invocation through subsequent phases of planning.
     *
     * @param arguments actual invocation arguments, mapped by argument names
     * @throws UnsupportedOperationException always, unless overridden by the subclass
     */
    public Analysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map<String, Argument> arguments)
    {
        throw new UnsupportedOperationException("analyze method not implemented for Table Function " + name);
    }

    /**
     * The {@code analyze()} method should produce an object of this class, containing all the analysis results:
     * <p>
     * The {@code returnedType} field is used to inform the Analyzer of the proper columns returned by the Table
     * Function, that is, the columns produced by the function, as opposed to the columns passed from the
     * input tables. The {@code returnedType} should only be set if the declared returned type is GENERIC_TABLE.
     * <p>
     * The {@code descriptorsToTables} field is used to inform the Analyzer of the semantics of descriptor arguments.
     * Some descriptor arguments (or some of their fields) might be references to columns of the input tables.
     * In such case, the Analyzer must be informed of those dependencies. It allows to pass the right values
     * (input channels) to the Table Function during execution. It also allows to prune unused input columns
     * during the optimization phase.
     * <p>
     * The {@code handle} field can be used to carry all information necessary to execute the table function,
     * gathered at analysis time. Typically, these are the values of the constant arguments, and results
     * of pre-processing arguments.
     */
    public static class Analysis
    {
        private final Optional<Descriptor> returnedType;
        private final DescriptorMapping descriptorsToTables;
        private final ConnectorTableFunctionHandle handle;

        /**
         * @throws NullPointerException if any parameter is null
         * @throws IllegalArgumentException if {@code returnedType} is present but
         * some of its fields have no type
         */
        public Analysis(Optional<Descriptor> returnedType, DescriptorMapping descriptorsToTables, ConnectorTableFunctionHandle handle)
        {
            this.returnedType = requireNonNull(returnedType, "returnedType is null");
            // a returned type must be fully typed
            returnedType.ifPresent(descriptor -> checkArgument(descriptor.isTyped(), "field types not specified"));
            this.descriptorsToTables = requireNonNull(descriptorsToTables, "descriptorsToTables is null");
            this.handle = requireNonNull(handle, "handle is null");
        }

        public Optional<Descriptor> getReturnedType()
        {
            return returnedType;
        }

        public DescriptorMapping getDescriptorsToTables()
        {
            return descriptorsToTables;
        }

        public ConnectorTableFunctionHandle getHandle()
        {
            return handle;
        }
    }

    /**
     * Validates that {@code value} is neither null nor empty and returns it.
     *
     * @param value the string to check
     * @param name the name used as the prefix of the failure message
     * @throws NullPointerException if {@code value} is null
     * @throws IllegalArgumentException if {@code value} is empty
     */
    static String checkNotNullOrEmpty(String value, String name)
    {
        requireNonNull(value, name + " is null");
        checkArgument(!value.isEmpty(), name + " is empty");
        return value;
    }

    /**
     * Throws {@link IllegalArgumentException} with the given message when {@code assertion} is false.
     */
    static void checkArgument(boolean assertion, String message)
    {
        if (!assertion) {
            throw new IllegalArgumentException(message);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.ptf;

/**
* An area to store all information necessary to execute the table function, gathered at analysis time.
* <p>
* This interface declares no methods; what a handle contains is left entirely
* to the implementing class.
*/
public interface ConnectorTableFunctionHandle
{
}
97 changes: 97 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/ptf/Descriptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.ptf;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.type.Type;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.trino.spi.ptf.ConnectorTableFunction.checkArgument;
import static io.trino.spi.ptf.ConnectorTableFunction.checkNotNullOrEmpty;
import static java.util.Objects.requireNonNull;

/**
 * A non-empty list of named fields, each with an optional type.
 * <p>
 * Instances are created either through the JSON constructor or through one of the
 * {@code descriptor(...)} factory methods. The field list is copied on construction.
 */
public class Descriptor
{
    private final List<Field> fields;

    /**
     * @param fields the fields of the descriptor; must be non-null and non-empty
     * @throws NullPointerException if {@code fields} is null or contains a null element
     * @throws IllegalArgumentException if {@code fields} is empty
     */
    @JsonCreator
    public Descriptor(@JsonProperty("fields") List<Field> fields)
    {
        checkArgument(!requireNonNull(fields, "fields is null").isEmpty(), "descriptor has no fields");
        this.fields = List.copyOf(fields);
    }

    /**
     * Creates a descriptor whose fields carry names only, without types.
     *
     * @param names the field names, in order
     */
    public static Descriptor descriptor(String... names)
    {
        return new Descriptor(Arrays.stream(names)
                .map(Descriptor::untypedField)
                .collect(Collectors.toList()));
    }

    /**
     * Creates a descriptor whose i-th field has the i-th name and the i-th type.
     *
     * @param names the field names, in order
     * @param types the field types, parallel to {@code names}
     * @throws IllegalArgumentException if the two lists differ in size
     */
    public static Descriptor descriptor(List<String> names, List<Type> types)
    {
        requireNonNull(names, "names is null");
        requireNonNull(types, "types is null");
        int count = names.size();
        checkArgument(count == types.size(), "names and types lists do not match");
        List<Field> typedFields = new ArrayList<>(count);
        for (int index = 0; index < count; index++) {
            String fieldName = names.get(index);
            Type fieldType = types.get(index);
            typedFields.add(new Field(fieldName, Optional.of(fieldType)));
        }
        return new Descriptor(typedFields);
    }

    @JsonProperty
    public List<Field> getFields()
    {
        return fields;
    }

    /**
     * @return true if every field of this descriptor has a type
     */
    public boolean isTyped()
    {
        for (Field field : fields) {
            if (!field.type.isPresent()) {
                return false;
            }
        }
        return true;
    }

    // builds a field which has a name but no type
    private static Field untypedField(String name)
    {
        return new Field(name, Optional.empty());
    }

    /**
     * A single descriptor entry: a non-empty name and an optional type.
     */
    public static class Field
    {
        private final String name;
        private final Optional<Type> type;

        /**
         * @param name the field name; must be neither null nor empty
         * @param type the field type, or empty if the field is untyped; must not be null
         */
        @JsonCreator
        public Field(@JsonProperty("name") String name, @JsonProperty("type") Optional<Type> type)
        {
            this.name = checkNotNullOrEmpty(name, "name");
            this.type = requireNonNull(type, "type is null");
        }

        @JsonProperty
        public String getName()
        {
            return name;
        }

        @JsonProperty
        public Optional<Type> getType()
        {
            return type;
        }
    }
}
Loading

0 comments on commit 4291913

Please sign in to comment.