Skip to content

Commit

Permalink
issue #3713 use distributed add_any_resource function for citus (#3718)
Browse files Browse the repository at this point in the history
* issue #3713 use distributed add_any_resource function for citus

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>

* issue #3437 fix usage of logicalResourceId and resourceId for remote index

Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious authored Jun 21, 2022
1 parent c446a15 commit 583903a
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ public void distributeFunction(String schemaName, String functionName, int distr
throw new IllegalArgumentException("invalid distributeByParamNumber value: " + distributeByParamNumber);
}
// Need to get the signature text first in order to build the create_distribution_function
// statement
// statement. Note the cast to ::regprocedure will return a string like this:
// "fhirdata.add_logical_resource_ident(integer,character varying)"
// which can be passed in to the Citus create_distributed_function procedure
final String objectName = DataDefinitionUtil.getQualifiedName(schemaName, functionName);
final String SELECT =
"SELECT p.oid::regproc || '(' || pg_get_function_identity_arguments(p.oid) || ')' " +
"SELECT p.oid::regprocedure " +
" FROM pg_catalog.pg_proc p " +
" WHERE p.oid::regproc::text = LOWER(?)";

Expand All @@ -216,14 +218,23 @@ public void distributeFunction(String schemaName, String functionName, int distr
if (rs.next()) {
functionSig = rs.getString(1);
}

if (rs.next()) {
final String fn = DataDefinitionUtil.getQualifiedName(schemaName, functionName);
logger.severe("Overloaded function signature: " + fn + " " + functionSig);
functionSig = rs.getString(1);
logger.severe("Overloaded function signature: " + fn + " " + functionSig);
throw new IllegalStateException("Overloading not supported for function '" + fn + "'");
}
}

if (functionSig != null) {
final String DISTRIBUTE = "SELECT create_distributed_function(?, ?)";
logger.info("Distributing function: " + functionSig);
final String DISTRIBUTE = "SELECT create_distributed_function(?::regprocedure, ?::text)";
try (PreparedStatement ps = c.prepareStatement(DISTRIBUTE)) {
ps.setString(1, functionSig);
ps.setString(2, "$" + distributeByParamNumber);
ps.executeQuery(DISTRIBUTE);
ps.execute();
}
} else {
logger.warning("No matching function found for '" + objectName + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package com.ibm.fhir.database.utils.common;

import java.io.InputStream;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
Expand Down Expand Up @@ -101,6 +103,23 @@ public PreparedStatementHelper setString(String value) throws SQLException {
return this;
}

/**
 * Bind the given {@link InputStream} to the parameter at the current position.
 * When the stream is null, SQL NULL is bound instead using {@link Types#BINARY}.
 * The current position is advanced by one after the value has been bound.
 *
 * @param value the stream to bind; may be null
 * @return this instance, so that calls can be chained
 * @throws SQLException if the underlying statement rejects the binding
 */
public PreparedStatementHelper setBinaryStream(InputStream value) throws SQLException {
    if (value == null) {
        // No payload: bind an explicit NULL of binary type
        ps.setNull(index, Types.BINARY);
    } else {
        ps.setBinaryStream(index, value);
    }
    // Only move on once the bind has succeeded
    index += 1;
    return this;
}

/**
* Set the (possibly null) int value at the current position
* and increment the position by 1
Expand All @@ -118,6 +137,23 @@ public PreparedStatementHelper setTimestamp(Timestamp value) throws SQLException
return this;
}

/**
 * Register the parameter at the current position as an OUT parameter and
 * advance the position by one. The delegate statement must be a
 * {@link CallableStatement}.
 * <p>
 * The position is advanced only after the registration has succeeded, which
 * matches the behavior of the set methods in this helper. If the delegate is
 * not a {@link CallableStatement}, or the driver rejects the registration,
 * the current position is left unchanged.
 *
 * @param parameterType the JDBC type code from {@link java.sql.Types}
 * @return the parameter index of the OUT parameter
 * @throws SQLException if the driver fails to register the OUT parameter
 * @throws IllegalStateException if the delegate is not a {@link CallableStatement}
 */
public int registerOutParameter(int parameterType) throws SQLException {
    // Validate before touching any state so a misuse does not consume a position
    if (!(ps instanceof CallableStatement)) {
        throw new IllegalStateException("Delegate is not a CallableStatement");
    }

    final int idx = index;
    ((CallableStatement) ps).registerOutParameter(idx, parameterType);

    // Registration succeeded, so the position can now be consumed
    index++;
    return idx;
}

/**
* Add a new batch entry based on the current state of the {@link PreparedStatement}.
* Note that we don't return this on purpose...because addBatch should be last in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,39 @@

package com.ibm.fhir.persistence.jdbc.citus;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Types;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.transaction.TransactionSynchronizationRegistry;

import com.ibm.fhir.database.utils.common.PreparedStatementHelper;
import com.ibm.fhir.persistence.InteractionStatus;
import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException;
import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException;
import com.ibm.fhir.persistence.index.FHIRRemoteIndexService;
import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache;
import com.ibm.fhir.persistence.jdbc.connection.FHIRDbFlavor;
import com.ibm.fhir.persistence.jdbc.dao.api.FHIRDAOConstants;
import com.ibm.fhir.persistence.jdbc.dao.api.IResourceReferenceDAO;
import com.ibm.fhir.persistence.jdbc.dao.api.JDBCIdentityCache;
import com.ibm.fhir.persistence.jdbc.dao.api.ParameterDAO;
import com.ibm.fhir.persistence.jdbc.dao.impl.JDBCIdentityCacheImpl;
import com.ibm.fhir.persistence.jdbc.dao.impl.ParameterVisitorBatchDAO;
import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue;
import com.ibm.fhir.persistence.jdbc.dto.Resource;
import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException;
import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException;
import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl;
import com.ibm.fhir.persistence.jdbc.postgres.PostgresResourceDAO;

Expand All @@ -33,6 +50,14 @@ public class CitusResourceDAO extends PostgresResourceDAO {
// Class name used for the entering/exiting trace calls, and the logger for this DAO
private static final String CLASSNAME = CitusResourceDAO.class.getName();
private static final Logger log = Logger.getLogger(CLASSNAME);

// @formatter:off
// 0 1
// 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
// @formatter:on
// Don't forget that we must account for IN and OUT parameters.
// The %s placeholder in both templates is replaced with the schema name using
// String.format before the call is prepared.
// add_any_resource takes 15 placeholders: insert() binds 12 IN values and then
// registers 3 OUT parameters, in that order.
private static final String SQL_INSERT_WITH_PARAMETERS = "{ CALL %s.add_any_resource(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) }";
// add_logical_resource_ident takes 3 placeholders: createOrLockLogicalResourceIdent()
// binds 2 IN values (resource type id, logical id) and registers 1 OUT parameter.
private static final String SQL_LOGICAL_RESOURCE_IDENT = "{ CALL %s.add_logical_resource_ident(?,?,?) }";

// Read the current version of the resource (even if the resource has been deleted)
private static final String SQL_READ = ""
+ "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID, R.RESOURCE_PAYLOAD_KEY "
Expand Down Expand Up @@ -171,4 +196,129 @@ public Resource versionRead(String logicalId, String resourceType, int versionId

}

/**
 * Insert a new version of the given resource using the Citus-specific flow.
 * <p>
 * The logical_resource_ident record is established first by calling
 * {@link #createOrLockLogicalResourceIdent(int, String)}, then the
 * add_any_resource procedure is called with the resulting logical resource id.
 * When the procedure reports interaction status 1, the resource is marked as
 * {@link InteractionStatus#IF_NONE_MATCH_EXISTED} and no parameters are written.
 * Otherwise the resource is marked as {@link InteractionStatus#MODIFIED} and the
 * extracted parameters are inserted directly, unless a remote index service is
 * configured or the parameter hash matches the hash returned by the procedure.
 *
 * @param resource the resource to insert; its logical resource id and interaction status are updated
 * @param parameters the extracted search parameter values; may be null
 * @param parameterHashB64 the hash of the extracted parameters; may be null or empty
 * @param parameterDao passed to the identity cache used during parameter inserts
 * @param ifNoneMatch the If-None-Match value passed to the procedure; may be null
 * @return the same resource instance passed in
 * @throws FHIRPersistenceException if the insert fails
 */
@Override
public Resource insert(Resource resource, List<ExtractedParameterValue> parameters, String parameterHashB64,
        ParameterDAO parameterDao, Integer ifNoneMatch)
        throws FHIRPersistenceException {
    final String METHODNAME = "insert(Resource, List<ExtractedParameterValue, ParameterDAO>";
    log.entering(CLASSNAME, METHODNAME);

    final Connection conn = getConnection(); // do not close

    try {
        // Fail early if the database does not know this resource type
        final Integer resourceTypeId = getResourceTypeId(resource.getResourceType());
        Objects.requireNonNull(resourceTypeId);

        // Citus: the logical_resource_ident record is created (or locked) in a separate first call
        final long logicalResourceId = createOrLockLogicalResourceIdent(resourceTypeId, resource.getLogicalId());

        final String callSql = String.format(SQL_INSERT_WITH_PARAMETERS, getSchemaName());
        try (CallableStatement stmt = conn.prepareCall(callSql)) {
            final PreparedStatementHelper binder = new PreparedStatementHelper(stmt);

            // IN parameters - the order must match the procedure signature
            binder.setLong(logicalResourceId);
            binder.setInt(resourceTypeId);
            binder.setString(resource.getResourceType());
            binder.setString(resource.getLogicalId());
            binder.setBinaryStream(resource.getDataStream() != null ? resource.getDataStream().inputStream() : null);
            binder.setTimestamp(resource.getLastUpdated());
            binder.setString(resource.isDeleted() ? "Y": "N");
            binder.setString(UUID.randomUUID().toString());
            binder.setInt(resource.getVersionId());
            binder.setString(parameterHashB64);
            binder.setInt(ifNoneMatch);
            binder.setString(resource.getResourcePayloadKey());

            // OUT parameters - registered after all the IN parameters
            final int oldParameterHashIndex = binder.registerOutParameter(Types.VARCHAR);
            final int interactionStatusIndex = binder.registerOutParameter(Types.INTEGER);
            final int ifNoneMatchVersionIndex = binder.registerOutParameter(Types.INTEGER);

            final long dbCallStartTime = System.nanoTime();
            stmt.execute();
            final double dbCallDuration = (System.nanoTime()-dbCallStartTime)/1e6;

            resource.setLogicalResourceId(logicalResourceId);
            final boolean ifNoneMatchExisted = stmt.getInt(interactionStatusIndex) == 1;
            if (!ifNoneMatchExisted) {
                resource.setInteractionStatus(InteractionStatus.MODIFIED);

                // Parameters are inserted directly into the resource parameter tables
                // using a visitor. This is bypassed when the remoteIndexService is
                // configured. No parameters are supplied for a soft-delete.
                final FHIRRemoteIndexService remoteIndexService = FHIRRemoteIndexService.getServiceInstance();
                final String currentParameterHash = stmt.getString(oldParameterHashIndex);
                final boolean hashDiffers = parameterHashB64 == null || parameterHashB64.isEmpty()
                        || !parameterHashB64.equals(currentParameterHash);
                final boolean insertParametersHere = remoteIndexService == null && parameters != null && hashDiffers;
                if (insertParametersHere) {
                    // postgresql doesn't support partitioned multi-tenancy, so we disable it on the DAO:
                    final JDBCIdentityCache identityCache =
                            new JDBCIdentityCacheImpl(getCache(), this, parameterDao, getResourceReferenceDAO());
                    try (ParameterVisitorBatchDAO visitor = new ParameterVisitorBatchDAO(conn, null,
                            resource.getResourceType(), false, resource.getLogicalResourceId(), 100,
                            identityCache, getResourceReferenceDAO(), getTransactionData())) {
                        for (ExtractedParameterValue parameter: parameters) {
                            parameter.accept(visitor);
                        }
                    }
                }
            } else {
                // The resource already existed, so there is nothing to change and no parameters to update
                resource.setInteractionStatus(InteractionStatus.IF_NONE_MATCH_EXISTED);
                resource.setIfNoneMatchVersion(stmt.getInt(ifNoneMatchVersionIndex)); // current version
            }

            if (log.isLoggable(Level.FINE)) {
                log.fine("Successfully inserted Resource. logicalResourceId=" + resource.getLogicalResourceId() + " executionTime=" + dbCallDuration + "ms");
            }
        }
    } catch(FHIRPersistenceDBConnectException | FHIRPersistenceDataAccessException e) {
        // Already in the form callers expect, so pass through untouched
        throw e;
    } catch(SQLIntegrityConstraintViolationException e) {
        final FHIRPersistenceFKVException fx = new FHIRPersistenceFKVException("Encountered FK violation while inserting Resource.");
        throw severe(log, fx, e);
    } catch(SQLException e) {
        if (FHIRDAOConstants.SQLSTATE_WRONG_VERSION.equals(e.getSQLState())) {
            // this is just a concurrency update, so there's no need to log the SQLException here
            throw new FHIRPersistenceVersionIdMismatchException("Encountered version id mismatch while inserting Resource");
        }
        final FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("SQLException encountered while inserting Resource.");
        throw severe(log, fx, e);
    } catch(Throwable e) {
        final FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("Failure inserting Resource.");
        throw severe(log, fx, e);
    } finally {
        log.exiting(CLASSNAME, METHODNAME);
    }

    return resource;
}

/**
 * Call the ADD_LOGICAL_RESOURCE_IDENT procedure to create or lock (select for update)
 * the logical_resource_ident record for the given resource type and logical id.
 * For Citus this step runs first because the function is distributed by the
 * logical_id parameter.
 *
 * @param resourceTypeId the database id of the resource type
 * @param logicalId the logical id of the resource
 * @return the logical resource id read from the procedure's OUT parameter
 * @throws SQLException if preparing or executing the call fails
 */
protected long createOrLockLogicalResourceIdent(int resourceTypeId, String logicalId) throws SQLException {
    final String callSql = String.format(SQL_LOGICAL_RESOURCE_IDENT, getSchemaName());
    try (CallableStatement call = getConnection().prepareCall(callSql)) {
        final PreparedStatementHelper binder = new PreparedStatementHelper(call);
        binder.setInt(resourceTypeId);
        binder.setString(logicalId);
        final int logicalResourceIdIndex = binder.registerOutParameter(Types.BIGINT);
        call.execute();

        // At this point the logical_resource_ident record will be locked for update
        return call.getLong(logicalResourceIdIndex);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ protected Resource createDTO(ResultSet resultSet, boolean hasResourceTypeId) thr
if (payloadData != null) {
resource.setDataStream(new InputOutputByteStream(payloadData, payloadData.length));
}
resource.setId(resultSet.getLong(IDX_RESOURCE_ID));
resource.setResourceId(resultSet.getLong(IDX_RESOURCE_ID));
resource.setLogicalResourceId(resultSet.getLong(IDX_LOGICAL_RESOURCE_ID));
resource.setLastUpdated(resultSet.getTimestamp(IDX_LAST_UPDATED, CalendarHelper.getCalendarForUTC()));
resource.setLogicalId(resultSet.getString(IDX_LOGICAL_ID));
Expand Down Expand Up @@ -541,7 +541,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
long latestTime = System.nanoTime();
double dbCallDuration = (latestTime-dbCallStartTime)/1e6;

resource.setId(stmt.getLong(10));
resource.setLogicalResourceId(stmt.getLong(10));
final long versionedResourceRowId = stmt.getLong(11);
final String currentHash = stmt.getString(12);
final int interactionStatus = stmt.getInt(13);
Expand Down Expand Up @@ -580,7 +580,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
|| !parameterHashB64.equals(currentHash))) {
JDBCIdentityCache identityCache = new JDBCIdentityCacheImpl(cache, this, parameterDao, getResourceReferenceDAO());
try (ParameterVisitorBatchDAO pvd = new ParameterVisitorBatchDAO(connection, "FHIR_ADMIN", resource.getResourceType(), true,
resource.getId(), 100, identityCache, resourceReferenceDAO, this.transactionData)) {
resource.getLogicalResourceId(), 100, identityCache, resourceReferenceDAO, this.transactionData)) {
for (ExtractedParameterValue p: parameters) {
p.accept(pvd);
}
Expand All @@ -591,7 +591,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
latestTime = System.nanoTime();
double totalDuration = (latestTime - dbCallStartTime) / 1e6;
double paramInsertDuration = (latestTime-paramInsertStartTime)/1e6;
log.fine("Successfully inserted Resource. id=" + resource.getId() + " total=" + totalDuration + "ms, proc=" + dbCallDuration + "ms, param=" + paramInsertDuration + "ms");
log.fine("Successfully inserted Resource. logicalResourceId=" + resource.getLogicalResourceId() + " total=" + totalDuration + "ms, proc=" + dbCallDuration + "ms, param=" + paramInsertDuration + "ms");
}
}
} catch (FHIRPersistenceDBConnectException |
Expand Down
Loading

0 comments on commit 583903a

Please sign in to comment.