Skip to content

Commit

Permalink
issue #3437 distribute add_any_resource function in Citus
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed Mar 18, 2022
1 parent a7f2379 commit 4398119
Show file tree
Hide file tree
Showing 9 changed files with 1,855 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,14 @@ public default boolean useSessionVariable() {
*/
public void createOrReplaceFunction(String schemaName, String objectName, Supplier<String> supplier);

/**
* For Citus, functions can be distributed by one of their parameters (typically the first)
* @param schemaName
* @param functionName
* @param distributeByParamNumber
*/
public void distributeFunction(String schemaName, String functionName, int distributeByParamNumber);

/**
* drops a given function
* @param schemaName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

package com.ibm.fhir.database.utils.citus;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
Expand Down Expand Up @@ -172,4 +176,46 @@ public void applyDistributionRules(String schemaName, String tableName, Distribu
runStatement(dao);
}
}

@Override
public void distributeFunction(String schemaName, String functionName, int distributeByParamNumber) {
if (distributeByParamNumber < 1) {
throw new IllegalArgumentException("invalid distributeByParamNumber value: " + distributeByParamNumber);
}
// Need to get the signature text first in order to build the create_distribution_function
// statement
final String objectName = DataDefinitionUtil.getQualifiedName(schemaName, functionName);
final String SELECT =
"SELECT p.oid::regproc || '(' || pg_get_function_identity_arguments(p.oid) || ')' " +
" FROM pg_catalog.pg_proc p " +
" WHERE p.oid::regproc::text = LOWER(?)";

if (connectionProvider != null) {
try (Connection c = connectionProvider.getConnection()) {
String functionSig = null;
try (PreparedStatement ps = c.prepareStatement(SELECT)) {
ps.setString(1, objectName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
functionSig = rs.getString(1);
}
}

if (functionSig != null) {
final String DISTRIBUTE = "SELECT create_distributed_function(?, ?)";
try (PreparedStatement ps = c.prepareStatement(DISTRIBUTE)) {
ps.setString(1, functionSig);
ps.setString(2, "$" + distributeByParamNumber);
ps.executeQuery(DISTRIBUTE);
}
} else {
logger.warning("No matching function found for '" + objectName + "'");
}
} catch (SQLException x) {
throw getTranslator().translate(x);
}
} else {
throw new IllegalStateException("distributeFunction requires a connectionProvider");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -753,4 +753,9 @@ public void reorgTable(String schemaName, String tableName) {
public void applyDistributionRules(String schemaName, String tableName, DistributionRules distributionRules) {
// NOP. Only used for distributed databases like Citus
}

@Override
public void distributeFunction(String schemaName, String functionName, int distributeByParamNumber) {
// NOP. Only used for distributed databases like Citus
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,30 @@ public class FunctionDef extends BaseObject {
// supplier provides the procedure body when requested
private Supplier<String> supplier;

// When >0, indicates that this function should be distributed
private final int distributeByParamNum;

/**
* Public constructor
* Public constructor. Supports distribution of the function by the given parameter number
*
* @param schemaName
* @param procedureName
* @param version
* @param supplier
* @param distributeByParamNum
*/
public FunctionDef(String schemaName, String procedureName, int version, Supplier<String> supplier) {
public FunctionDef(String schemaName, String procedureName, int version, Supplier<String> supplier, int distributeByParamNum) {
super(schemaName, procedureName, DatabaseObjectType.PROCEDURE, version);
this.supplier = supplier;
this.distributeByParamNum = distributeByParamNum;
}

@Override
public void apply(IDatabaseAdapter target, SchemaApplyContext context) {
target.createOrReplaceFunction(getSchemaName(), getObjectName(), supplier);
if (distributeByParamNum > 0) {
target.distributeFunction(getSchemaName(), getObjectName(), distributeByParamNum);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,24 @@ public ProcedureDef addProcedure(String schemaName, String objectName, int versi
*/
public FunctionDef addFunction(String schemaName, String objectName, int version, Supplier<String> templateProvider,
Collection<IDatabaseObject> dependencies, Collection<GroupPrivilege> privileges) {
FunctionDef func = new FunctionDef(schemaName, objectName, version, templateProvider);
return addFunction(schemaName, objectName, version, templateProvider, dependencies, privileges, 0);
}

/**
* adds the function to the model.
*
* @param schemaName
* @param objectName
* @param version
* @param templateProvider
* @param dependencies
* @param privileges
* @param distributeByParamNum
* @return
*/
public FunctionDef addFunction(String schemaName, String objectName, int version, Supplier<String> templateProvider,
Collection<IDatabaseObject> dependencies, Collection<GroupPrivilege> privileges, int distributeByParamNum) {
FunctionDef func = new FunctionDef(schemaName, objectName, version, templateProvider, distributeByParamNum);
privileges.forEach(p -> p.addToObject(func));

if (dependencies != null) {
Expand Down
Loading

0 comments on commit 4398119

Please sign in to comment.