diff --git a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/ResultSetReader.java b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/ResultSetReader.java new file mode 100644 index 00000000000..ec847754aa3 --- /dev/null +++ b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/common/ResultSetReader.java @@ -0,0 +1,103 @@ +/* + * (C) Copyright IBM Corp. 2022 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.database.utils.common; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Calendar; + +/** + * Simplifies reading values from a {@link ResultSet} + */ +public class ResultSetReader { + private final Calendar UTC = CalendarHelper.getCalendarForUTC(); + + // The ResultSet we're reading from + private final ResultSet rs; + + private int index = 0; + + /** + * Canonical constructor + * @param rs + */ + public ResultSetReader(ResultSet rs) { + this.rs = rs; + } + + /** + * Invoke {@link ResultSet#next()} + * @return true if the ResultSet has a row + * @throws SQLException + */ + public boolean next() throws SQLException { + index = 1; + return rs.next(); + } + + /** + * Get a string column value and increment the column index + * @return + * @throws SQLException + */ + public String getString() throws SQLException { + return rs.getString(index++); + } + + /** + * Get a Short column value and increment the column index + * @return + * @throws SQLException + */ + public Short getShort() throws SQLException { + Short result = rs.getShort(index++); + if (rs.wasNull()) { + result = null; + } + return result; + } + + /** + * Get an Integer column value and increment the column index + * @return + * @throws SQLException + */ + public Integer getInt() throws SQLException { + Integer result = rs.getInt(index++); + if (rs.wasNull()) { + result = null; + } + return result; + } + + /** + * Get a Long column value and increment the column index + * @return + * @throws SQLException + */ + public Long getLong() throws SQLException { + Long result = rs.getLong(index++); + if (rs.wasNull()) { + result = null; + } + return result; + } + + /** + * Get a Timestamp column value and increment the column index + * @return + * @throws SQLException + */ + public Timestamp getTimestamp() throws SQLException { + Timestamp result = rs.getTimestamp(index++, UTC); + if (rs.wasNull()) { + result = null; + } + return result; + } +} diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java index 56aac5545f5..94f5e4330d2 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/citus/CitusResourceDAO.java @@ -12,12 +12,12 @@ import javax.transaction.TransactionSynchronizationRegistry; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; import com.ibm.fhir.persistence.jdbc.connection.FHIRDbFlavor; import com.ibm.fhir.persistence.jdbc.dao.api.IResourceReferenceDAO; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.jdbc.postgres.PostgresResourceDAO; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategy.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategy.java index 71ae4223cf5..3855bff078e 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategy.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategy.java @@ -9,9 +9,9 @@ import java.sql.Connection; import com.ibm.fhir.config.FHIRRequestContext; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.impl.FHIRDbDAOImpl; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Abstraction used to obtain JDBC connections. The database being connected * is determined by the datasource currently referenced by the {@link FHIRRequestContext} diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategyBase.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategyBase.java index f038aae225a..9dabc9bf543 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategyBase.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbConnectionStrategyBase.java @@ -20,8 +20,8 @@ import com.ibm.fhir.config.PropertyGroup; import com.ibm.fhir.database.utils.api.SchemaType; import com.ibm.fhir.database.utils.model.DbType; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.postgresql.SetPostgresOptimizerOptions; /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbHelper.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbHelper.java index 6a97cd6a08c..093089dcee2 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbHelper.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbHelper.java @@ -13,10 +13,10 @@ import com.ibm.fhir.model.resource.OperationOutcome.Issue; import com.ibm.fhir.model.type.code.IssueType; import com.ibm.fhir.model.util.FHIRUtil; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBCleanupException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Helper functions used for managing FHIR database interactions diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTenantDatasourceConnectionStrategy.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTenantDatasourceConnectionStrategy.java index ba1ee36d426..9f73a16a182 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTenantDatasourceConnectionStrategy.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTenantDatasourceConnectionStrategy.java @@ -23,10 +23,10 @@ import com.ibm.fhir.database.utils.api.SchemaType; import com.ibm.fhir.database.utils.model.DbType; import com.ibm.fhir.exception.FHIRException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.dao.impl.FHIRDbDAOImpl; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTestConnectionStrategy.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTestConnectionStrategy.java index 6aa5aa7bbf6..5583613a409 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTestConnectionStrategy.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRDbTestConnectionStrategy.java @@ -12,8 +12,8 @@ import com.ibm.fhir.database.utils.api.IConnectionProvider; import com.ibm.fhir.database.utils.api.SchemaType; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Hides the logic behind obtaining a JDBC {@link Connection} from the DAO code. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRTestTransactionAdapter.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRTestTransactionAdapter.java index 65789cca91c..19d7df6eb81 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRTestTransactionAdapter.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRTestTransactionAdapter.java @@ -14,8 +14,8 @@ import com.ibm.fhir.database.utils.api.ITransaction; import com.ibm.fhir.database.utils.transaction.SimpleTransactionProvider; import com.ibm.fhir.persistence.FHIRPersistenceTransaction; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Hides the logic behind obtaining a JDBC {@link Connection} from the DAO code. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java index ed242e55fd6..318ac3b4100 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java @@ -17,9 +17,9 @@ import com.ibm.fhir.config.FHIRRequestContext; import com.ibm.fhir.persistence.FHIRPersistenceTransaction; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.impl.CacheTransactionSync; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/CodeSystemDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/CodeSystemDAO.java index 6bc1b3b3dd5..05412f3b01c 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/CodeSystemDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/CodeSystemDAO.java @@ -8,8 +8,8 @@ import java.util.Map; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * This Data Access Object interface defines APIs specific to parameter_names table. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterDAO.java index 16437517034..c2a24c1f7b5 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterDAO.java @@ -8,9 +8,9 @@ import java.util.Map; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * This Data Access Object interface defines methods for creating, updating, diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterNameDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterNameDAO.java index 68247ab66c4..36607b0394b 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterNameDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ParameterNameDAO.java @@ -8,7 +8,7 @@ import java.util.Map; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; /** * This Data Access Object interface defines APIs specific to parameter_names table. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ResourceDAO.java index 34d50abd725..f9b3b8fd72d 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/api/ResourceDAO.java @@ -12,12 +12,12 @@ import com.ibm.fhir.database.utils.query.Select; import com.ibm.fhir.persistence.context.FHIRPersistenceContext; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * This Data Access Object interface provides methods creating, updating, and retrieving rows in the FHIR Resource tables. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/CodeSystemDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/CodeSystemDAOImpl.java index bc0916f5a8a..0eb1b8983dc 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/CodeSystemDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/CodeSystemDAOImpl.java @@ -16,8 +16,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.CodeSystemDAO; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * This DAO uses a connection provided to its constructor. It's therefore diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FHIRDbDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FHIRDbDAOImpl.java index faa10d7ad16..0b741a5fdda 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FHIRDbDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FHIRDbDAOImpl.java @@ -25,13 +25,13 @@ import com.ibm.fhir.model.resource.OperationOutcome.Issue; import com.ibm.fhir.model.type.code.IssueType; import com.ibm.fhir.model.util.FHIRUtil; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.connection.FHIRDbFlavor; import com.ibm.fhir.persistence.jdbc.dao.api.FHIRDbDAO; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBCleanupException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * This class is a root Data Access Object for managing JDBC access to the FHIR database. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FetchResourcePayloadsDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FetchResourcePayloadsDAO.java index ff67d2317f7..d3526c2dc53 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FetchResourcePayloadsDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/FetchResourcePayloadsDAO.java @@ -24,8 +24,8 @@ import com.ibm.fhir.database.utils.api.IDatabaseTranslator; import com.ibm.fhir.database.utils.common.CalendarHelper; import com.ibm.fhir.persistence.ResourcePayload; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * DAO to fetch resource ids using a time range and optional current resource id as a filter. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/JDBCIdentityCacheImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/JDBCIdentityCacheImpl.java index 61ce7545ba5..e81f8974b2a 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/JDBCIdentityCacheImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/JDBCIdentityCacheImpl.java @@ -16,6 +16,7 @@ import java.util.logging.Logger; import java.util.stream.Collectors; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; import com.ibm.fhir.persistence.jdbc.dao.api.IResourceReferenceDAO; @@ -24,7 +25,6 @@ import com.ibm.fhir.persistence.jdbc.dao.api.ResourceDAO; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValue; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValueResult; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterDAOImpl.java index 189c82b44ca..ca7908b81e5 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterDAOImpl.java @@ -12,6 +12,7 @@ import javax.transaction.TransactionSynchronizationRegistry; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.connection.FHIRDbFlavor; import com.ibm.fhir.persistence.jdbc.dao.api.CodeSystemDAO; @@ -20,7 +21,6 @@ import com.ibm.fhir.persistence.jdbc.derby.DerbyCodeSystemDAO; import com.ibm.fhir.persistence.jdbc.derby.DerbyParameterNamesDAO; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.postgres.PostgresCodeSystemDAO; import com.ibm.fhir.persistence.jdbc.postgres.PostgresParameterNamesDAO; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterNameDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterNameDAOImpl.java index 952d81ba666..443f9ba5d58 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterNameDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterNameDAOImpl.java @@ -16,8 +16,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.ParameterNameDAO; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Database interaction for parameter_names. Caching etc is handled diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterVisitorBatchDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterVisitorBatchDAO.java index 0aa713b84e7..5647e32ab51 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterVisitorBatchDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ParameterVisitorBatchDAO.java @@ -25,6 +25,7 @@ import com.ibm.fhir.config.FHIRConfigHelper; import com.ibm.fhir.database.utils.common.CalendarHelper; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.JDBCConstants; import com.ibm.fhir.persistence.jdbc.dao.api.IResourceReferenceDAO; @@ -39,7 +40,6 @@ import com.ibm.fhir.persistence.jdbc.dto.ReferenceParmVal; import com.ibm.fhir.persistence.jdbc.dto.StringParmVal; import com.ibm.fhir.persistence.jdbc.dto.TokenParmVal; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.jdbc.util.CanonicalSupport; import com.ibm.fhir.schema.control.FhirSchemaConstants; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java index d1420edec3b..65dde69f7de 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceDAOImpl.java @@ -35,6 +35,7 @@ import com.ibm.fhir.database.utils.query.Select; import com.ibm.fhir.persistence.InteractionStatus; import com.ibm.fhir.persistence.context.FHIRPersistenceContext; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; @@ -47,7 +48,6 @@ import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.util.InputOutputByteStream; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceReferenceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceReferenceDAO.java index 3abca7a9001..475df62f6f7 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceReferenceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/dao/impl/ResourceReferenceDAO.java @@ -27,6 +27,7 @@ import com.ibm.fhir.database.utils.api.IDatabaseTranslator; import com.ibm.fhir.database.utils.common.DataDefinitionUtil; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.jdbc.dao.api.ICommonTokenValuesCache; import com.ibm.fhir.persistence.jdbc.dao.api.INameIdCache; @@ -34,7 +35,6 @@ import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValue; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValueResult; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.schema.control.FhirSchemaConstants; /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/db2/Db2ResourceReferenceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/db2/Db2ResourceReferenceDAO.java index 15f737af91e..ecb16929ec6 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/db2/Db2ResourceReferenceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/db2/Db2ResourceReferenceDAO.java @@ -18,6 +18,7 @@ import com.ibm.fhir.database.utils.api.IDatabaseTranslator; import com.ibm.fhir.database.utils.common.DataDefinitionUtil; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.ICommonTokenValuesCache; import com.ibm.fhir.persistence.jdbc.dao.api.INameIdCache; import com.ibm.fhir.persistence.jdbc.dao.api.JDBCIdentityCache; @@ -28,7 +29,6 @@ import com.ibm.fhir.persistence.jdbc.dao.impl.ResourceTokenValueRec; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValue; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyCodeSystemDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyCodeSystemDAO.java index 831f084e3b6..33ec752c1f6 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyCodeSystemDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyCodeSystemDAO.java @@ -11,9 +11,9 @@ import java.sql.ResultSet; import java.sql.SQLException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.FhirRefSequenceDAO; import com.ibm.fhir.persistence.jdbc.dao.impl.CodeSystemDAOImpl; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Derby variant DAO used to manage code_systems records. Uses diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyParameterNamesDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyParameterNamesDAO.java index 996965a81f3..450772adb2f 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyParameterNamesDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyParameterNamesDAO.java @@ -11,9 +11,9 @@ import java.sql.ResultSet; import java.sql.SQLException; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.FhirRefSequenceDAO; import com.ibm.fhir.persistence.jdbc.dao.impl.ParameterNameDAOImpl; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * For R4 we have replaced the old Derby (Java) stored procedure with diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java index 77478df554b..980ca60195a 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceDAO.java @@ -28,6 +28,7 @@ import com.ibm.fhir.database.utils.derby.DerbyMaster; import com.ibm.fhir.database.utils.derby.DerbyTranslator; import com.ibm.fhir.persistence.InteractionStatus; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceResourceDeletedException; import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; @@ -43,7 +44,6 @@ import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.jdbc.util.ParameterTableSupport; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceReferenceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceReferenceDAO.java index 4eada2d250a..9a256dfc2a8 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceReferenceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/derby/DerbyResourceReferenceDAO.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import com.ibm.fhir.database.utils.api.IDatabaseTranslator; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.ICommonTokenValuesCache; import com.ibm.fhir.persistence.jdbc.dao.api.INameIdCache; import com.ibm.fhir.persistence.jdbc.dao.api.ParameterNameDAO; @@ -33,7 +34,6 @@ import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValue; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValueResult; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.postgres.PostgresResourceReferenceDAO; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceFKVException.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceFKVException.java index f2cfe4275a5..6dc29ec453f 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceFKVException.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceFKVException.java @@ -9,6 +9,7 @@ import java.util.Collection; import com.ibm.fhir.model.resource.OperationOutcome; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; /** * This exception class is thrown when Foreign Key violations are encountered while attempting to access data in the FHIR DB. diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java index ace21a63db7..12317282d4b 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java @@ -102,6 +102,7 @@ import com.ibm.fhir.persistence.context.FHIRPersistenceContext; import com.ibm.fhir.persistence.context.FHIRPersistenceContextFactory; import com.ibm.fhir.persistence.erase.EraseDTO; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceNotSupportedException; import com.ibm.fhir.persistence.exception.FHIRPersistenceResourceDeletedException; @@ -152,7 +153,6 @@ import com.ibm.fhir.persistence.jdbc.dto.StringParmVal; import com.ibm.fhir.persistence.jdbc.dto.TokenParmVal; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.util.ExtractedSearchParameters; import com.ibm.fhir.persistence.jdbc.util.JDBCParameterBuildingVisitor; @@ -421,7 +421,8 @@ public SingleResourceResult create(FHIRPersistenceContex + ", version=" + resourceDTO.getVersionId()); } - sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), context.getRequestShard(), searchParameters); + sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), + resourceDTO.getVersionId(), resourceDTO.getLastUpdated().toInstant(), context.getRequestShard(), searchParameters); SingleResourceResult.Builder resultBuilder = new SingleResourceResult.Builder() .success(true) .interactionStatus(resourceDTO.getInteractionStatus()) @@ -461,15 +462,19 @@ public SingleResourceResult create(FHIRPersistenceContex * @param resourceType * @param logicalId * @param logicalResourceId - * @param shardKey + * @param versionId + * @param lastUpdated + * @param requestShard * @param searchParameters */ - private void sendParametersToRemoteIndexService(String resourceType, String logicalId, long logicalResourceId, String requestShard, + private void sendParametersToRemoteIndexService(String resourceType, String logicalId, long logicalResourceId, + int versionId, java.time.Instant lastUpdated, String requestShard, ExtractedSearchParameters searchParameters) throws FHIRPersistenceException { FHIRRemoteIndexService remoteIndexService = FHIRRemoteIndexService.getServiceInstance(); if (remoteIndexService != null) { // convert the parameters into a form that will be easy to ship to a remote service - SearchParametersTransportAdapter adapter = new SearchParametersTransportAdapter(resourceType, logicalId, logicalResourceId, requestShard); + SearchParametersTransportAdapter adapter = new SearchParametersTransportAdapter(resourceType, logicalId, logicalResourceId, + versionId, lastUpdated, requestShard, searchParameters.getParameterHashB64()); ParameterTransportVisitor visitor = new ParameterTransportVisitor(adapter); for (ExtractedParameterValue pv: searchParameters.getParameters()) { pv.accept(visitor); @@ -478,8 +483,8 @@ private void sendParametersToRemoteIndexService(String resourceType, String logi // Note that the remote index service is supposed to be multi-tenant, using // the tenantId from the request context on this thread, so we don't need // to pass that here - final String partitionKey = resourceType + "/" + logicalId; - IndexProviderResponse ipr = remoteIndexService.submit(new RemoteIndexData(partitionKey, adapter.build())); + final String kafkaPartitionKey = resourceType + "/" + logicalId; + IndexProviderResponse ipr = remoteIndexService.submit(new RemoteIndexData(kafkaPartitionKey, adapter.build())); remoteIndexMessageList.add(ipr); // we'll check for an ACK just before we commit the transaction } } @@ -672,7 +677,8 @@ public SingleResourceResult update(FHIRPersistenceContex } // If configured, send the extracted parameters to the remote indexing service - sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), context.getRequestShard(), searchParameters); + sendParametersToRemoteIndexService(resourceDTO.getResourceType(), resourceDTO.getLogicalId(), resourceDTO.getId(), + resourceDTO.getVersionId(), resourceDTO.getLastUpdated().toInstant(), context.getRequestShard(), searchParameters); SingleResourceResult.Builder resultBuilder = new SingleResourceResult.Builder() .success(true) @@ -2862,6 +2868,7 @@ private void transactionCompleted(Boolean committed) { // important to clear this list after each transaction because batch bundles // use the same FHIRPersistenceJDBCImpl instance for each entry + remoteIndexMessageList.clear(); payloadPersistenceResponses.clear(); eraseResourceRecs.clear(); } diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/SearchParametersTransportAdapter.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/SearchParametersTransportAdapter.java index c2fa972ff8a..c982a072c41 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/SearchParametersTransportAdapter.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/SearchParametersTransportAdapter.java @@ -8,6 +8,7 @@ import java.math.BigDecimal; import java.sql.Timestamp; +import java.time.Instant; import com.ibm.fhir.persistence.index.DateParameter; import com.ibm.fhir.persistence.index.LocationParameter; @@ -39,14 +40,21 @@ public class SearchParametersTransportAdapter implements ParameterValueVisitorAd * @param resourceType * @param logicalId * @param logicalResourceId + * @param versionId + * @param lastUpdated * @param requestShard + * @param parameterHash */ - public SearchParametersTransportAdapter(String resourceType, String logicalId, long logicalResourceId, String requestShard) { + public SearchParametersTransportAdapter(String resourceType, String logicalId, long logicalResourceId, + int versionId, Instant lastUpdated, String requestShard, String parameterHash) { builder = SearchParametersTransport.builder() .withResourceType(resourceType) .withLogicalId(logicalId) .withLogicalResourceId(logicalResourceId) - .withRequestShard(requestShard); + .withVersionId(versionId) + .withLastUpdated(lastUpdated) + .withRequestShard(requestShard) + .withParameterHash(parameterHash); } /** diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresCodeSystemDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresCodeSystemDAO.java index 3dfc8f7f7c0..01b4019f598 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresCodeSystemDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresCodeSystemDAO.java @@ -12,8 +12,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.impl.CodeSystemDAOImpl; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * PostgreSql variant DAO used to manage code_systems records. Uses diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresParameterNamesDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresParameterNamesDAO.java index ffa9161402d..35d1d9a4916 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresParameterNamesDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresParameterNamesDAO.java @@ -12,8 +12,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.impl.ParameterNameDAOImpl; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; public class PostgresParameterNamesDAO extends ParameterNameDAOImpl { private static final String CLASSNAME = PostgresParameterNamesDAO.class.getName(); diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java index c5f160fe19e..3ec8b0c441b 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceDAO.java @@ -25,6 +25,7 @@ import com.ibm.fhir.database.utils.api.SchemaType; import com.ibm.fhir.database.utils.common.CalendarHelper; import com.ibm.fhir.persistence.InteractionStatus; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; @@ -40,7 +41,6 @@ import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java index 5e24825a8ef..34fde9eeb23 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceNoProcDAO.java @@ -27,6 +27,7 @@ import com.ibm.fhir.database.utils.common.CalendarHelper; import com.ibm.fhir.persistence.InteractionStatus; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceVersionIdMismatchException; import com.ibm.fhir.persistence.jdbc.FHIRPersistenceJDBCCache; @@ -42,7 +43,6 @@ import com.ibm.fhir.persistence.jdbc.dto.ExtractedParameterValue; import com.ibm.fhir.persistence.jdbc.dto.Resource; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceFKVException; import com.ibm.fhir.persistence.jdbc.impl.ParameterTransactionDataImpl; import com.ibm.fhir.persistence.jdbc.util.ParameterTableSupport; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceReferenceDAO.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceReferenceDAO.java index 6a3ccf2746b..c5c578fb614 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceReferenceDAO.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/postgres/PostgresResourceReferenceDAO.java @@ -14,13 +14,13 @@ import java.util.logging.Logger; import com.ibm.fhir.database.utils.api.IDatabaseTranslator; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.jdbc.dao.api.ICommonTokenValuesCache; import com.ibm.fhir.persistence.jdbc.dao.api.INameIdCache; import com.ibm.fhir.persistence.jdbc.dao.api.ParameterNameDAO; import com.ibm.fhir.persistence.jdbc.dao.impl.ResourceReferenceDAO; import com.ibm.fhir.persistence.jdbc.dto.CommonTokenValue; import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDBConnectException; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; /** * Postgres-specific extension of the {@link ResourceReferenceDAO} to work around diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceDataAccessException.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/exception/FHIRPersistenceDataAccessException.java similarity index 91% rename from fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceDataAccessException.java rename to fhir-persistence/src/main/java/com/ibm/fhir/persistence/exception/FHIRPersistenceDataAccessException.java index c97e5fa5fcc..c59849c0cb7 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/exception/FHIRPersistenceDataAccessException.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/exception/FHIRPersistenceDataAccessException.java @@ -1,15 +1,14 @@ /* - * (C) Copyright IBM Corp. 2017,2019 + * (C) Copyright IBM Corp. 2017, 2022 * * SPDX-License-Identifier: Apache-2.0 */ -package com.ibm.fhir.persistence.jdbc.exception; +package com.ibm.fhir.persistence.exception; import java.util.Collection; import com.ibm.fhir.model.resource.OperationOutcome; -import com.ibm.fhir.persistence.exception.FHIRPersistenceException; /** * This exception class represents failures encountered while attempting to access (read, write) data in the FHIR DB. diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/FHIRRemoteIndexService.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/FHIRRemoteIndexService.java index b37a5c89d3d..35188481b27 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/FHIRRemoteIndexService.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/FHIRRemoteIndexService.java @@ -20,7 +20,6 @@ public abstract class FHIRRemoteIndexService { // TODO we should be injecting these services to something like the request context private static FHIRRemoteIndexService serviceInstance; -// private ConcurrentHashMap /** * Initialize the serviceInstance value * @param instance diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java index 7b09b92bc51..8cfbdf248fb 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/RemoteIndexMessage.java @@ -13,7 +13,17 @@ public class RemoteIndexMessage { private String tenantId; private int messageVersion; private SearchParametersTransport data; - + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("tenant["); + result.append(tenantId); + result.append("] "); + result.append(data.toString()); + return result.toString(); + } + /** * @return the tenantId */ diff --git a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java index 6fc40b9494b..d4fb995daa3 100644 --- a/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java +++ b/fhir-persistence/src/main/java/com/ibm/fhir/persistence/index/SearchParametersTransport.java @@ -6,6 +6,7 @@ package com.ibm.fhir.persistence.index; +import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -26,6 +27,15 @@ public class SearchParametersTransport { // The database identifier assigned to this resource private long logicalResourceId; + + // The current version of the resource + private int versionId; + + // The parameter hash computed for this set of parameters + private String parameterHash; + + // The last_updated time in a fixed format for transport + private String lastUpdated; // The key value used for sharding the data when using a distributed database private String requestShard; @@ -48,6 +58,27 @@ public static Builder builder() { return new Builder(); } + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("resourceType["); + result.append(resourceType); + result.append("] "); + result.append("logicalId["); + result.append(logicalId); + result.append("] "); + result.append("versionId["); + result.append(versionId); + result.append("] "); +// result.append("parameterHash["); +// result.append(parameterHash); +// result.append("] "); +// result.append("lastUpdated["); +// result.append(lastUpdated); +// result.append("] "); + return result.toString(); + } + /** * A builder to make it easier to construct a {@link SearchParametersTransport} */ @@ -66,6 +97,9 @@ public static class Builder { private String logicalId; private long logicalResourceId = -1; private String requestShard; + private int versionId; + private String parameterHash; + private String lastUpdated; /** * Set the resourceType @@ -77,6 +111,21 @@ public Builder withResourceType(String resourceType) { return this; } + /** + * Set the parameterHash + * @param hash + * @return + */ + public Builder withParameterHash(String hash) { + this.parameterHash = hash; + return this; + } + + public Builder withLastUpdated(Instant lastUpdated) { + this.lastUpdated = lastUpdated.toString(); + return this; + } + /** * Set the logicalId * @param logicalId @@ -87,6 +136,16 @@ public Builder withLogicalId(String logicalId) { return this; } + /** + * Set the versionId + * @param versionId + * @return + */ + public Builder withVersionId(int versionId) { + this.versionId = versionId; + return this; + } + /** * Set the logicalResourceId * @param logicalResourceId @@ -217,7 +276,10 @@ public SearchParametersTransport build() { result.resourceType = this.resourceType; result.logicalId = this.logicalId; result.logicalResourceId = this.logicalResourceId; + result.setVersionId(this.versionId); result.setRequestShard(this.requestShard); + result.setParameterHash(this.parameterHash); + result.setLastUpdated(this.lastUpdated); if (this.stringValues.size() > 0) { result.stringValues = new ArrayList<>(this.stringValues); @@ -454,4 +516,48 @@ public List getSecurityValues() { public void setSecurityValues(List securityValues) { this.securityValues = securityValues; } + + + /** + * @return the versionId + */ + public int getVersionId() { + return versionId; + } + + + /** + * @param versionId the versionId to set + */ + public void setVersionId(int versionId) { + this.versionId = versionId; + } + + /** + * @return the parameterHash + */ + public String getParameterHash() { + return parameterHash; + } + + /** + * @param parameterHash the parameterHash to set + */ + public void setParameterHash(String parameterHash) { + this.parameterHash = parameterHash; + } + + /** + * @return the lastUpdated + */ + public String getLastUpdated() { + return lastUpdated; + } + + /** + * @param lastUpdated the lastUpdated to set + */ + public void setLastUpdated(String lastUpdated) { + this.lastUpdated = lastUpdated; + } } diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java index 013b881d13e..891467bc8c8 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java @@ -60,6 +60,10 @@ public class Main { private Duration pollDuration = Duration.ofSeconds(10); private long maxBatchCollectTimeMs = 5000; + // The max time we wait for the database to catch up with what was sent to Kafka + // Must be a little longer than the the Liberty transaction timeout + private long maxReadyTimeMs = 180000; + // the list of consumers private final List consumers = new ArrayList<>(); @@ -121,6 +125,13 @@ public void parseArgs(String[] args) { throw new IllegalArgumentException("Missing value for --consumer-count"); } break; + case "--max-ready-time-ms": + if (a < args.length && !args[a].startsWith("--")) { + maxReadyTimeMs = Long.parseLong(args[a++]); + } else { + throw new IllegalArgumentException("Missing value for --max-ready-time-ms"); + } + break; default: throw new IllegalArgumentException("Bad arg: '" + arg + "'"); } @@ -283,7 +294,7 @@ private IMessageHandler buildHandler() throws FHIRPersistenceException { try { // Each handler gets a dedicated database connection so we don't have // to deal with contention when grabbing connections from a pool - return new DistributedPostgresMessageHandler(connectionProvider.getConnection(), getSchemaName(), identityCache); + return new DistributedPostgresMessageHandler(connectionProvider.getConnection(), getSchemaName(), identityCache, maxReadyTimeMs); } catch (SQLException x) { throw new FHIRPersistenceException("get connection failed", x); } diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java index 11ee02c0fc6..7e0362e0bf4 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/BaseMessageHandler.java @@ -6,11 +6,15 @@ package com.ibm.fhir.remote.index.database; +import java.security.SecureRandom; +import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import com.google.gson.Gson; +import com.ibm.fhir.database.utils.thread.ThreadHandler; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.index.DateParameter; import com.ibm.fhir.persistence.index.LocationParameter; @@ -34,30 +38,75 @@ public abstract class BaseMessageHandler implements IMessageHandler { private final Logger logger = Logger.getLogger(BaseMessageHandler.class.getName()); private static final int MIN_SUPPORTED_MESSAGE_VERSION = 1; + // If we fail 10 times due to deadlocks, then something is seriously wrong + private static final int MAX_TX_ATTEMPTS = 10; + private SecureRandom random = new SecureRandom(); + + private final long maxReadyWaitMs; + /** + * Protected constructor + * @param maxReadyWaitMs the max time in ms to wait for the upstream transaction to make the data ready + */ + protected BaseMessageHandler(long maxReadyWaitMs) { + this.maxReadyWaitMs = maxReadyWaitMs; + } + @Override public void process(List messages) throws FHIRPersistenceException { - try { - startBatch(); - for (String payload: messages) { - if (logger.isLoggable(Level.FINEST)) { - logger.finest("Processing message payload: " + payload); - } - RemoteIndexMessage message = unmarshall(payload); - if (message != null) { - if (message.getMessageVersion() >= MIN_SUPPORTED_MESSAGE_VERSION) { - process(message); - } else { - logger.warning("Message version [" + message.getMessageVersion() + "] not supported, ignoring payload=[" + payload + "]"); - } + List unmarshalled = new ArrayList<>(messages.size()); + for (String payload: messages) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("Processing message payload: " + payload); + } + RemoteIndexMessage message = unmarshall(payload); + if (message != null) { + if (message.getMessageVersion() >= MIN_SUPPORTED_MESSAGE_VERSION) { + unmarshalled.add(message); + } else { + logger.warning("Message version [" + message.getMessageVersion() + "] not supported, ignoring payload=[" + payload + "]"); } } - pushBatch(); - } catch (Throwable t) { - setRollbackOnly(); - throw t; - } finally { - endTransaction(); } + processWithRetry(unmarshalled); + } + + /** + * Process the batch of messages with support for retries in the case + * of a retryable error such as a database deadlock + * + * @param messages + * @throws FHIRPersistenceException + */ + private void processWithRetry(List messages) throws FHIRPersistenceException { + int attempt = 1; + do { + try { + if (attempt > 1) { + // introduce a random delay before we re-attempt to process the batch. This + // may help to avoid subsequent deadlocks if there are multiple transactions + // involved + final long delay = random.nextInt(10) * 1000l; + logger.fine(() -> "Deadlock retry backoff ms: " + delay); + ThreadHandler.safeSleep(delay); + } + startBatch(); + processMessages(messages); + pushBatch(); + + attempt = MAX_TX_ATTEMPTS; // exit our do...while + } catch (FHIRPersistenceDataAccessException x) { + setRollbackOnly(); + // see if this is a retryable error + if (x.isTransactionRetryable() && attempt++ < MAX_TX_ATTEMPTS) { + logger.warning("tx failed, but retry permitted: " + x.getMessage()); + resetBatch(); // clear up any cruft from the previous attempt + } else { + throw x; + } + } finally { + endTransaction(); + } + } while (attempt < MAX_TX_ATTEMPTS); } /** @@ -70,6 +119,12 @@ public void process(List messages) throws FHIRPersistenceException { */ protected abstract void setRollbackOnly(); + /** + * Reset the state of the handler following a failure so that the batch can + * be retried + */ + protected abstract void resetBatch(); + /** * Push any data we've accumulated from processing messages. */ @@ -92,6 +147,71 @@ private RemoteIndexMessage unmarshall(String jsonPayload) { } return null; } + + /** + * Process the list of messages + * @param messages + * @throws FHIRPersistenceException + */ + private void processMessages(List messages) throws FHIRPersistenceException { + // We need to do a quick scan of all the messages to make sure that + // the logical resource records for each already exist. If prepare + // returns false, it means one of two things: + // 1. we received the message before the server transaction committed + // 2. the server transaction failed/rolled back, so we'll never be ready + long timeoutTime = System.nanoTime() + this.maxReadyWaitMs * 1000000; + + // Messages which match the current version info in the database + List okToProcess = new ArrayList<>(); + + // resources which don't yet exist of their version is older than the message + List notReady = new ArrayList<>(); + + // make at least one attempt + do { + if (okToProcess.size() > 0) { + okToProcess.clear(); // reset ready for next prepare call + } + if (notReady.size() > 0) { + notReady.clear(); // reset ready for next prepare call + } + + // Ask the handle to check which messages match the database + // and are therefore ready to be processed + prepare(messages, okToProcess, notReady); + + // If the ready check fails just sleep for a bit because we need + // to wait until the upstream transaction commits. This means we + // may need to keep waiting for a long time which unfortunately + // stalls processing this partition + if (notReady.size() > 0) { + long snoozeMs = Math.min(1000l, (timeoutTime - System.nanoTime()) / 1000000); + // short sleep to wait for the upstream transaction to complete + ThreadHandler.safeSleep(snoozeMs); + } + } while (notReady.size() > 0 && System.nanoTime() < timeoutTime); + + // okToProcess contains those messages for which we see the upstream transaction + // has committed. + for (RemoteIndexMessage message: okToProcess) { + process(message); + } + + // Make a note of which messages we were unable to process because the upstream + // transaction did not commit before our maxReadyWaitMs timeout + for (RemoteIndexMessage message: notReady) { + logger.warning("Timed out waiting for upstream transaction to commit data for: " + message.toString()); + } + } + + /** + * Check to see if the database is ready to process the messages + * @param IN: messages to check + * @param OUT: okToMessages the messages matching the current database + * @param OUT: notReady the messages for which the upstream transaction has yet to commit + */ + protected abstract void prepare(List messages, List okToProcess, List notReady) throws FHIRPersistenceException; + /** * Process the data * @param message diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java index 5e4185aaf84..7dcbc907100 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresMessageHandler.java @@ -15,13 +15,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; import com.ibm.fhir.database.utils.api.IDatabaseTranslator; +import com.ibm.fhir.database.utils.common.ResultSetReader; import com.ibm.fhir.database.utils.postgres.PostgresTranslator; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.index.DateParameter; @@ -29,6 +32,7 @@ import com.ibm.fhir.persistence.index.NumberParameter; import com.ibm.fhir.persistence.index.ProfileParameter; import com.ibm.fhir.persistence.index.QuantityParameter; +import com.ibm.fhir.persistence.index.RemoteIndexMessage; import com.ibm.fhir.persistence.index.SearchParameterValue; import com.ibm.fhir.persistence.index.SecurityParameter; import com.ibm.fhir.persistence.index.StringParameter; @@ -104,8 +108,12 @@ public class DistributedPostgresMessageHandler extends BaseMessageHandler { * Public constructor * * @param connection + * @param schemaName + * @param cache + * @param maxReadyTimeMs */ - public DistributedPostgresMessageHandler(Connection connection, String schemaName, IdentityCache cache) { + public DistributedPostgresMessageHandler(Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) { + super(maxReadyTimeMs); this.connection = connection; this.schemaName = schemaName; this.identityCache = cache; @@ -902,4 +910,127 @@ private Integer createParameterName(String parameterName) throws SQLException { return parameterNameId; } + @Override + protected void resetBatch() { + // Called when a transaction has been rolled back because of a deadlock + // or other retryable error and we want to try and process the batch again + batchProcessor.reset(); + } + + /** + * Build the check ready query + * @param messagesByResourceType + * @return + */ + private String buildCheckReadyQuery(Map> messagesByResourceType) { + StringBuilder select = new StringBuilder(); + // SELECT lr.shard_key, lr.logical_resource_id, lr.resource_type, lr.logical_id, xlr.version_id, lr.last_updated, lr.parameter_hash + // FROM logical_resources AS lr, + // patient_logical_resources AS xlr + // WHERE lr.logical_resource_id = xlr.logical_resource_id + // AND xlr.logical_resource_id IN (1,2,3,4) + // UNION ALL + // SELECT lr.shard_key, lr.logical_resource_id, lr.resource_type, lr.logical_id, xlr.version_id, lr.last_updated, lr.parameter_hash + // FROM logical_resources AS lr, + // observation_logical_resources AS xlr + // WHERE lr.logical_resource_id = xlr.logical_resource_id + // AND xlr.logical_resource_id IN (5,6,7) + boolean first = true; + for (Map.Entry> entry: messagesByResourceType.entrySet()) { + final String resourceType = entry.getKey(); + final List messages = entry.getValue(); + final String inlist = messages.stream().map(m -> Long.toString(m.getData().getLogicalResourceId())).collect(Collectors.joining(",")); + if (first) { + first = false; + } else { + select.append(" UNION ALL "); + } + select.append(" SELECT lr.shard_key, lr.logical_resource_id, '" + resourceType + "' AS resource_type, lr.logical_id, xlr.version_id, lr.last_updated, lr.parameter_hash "); + select.append(" FROM logical_resources AS lr, "); + select.append(resourceType).append("_logical_resources AS xlr "); + select.append(" WHERE lr.logical_resource_id = xlr.logical_resource_id "); + select.append(" AND xlr.logical_resource_id IN (").append(inlist).append(")"); + } + + return select.toString(); + } + + @Override + protected void prepare(List messages, List okToProcess, List notReady) throws FHIRPersistenceException { + // Get a list of all the resources for which we can see the current logical resource data. + // If the resource doesn't yet exist or its version meta doesn't the message + // then we add to the notReady list. If the resource version meta already + // exceeds the message, then we'll skip processing altogether because it + // means that there should be another message in the queue with more + // up-to-date parameters + Map messageMap = new HashMap<>(); + Map> messagesByResourceType = new HashMap<>(); + for (RemoteIndexMessage msg: messages) { + Long logicalResourceId = msg.getData().getLogicalResourceId(); + messageMap.put(logicalResourceId, msg); + + // split out the messages per resource type because we need to read from xx_logical_resources + List values = messagesByResourceType.computeIfAbsent(msg.getData().getResourceType(), k -> new ArrayList<>()); + values.add(msg); + } + + Set found = new HashSet<>(); + final String checkReadyQuery = buildCheckReadyQuery(messagesByResourceType); + logger.fine(() -> "check ready query: " + checkReadyQuery); + try (PreparedStatement ps = connection.prepareStatement(checkReadyQuery)) { + ResultSet rs = ps.executeQuery(); + // wrap the ResultSet in a reader for easier consumption + ResultSetReader rsReader = new ResultSetReader(rs); + while (rsReader.next()) { + LogicalResourceValue lrv = LogicalResourceValue.builder() + .withShardKey(rsReader.getShort()) + .withLogicalResourceId(rsReader.getLong()) + .withResourceType(rsReader.getString()) + .withLogicalId(rsReader.getString()) + .withVersionId(rsReader.getInt()) + .withLastUpdated(rsReader.getTimestamp()) + .withParameterHash(rsReader.getString()) + .build(); + RemoteIndexMessage m = messageMap.get(lrv.getLogicalResourceId()); + if (m == null) { + throw new IllegalStateException("query returned a logical resource which we didn't request"); + } + + // Check the values from the database to see if they match + // the information in the message. + if (m.getData().getVersionId() == lrv.getVersionId()) { + // only process this message if the parameter hash and lastUpdated + // times match - which is a good check that we're storing parameters + // from the correct transaction. If these don't match, we can simply + // say we found the data but don't need to process the message. + final String lastUpdated = lrv.getLastUpdated().toString(); + if (lrv.getParameterHash().equals(m.getData().getParameterHash()) + && lastUpdated.equals(m.getData().getLastUpdated())) { + okToProcess.add(m); + } + found.add(lrv.getLogicalResourceId()); // won't be marked as missing + } else if (m.getData().getVersionId() > lrv.getVersionId()) { + // we can skip processing this record because the database has already + // been updated with a newer version. Identify the record as having been + // found so we don't keep waiting for it + found.add(lrv.getLogicalResourceId()); + } + // if the version in the database is prior to version in the message we + // received it means that the server transaction hasn't been committed... + // so we have to wait just as though it were missing altogether + } + } catch (SQLException x) { + logger.log(Level.SEVERE, "prepare failed: " + checkReadyQuery, x); + throw new FHIRPersistenceException("prepare query failed"); + } + + if (found.size() < messages.size()) { + // identify the missing records and add to the notReady list + for (RemoteIndexMessage m: messages) { + if (!found.contains(m.getData().getLogicalResourceId())) { + notReady.add(m); + } + } + } + } } \ No newline at end of file diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresParameterBatch.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresParameterBatch.java index ece3b5558fe..feebe7fb947 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresParameterBatch.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/DistributedPostgresParameterBatch.java @@ -107,7 +107,7 @@ public void pushBatch() throws SQLException { * Resets the state of the DAO by closing all statements and * setting any batch counts to 0 */ - public void reset() { + public void close() { if (strings != null) { try { strings.close(); diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/JDBCBatchParameterProcessor.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/JDBCBatchParameterProcessor.java index 55ccbc8f3ed..de559a51db4 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/JDBCBatchParameterProcessor.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/JDBCBatchParameterProcessor.java @@ -61,7 +61,7 @@ public JDBCBatchParameterProcessor(Connection connection) { */ public void close() { for (Map.Entry entry: daoMap.entrySet()) { - entry.getValue().reset(); + entry.getValue().close(); } systemDao.close(); } @@ -73,6 +73,17 @@ public void startBatch() { resourceTypesInBatch.clear(); } + /** + * Make sure that each statement that may contain data is cleared before we + * retry a batch + */ + public void reset() { + for (String resourceType: resourceTypesInBatch) { + DistributedPostgresParameterBatch dao = daoMap.get(resourceType); + dao.close(); + } + systemDao.close(); + } /** * Push any statements that have been batched but not yet executed * @throws FHIRPersistenceException diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/LogicalResourceValue.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/LogicalResourceValue.java new file mode 100644 index 00000000000..7491e2f8d79 --- /dev/null +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/LogicalResourceValue.java @@ -0,0 +1,151 @@ +/* + * (C) Copyright IBM Corp. 2022 + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.ibm.fhir.remote.index.database; + +import java.sql.Timestamp; + +/** + * A DTO representing a record from logical_resources + */ +public class LogicalResourceValue { + private final short shardKey; + private final long logicalResourceId; + private final String resourceType; + private final String logicalId; + private final int versionId; + private final Timestamp lastUpdated; + private final String parameterHash; + public static class Builder { + private short shardKey; + private long logicalResourceId; + private String resourceType; + private String logicalId; + private int versionId; + private Timestamp lastUpdated; + private String parameterHash; + + public Builder withShardKey(short shardKey) { + this.shardKey = shardKey; + return this; + } + public Builder withLogicalResourceId(long logicalResourceId) { + this.logicalResourceId = logicalResourceId; + return this; + } + public Builder withResourceType(String resourceType) { + this.resourceType = resourceType; + return this; + } + public Builder withLogicalId(String logicalId) { + this.logicalId = logicalId; + return this; + } + public Builder withVersionId(int versionId) { + this.versionId = versionId; + return this; + } + public Builder withLastUpdated(Timestamp lastUpdated) { + this.lastUpdated = lastUpdated; + return this; + } + public Builder withParameterHash(String parameterHash) { + this.parameterHash = parameterHash; + return this; + } + + /** + * Create a new {@link LogicalResourceValue} using the current state of this {@link Builder} + * @return + */ + public LogicalResourceValue build() { + return new LogicalResourceValue(shardKey, logicalResourceId, resourceType, logicalId, versionId, lastUpdated, parameterHash); + } + } + + /** + * Factor function to create a fresh instance of a {@link Builder} + * @return + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Public constructor + * @param shardKey + * @param logicalResourceId + * @param resourceType + * @param logicalId + * @param lastUpdated + * @param parameterHash + */ + public LogicalResourceValue(short shardKey, long logicalResourceId, String resourceType, String logicalId, int versionId, Timestamp lastUpdated, String parameterHash) { + this.shardKey = shardKey; + this.logicalResourceId = logicalResourceId; + this.resourceType = resourceType; + this.logicalId = logicalId; + this.versionId = versionId; + this.lastUpdated = lastUpdated; + this.parameterHash = parameterHash; + } + + + /** + * @return the shardKey + */ + public short getShardKey() { + return shardKey; + } + + + /** + * @return the logicalResourceId + */ + public long getLogicalResourceId() { + return logicalResourceId; + } + + + /** + * @return the resourceType + */ + public String getResourceType() { + return resourceType; + } + + + /** + * @return the logicalId + */ + public String getLogicalId() { + return logicalId; + } + + + /** + * @return the lastUpdated + */ + public Timestamp getLastUpdated() { + return lastUpdated; + } + + + /** + * @return the parameterHash + */ + public String getParameterHash() { + return parameterHash; + } + + /** + * @return the versionId + */ + public int getVersionId() { + return versionId; + } + +} diff --git a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java index 4a389423df1..c7be5d311cd 100644 --- a/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java +++ b/fhir-server/src/main/java/com/ibm/fhir/server/util/FHIRRestHelper.java @@ -107,11 +107,11 @@ import com.ibm.fhir.persistence.context.FHIRSystemHistoryContext; import com.ibm.fhir.persistence.context.impl.FHIRPersistenceContextImpl; import com.ibm.fhir.persistence.erase.EraseDTO; +import com.ibm.fhir.persistence.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.exception.FHIRPersistenceException; import com.ibm.fhir.persistence.exception.FHIRPersistenceIfNoneMatchException; import com.ibm.fhir.persistence.exception.FHIRPersistenceResourceNotFoundException; import com.ibm.fhir.persistence.helper.FHIRTransactionHelper; -import com.ibm.fhir.persistence.jdbc.exception.FHIRPersistenceDataAccessException; import com.ibm.fhir.persistence.payload.PayloadPersistenceResponse; import com.ibm.fhir.persistence.util.FHIRPersistenceUtil; import com.ibm.fhir.profile.ProfileSupport;