From c3ab57481dca291e0d04c652042ecef694d38b1d Mon Sep 17 00:00:00 2001 From: Robin Arnold Date: Wed, 8 Jun 2022 12:06:58 +0100 Subject: [PATCH] issue #3437 use logical_resource_ident for read and vread with citus Signed-off-by: Robin Arnold --- .../jdbc/citus/CitusResourceDAO.java | 92 ++++++++++++++++++- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java index 94f5e4330d2..fc2c352abe4 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java @@ -7,7 +7,11 @@ package com.ibm.fhir.persistence.jdbc.citus; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; +import java.util.logging.Level; import java.util.logging.Logger; import javax.transaction.TransactionSynchronizationRegistry; @@ -34,9 +38,18 @@ public class CitusResourceDAO extends PostgresResourceDAO { + "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID, R.RESOURCE_PAYLOAD_KEY " + " FROM %s_RESOURCES R, " + " %s_LOGICAL_RESOURCES LR " - + " WHERE LR.LOGICAL_ID = ? " - + " AND R.RESOURCE_ID = LR.CURRENT_RESOURCE_ID " - + " AND R.LOGICAL_RESOURCE_ID = LR.LOGICAL_RESOURCE_ID"; // additional predicate using common Citus distribution column + + " WHERE R.RESOURCE_ID = LR.CURRENT_RESOURCE_ID " + + " AND R.LOGICAL_RESOURCE_ID = LR.LOGICAL_RESOURCE_ID " // join must use common Citus distribution column + + " AND LR.LOGICAL_RESOURCE_ID = ? "; // lookup using logical_resource_id + + // Read a specific version of the resource + private static final String SQL_VERSION_READ = "" + + "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID, R.RESOURCE_PAYLOAD_KEY " + + " FROM %s_RESOURCES R, " + + " %s_LOGICAL_RESOURCES LR " + + " WHERE LR.LOGICAL_RESOURCE_ID = ? " + + " AND R.LOGICAL_RESOURCE_ID = LR.LOGICAL_RESOURCE_ID " + + " AND R.VERSION_ID = ?"; /** * Public constructor @@ -67,18 +80,87 @@ public CitusResourceDAO(Connection connection, String schemaName, FHIRDbFlavor f super(connection, schemaName, flavor, trxSynchRegistry, cache, rrd, ptdi, shardKey); } + /** + * Read the logical_resource_id value from logical_resource_ident + * @param resourceType + * @param logicalId + * @return + */ + private Long getLogicalResourceIdentId(String resourceType, String logicalId) throws FHIRPersistenceDataAccessException { + final int resourceTypeId = getCache().getResourceTypeCache().getId(resourceType); + final Long logicalResourceId; + final String selectLogicalResourceIdent = "" + + "SELECT logical_resource_id " + + " FROM logical_resource_ident " + + " WHERE resource_type_id = ? " + + " AND logical_id = ? "; // distribution key + try (PreparedStatement ps = getConnection().prepareStatement(selectLogicalResourceIdent)) { + ps.setInt(1, resourceTypeId); + ps.setString(2, logicalId); + ResultSet rs = ps.executeQuery(); + if (rs.next()) { + logicalResourceId = rs.getLong(1); + } else { + logicalResourceId = null; + } + } catch (SQLException x) { + log.log(Level.SEVERE, "read '" + resourceType + "/" + logicalId + "'", x); + throw new FHIRPersistenceDataAccessException("read failed for logical resource ident record"); + } + return logicalResourceId; + } + @Override public Resource read(String logicalId, String resourceType) throws FHIRPersistenceDataAccessException, FHIRPersistenceDBConnectException { final String METHODNAME = "read"; log.entering(CLASSNAME, METHODNAME); + // For Citus we want to first query the logical_resource_ident table because it is + // distributed by the logicalId. This gets us the logical_resource_id value which + // we can then use to access the logical_resource tables which are distributed by + // logical_resource_id + Long logicalResourceId = getLogicalResourceIdentId(resourceType, logicalId); + if (logicalResourceId == null) { + return null; + } + Resource resource = null; List resources; String stmtString = null; try { stmtString = String.format(SQL_READ, resourceType, resourceType); - resources = this.runQuery(stmtString, logicalId); + resources = this.runQuery(stmtString, logicalResourceId); + if (!resources.isEmpty()) { + resource = resources.get(0); + } + } finally { + log.exiting(CLASSNAME, METHODNAME); + } + return resource; + } + + @Override + public Resource versionRead(String logicalId, String resourceType, int versionId) throws FHIRPersistenceDataAccessException, FHIRPersistenceDBConnectException { + final String METHODNAME = "versionRead"; + log.entering(CLASSNAME, METHODNAME); + + // For Citus we want to first query the logical_resource_ident table because it is + // distributed by the logicalId. This gets us the logical_resource_id value which + // we can then use to access the logical_resource tables which are distributed by + // logical_resource_id + Long logicalResourceId = getLogicalResourceIdentId(resourceType, logicalId); + if (logicalResourceId == null) { + return null; + } + + Resource resource = null; + List resources; + String stmtString = null; + + try { + stmtString = String.format(SQL_VERSION_READ, resourceType, resourceType); + resources = this.runQuery(stmtString, logicalResourceId, versionId); if (!resources.isEmpty()) { resource = resources.get(0); } @@ -86,5 +168,7 @@ public Resource read(String logicalId, String resourceType) throws FHIRPersisten log.exiting(CLASSNAME, METHODNAME); } return resource; + } + } \ No newline at end of file