Skip to content

Commit

Permalink
issue #3437 fixed issues using Citus
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed Jun 1, 2022
1 parent 246897f commit 24bb883
Show file tree
Hide file tree
Showing 19 changed files with 400 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ public interface ISchemaAdapter {
* @param tablespaceName
* @param withs
* @param checkConstraints
* @param distributionRules
* @param distributionType
* @param distributionColumnName
*/
public void createTable(String schemaName, String name, String tenantColumnName, List<ColumnBase> columns,
PrimaryKeyDef primaryKey, IdentityDef identity, String tablespaceName, List<With> withs, List<CheckConstraint> checkConstraints,
DistributionType distributionRules);
DistributionType distributionType, String distributionColumnName);

/**
* Apply any distribution rules configured for the named table
* @param schemaName
* @param tableName
* @param distributionRules
* @param distributionType
* @param distributionColumnName
*/
public void applyDistributionRules(String schemaName, String tableName, DistributionType distributionRules);
public void applyDistributionRules(String schemaName, String tableName, DistributionType distributionType, String distributionColumnName);

/**
* Add a new column to an existing table
Expand Down Expand Up @@ -157,10 +159,12 @@ public void createTable(String schemaName, String name, String tenantColumnName,
* @param tenantColumnName
* @param indexColumns
* @param includeColumns
* @param distributionRules
* @param distributionType
* @param distributionColumnName
*/
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName,
List<OrderedColumnDef> indexColumns, List<String> includeColumns, DistributionType distributionRules);
List<OrderedColumnDef> indexColumns, List<String> includeColumns,
DistributionType distributionType, String distributionColumnName);

/**
* Create a unique index
Expand All @@ -170,9 +174,10 @@ public void createUniqueIndex(String schemaName, String tableName, String indexN
* @param tenantColumnName
* @param indexColumns
* @param distributionRules
* @param distributionColumnName
*/
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName,
List<OrderedColumnDef> indexColumns, DistributionType distributionRules);
List<OrderedColumnDef> indexColumns, DistributionType distributionType, String distributionColumnName);

/**
* Create an index on the named schema.table object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private String buildCitusCreateTableStatement(String schema, String name, List<C

final DistributionType distributionType = distributionContext.getDistributionType();
final String distributionColumnName = distributionContext.getDistributionColumnName();
if (identity != null && distributionType == DistributionType.DISTRIBUTED) {
if (identity != null && (distributionType == DistributionType.DISTRIBUTED || distributionType == DistributionType.REFERENCE)) {
logger.warning("Citus: Ignoring IDENTITY columns on distributed table: '" + name + "." + identity.getColumnName());
identity = null;
}
Expand Down Expand Up @@ -171,12 +171,16 @@ public void createUniqueIndex(String schemaName, String tableName, String indexN
@Override
public void applyDistributionRules(String schemaName, String tableName, DistributionContext distributionContext) {
// Apply the distribution rules. Tables without distribution rules are created
// only on Citus controller nodes and never distributed to the worker nodes. All
// the distribution changes are implemented in one transaction, which makes it much
// more efficient.
// only on Citus controller nodes and never distributed to the worker nodes.
final String fullName = DataDefinitionUtil.getQualifiedName(schemaName, tableName);
CitusDistributionCheckDAO distributionCheck = new CitusDistributionCheckDAO(schemaName, tableName);
if (runStatement(distributionCheck)) {
logger.info("Table '" + fullName + "' is already distributed");
return;
}

final DistributionType distributionType = distributionContext.getDistributionType();
final String distributionColumnName = distributionContext.getDistributionColumnName();
final String fullName = DataDefinitionUtil.getQualifiedName(schemaName, tableName);
if (distributionType == DistributionType.REFERENCE) {
// A table that is fully replicated for each worker node
logger.info("Citus: distributing reference table '" + fullName + "'");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.database.utils.citus;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Logger;

import com.ibm.fhir.database.utils.api.IDatabaseSupplier;
import com.ibm.fhir.database.utils.api.IDatabaseTranslator;
import com.ibm.fhir.database.utils.common.DataDefinitionUtil;

/**
* DAO to check if the table is already distributed
*/
public class CitusDistributionCheckDAO implements IDatabaseSupplier<Boolean> {
private static final Logger logger = Logger.getLogger(CitusDistributionCheckDAO.class.getName());

private final String schemaName;
private final String tableName;

/**
* Public constructor
*
* @param schemaName
* @param tableName
*/
public CitusDistributionCheckDAO(String schemaName, String tableName) {
DataDefinitionUtil.assertValidName(schemaName);
DataDefinitionUtil.assertValidName(tableName);
this.schemaName = schemaName.toLowerCase();
this.tableName = tableName.toLowerCase();
}

@Override
public Boolean run(IDatabaseTranslator translator, Connection c) {
Boolean result = Boolean.FALSE;

final String relname = DataDefinitionUtil.getQualifiedName(schemaName, this.tableName);
final String SQL = "SELECT 1 FROM pg_dist_partition WHERE logicalrelid = ?::regclass";

try (PreparedStatement ps = c.prepareStatement(SQL)) {
ps.setString(1, relname);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
result = Boolean.TRUE;
}
} catch (SQLException x) {
// Translate the exception into something a little more meaningful
// for this database type and application
logger.severe("select failed: " + SQL + " for logicalrelid = '" + relname + "'");
throw translator.translate(x);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public void detachPartition(String schemaName, String tableName, String partitio

@Override
public void createTable(String schemaName, String name, String tenantColumnName, List<ColumnBase> columns, PrimaryKeyDef primaryKey, IdentityDef identity,
String tablespaceName, List<With> withs, List<CheckConstraint> checkConstraints, DistributionType distributionRules) {
String tablespaceName, List<With> withs, List<CheckConstraint> checkConstraints, DistributionType distributionType, String distributionColumnName) {
databaseAdapter.createTable(schemaName, name, tenantColumnName, columns, primaryKey, identity, tablespaceName, withs, checkConstraints, null);
}

@Override
public void applyDistributionRules(String schemaName, String tableName, DistributionType distributionRules) {
public void applyDistributionRules(String schemaName, String tableName, DistributionType distributionType, String distributionColumnName) {
databaseAdapter.applyDistributionRules(schemaName, tableName, null);
}

Expand Down Expand Up @@ -112,13 +112,13 @@ public void dropProcedure(String schemaName, String procedureName) {

@Override
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName, List<OrderedColumnDef> indexColumns,
List<String> includeColumns, DistributionType distributionRules) {
List<String> includeColumns, DistributionType distributionType, String distributionColumnName) {
databaseAdapter.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, includeColumns, null);
}

@Override
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName, List<OrderedColumnDef> indexColumns,
DistributionType distributionRules) {
DistributionType distributionType, String distributionColumnName) {
databaseAdapter.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class CreateIndex extends BaseObject {

// Distribution rules if the associated table is distributed
private final DistributionType distributionType;
private final String distributionColumnName;

/**
* Protected constructor. Use the Builder to create instance.
Expand All @@ -43,12 +44,14 @@ public class CreateIndex extends BaseObject {
* @param distributionType
*/
protected CreateIndex(String schemaName, String versionTrackingName, String tableName, int version, IndexDef indexDef, String tenantColumnName,
DistributionType distributionType) {
DistributionType distributionType, String distributionColumnName) {
super(schemaName, versionTrackingName, DatabaseObjectType.INDEX, version);
this.tableName = tableName;
this.indexDef = indexDef;
this.tenantColumnName = tenantColumnName;
this.distributionType = distributionType;
this.distributionColumnName = distributionColumnName;

}

/**
Expand Down Expand Up @@ -95,7 +98,7 @@ public String getTypeNameVersion() {
@Override
public void apply(ISchemaAdapter target, SchemaApplyContext context) {
long start = System.nanoTime();
indexDef.apply(getSchemaName(), getTableName(), tenantColumnName, target, distributionType);
indexDef.apply(getSchemaName(), getTableName(), tenantColumnName, target, distributionType, distributionColumnName);

if (logger.isLoggable(Level.FINE)) {
long end = System.nanoTime();
Expand Down Expand Up @@ -162,6 +165,7 @@ public static class Builder {

// Set if the table is distributed
private DistributionType distributionType = DistributionType.NONE;
private String distributionColumnName;

/**
* @param schemaName the schemaName to set
Expand Down Expand Up @@ -204,6 +208,16 @@ public Builder setDistributionType(DistributionType dt) {
return this;
}

/**
* Setter for distributionColumnName
* @param distributionColumnName
* @return
*/
public Builder setDistributionColumnName(String distributionColumnName) {
this.distributionColumnName = distributionColumnName;
return this;
}

/**
* @param version the version to set
*/
Expand Down Expand Up @@ -258,7 +272,7 @@ public CreateIndex build() {
}

return new CreateIndex(schemaName, versionTrackingName, tableName, version,
new IndexDef(indexName, indexCols, unique), tenantColumnName, distributionType);
new IndexDef(indexName, indexCols, unique), tenantColumnName, distributionType, distributionColumnName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public boolean isUnique() {
* @param distributionRules
*/
public void apply(String schemaName, String tableName, String tenantColumnName, ISchemaAdapter target,
DistributionType distributionType) {
DistributionType distributionType, String distributionColumn) {
if (includeColumns != null && includeColumns.size() > 0) {
target.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, includeColumns, distributionType);
target.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, includeColumns, distributionType, distributionColumn);
}
else if (unique) {
target.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, distributionType);
target.createUniqueIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, distributionType, distributionColumn);
}
else {
target.createIndex(schemaName, tableName, indexName, tenantColumnName, indexColumns, distributionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,52 @@ public void apply(ISchemaAdapter target, SchemaApplyContext context) {

/**
* Make a pass over all the objects and apply any distribution rules they
* may have (e.g. for Citus)
* may have (e.g. for Citus). We have to process a large number of tables,
* which can cause shared memory issues for Citus if we try and do this in
* a single transaction, hence the need for a transactionSupplier
* @param target
*/
public void applyDistributionRules(ISchemaAdapter target) {
public void applyDistributionRules(ISchemaAdapter target, Supplier<ITransaction> transactionSupplier) {

// takes a long time, so track progress
int total = allObjects.size() * 2;
int count = 0;
int objectsPerMessage = total / 100; // 1% increments
int nextCount = objectsPerMessage;
// make a first pass to apply reference rules
for (IDatabaseObject obj: allObjects) {
obj.applyDistributionRules(target, 0);
try (ITransaction tx = transactionSupplier.get()) {
try {
obj.applyDistributionRules(target, 0);

if (++count >= nextCount) {
int pc = 100 * nextCount / total;
logger.info("Progress: [" + pc + "% complete]");
nextCount += objectsPerMessage;
}
} catch (RuntimeException x) {
tx.setRollbackOnly();
throw x;
}
}
}

// and another pass to apply sharding rules
for (IDatabaseObject obj: allObjects) {
obj.applyDistributionRules(target, 1);
try (ITransaction tx = transactionSupplier.get()) {
try {
obj.applyDistributionRules(target, 1);

if (++count >= nextCount) {
int pc = 100 * nextCount / total;
logger.info("Progress: [" + pc + "% complete]");
nextCount += objectsPerMessage;
}
} catch (RuntimeException x) {
tx.setRollbackOnly();
throw x;
}
}
}
}

Expand Down Expand Up @@ -300,12 +333,42 @@ public void dropForeignKeyConstraints(ISchemaAdapter target, String tagGroup, St
* @param v
* @param tagGroup
* @param tag
*/
public void visit(DataModelVisitor v, final String tagGroup, final String tag) {
// visit just the matching subset of objects
this.allObjects.stream()
.filter(obj -> tag == null || obj.getTags().get(tagGroup) != null && tag.equals(obj.getTags().get(tagGroup)))
.forEach(obj -> obj.visit(v));
* @param transactionSupplier
*/
public void visit(DataModelVisitor v, final String tagGroup, final String tag, Supplier<ITransaction> transactionSupplier) {
// visit just the matching subset of objects. If a transactionSupplier has been provided, we break up the
// operation into multiple transactions to avoid transaction size limitations (e.g. with Citus FK creation)
if (transactionSupplier != null) {
ITransaction tx = transactionSupplier.get();
try {
int count = 0;
for (IDatabaseObject obj: allObjects) {
if (tag == null || obj.getTags().get(tagGroup) != null && tag.equals(obj.getTags().get(tagGroup))) {
if (++count == 10) {
// commit the current transaction and start a fresh one
tx.close();
tx = transactionSupplier.get();
count = 0;
}

try {
obj.visit(v);
} catch (RuntimeException x) {
tx.setRollbackOnly();
throw x;
}
}

}
} finally {
tx.close();
}
} else {
// the old way, which will visit everything in the scope of one transaction
this.allObjects.stream()
.filter(obj -> tag == null || obj.getTags().get(tagGroup) != null && tag.equals(obj.getTags().get(tagGroup)))
.forEach(obj -> obj.visit(v));
}
}

/**
Expand Down
Loading

0 comments on commit 24bb883

Please sign in to comment.