Skip to content

Commit

Permalink
Issue #2195 - Add logicalResourceIds param to $reindex operation
Browse files Browse the repository at this point in the history
Signed-off-by: Troy Biesterfeld <tbieste@us.ibm.com>
  • Loading branch information
tbieste committed Jun 4, 2021
1 parent 66d41bc commit b9019e3
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public class ReindexResourceDAO extends ResourceDAOImpl {
// Note that currently the global logical_resources table does not carry
// the is_deleted flag. Until it does, the queries will return deleted
// resources, which can be skipped for reindex. (issue-2055)
private static final String PICK_SINGLE_LOGICAL_RESOURCE = ""
+ " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid "
+ " FROM logical_resources lr "
+ " WHERE lr.logical_resource_id = ? "
+ " AND lr.reindex_tstamp < ? "
;

private static final String PICK_SINGLE_RESOURCE = ""
+ " SELECT lr.logical_resource_id, lr.resource_type_id, lr.logical_id, lr.reindex_txid "
+ " FROM logical_resources lr "
Expand Down Expand Up @@ -111,15 +118,56 @@ public ReindexResourceDAO(Connection connection, IDatabaseTranslator translator,
* Getter for the translator currently held by this DAO
* @return
*/
@Override
protected IDatabaseTranslator getTranslator() {
return this.translator;
}

/**
* Pick the next resource to process resource and lock it. Specializations for different
* databases may use different techniques to optimize locking/concurrency control
* @param reindexTstamp
* @return
* Pick a specific resource to process by logicalResourceId (primary key).
* Since the logicalResourceId is specified, we avoid updating the record as the caller of $reindex operation
* is passing in an explicit list of resources, so no need to lock for the purpose of picking a random resource.
* This can improve performance (especially with PostgreSQL by avoid the generation of tombstones).
* @param reindexTstamp only get resource with a reindex_tstamp less than this
* @param logicalResourceId the logical resource ID (primary key) of a specific resource
* @return the resource record, or null when the resource is not found
* @throws Exception
*/
protected ResourceIndexRecord getResource(Instant reindexTstamp, Long logicalResourceId) throws Exception {
ResourceIndexRecord result = null;

// no need to close
Connection connection = getConnection();
IDatabaseTranslator translator = getTranslator();

final String select = PICK_SINGLE_LOGICAL_RESOURCE;

try (PreparedStatement stmt = connection.prepareStatement(select)) {
if (logicalResourceId != null) {
// specific resource by logical resource ID (primary key)
stmt.setLong(1, logicalResourceId);
stmt.setTimestamp(2, Timestamp.from(reindexTstamp));
}
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
result = new ResourceIndexRecord(rs.getLong(1), rs.getInt(2), rs.getString(3), rs.getLong(4));
}
} catch (SQLException x) {
logger.log(Level.SEVERE, select, x);
throw translator.translate(x);
}

return result;
}

/**
* Pick the next resource to process, then also lock it. Specializations for different databases may use
* different techniques to optimize locking/concurrency control.
* @param random used to generate a random number
* @param reindexTstamp only get resource with a reindex_tstamp less than this
* @param resourceTypeId the resource type ID of a specific resource type, or null
* @param logicalId the resource ID of a specific resource, or null
* @return the resource record, or null when there is nothing left to do
* @throws Exception
*/
protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reindexTstamp, Integer resourceTypeId, String logicalId) throws Exception {
Expand All @@ -135,12 +183,13 @@ protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reind
final String select;

if (resourceTypeId != null && logicalId != null) {
// Just pick the requested resource
// Just pick the requested resource by resource type and resource ID
select = PICK_SINGLE_RESOURCE;
} else if (resourceTypeId != null) {
// Limit to the given resource type
select = PICK_SINGLE_RESOURCE_TYPE;
} else if (resourceTypeId == null && logicalId == null) {
// Pick the next resource needing to be reindexed regardless of type
select = PICK_ANY_RESOURCE;
} else {
// programming error
Expand All @@ -156,14 +205,17 @@ protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reind
int offset = random.nextInt(offsetRange);
try (PreparedStatement stmt = connection.prepareStatement(select)) {
if (resourceTypeId != null && logicalId != null) {
// specific resource by resource type and resource ID
stmt.setInt(1, resourceTypeId);
stmt.setString(2, logicalId);
stmt.setTimestamp(3, Timestamp.from(reindexTstamp));
} else if (resourceTypeId != null) {
// limit to resource type
stmt.setInt(1, resourceTypeId);
stmt.setTimestamp(2, Timestamp.from(reindexTstamp));
stmt.setInt(3, offset);
} else {
// any resource type
stmt.setTimestamp(1, Timestamp.from(reindexTstamp));
stmt.setInt(2, offset);
}
Expand Down Expand Up @@ -217,19 +269,27 @@ protected ResourceIndexRecord getNextResource(SecureRandom random, Instant reind
* Get the resource record we want to reindex. This might take a few attempts, because
* there could be hundreds of threads all trying to do the same thing, and we may see
* collisions which will cause the FOR UPDATE to block, then return no rows.
* @param reindexTstamp
* @param resourceCount
* @param reindexTstamp only get resource with an index_tstamp less than this
* @param logicalResourceId logical resource ID (primary key) of resource to reindex, or null
* @param resourceTypeId the resource type ID of a specific resource type, or null;
* this parameter is ignored if the logicalResourceId parameter value is non-null
* @param logicalId the resource ID of a specific resource, or null;
* this parameter is ignored if the logicalResourceId parameter value is non-null
* @return the resource record, or null when there is nothing left to do
* @throws Exception
*/
public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Integer resourceTypeId, String logicalId) throws Exception {
public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Long logicalResourceId, Integer resourceTypeId, String logicalId) throws Exception {
ResourceIndexRecord result = null;

// no need to close
Connection connection = getConnection();

// Get a resource which needs to be reindexed
result = getNextResource(RANDOM, reindexTstamp, resourceTypeId, logicalId);
if (logicalResourceId != null) {
result = getResource(reindexTstamp, logicalResourceId);
} else {
result = getNextResource(RANDOM, reindexTstamp, resourceTypeId, logicalId);
}

if (result != null) {

Expand All @@ -248,6 +308,10 @@ public ResourceIndexRecord getResourceToReindex(Instant reindexTstamp, Integer r
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
result.setResourceType(rs.getString(1));
} else if (logicalResourceId != null) {
// When logicalResourceId is specified, the resource is not locked, so it could disappear
logger.fine("Logical resource no longer exists: logical_resource_id=" + result.getLogicalResourceId());
result = null;
} else {
// Can't really happen, because the resource is selected for update, so it can't disappear
logger.severe("Logical resource no longer exists: logical_resource_id=" + result.getLogicalResourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2427,8 +2427,8 @@ public boolean isReindexSupported() {
}

@Override
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, java.time.Instant tstamp, String resourceLogicalId)
throws FHIRPersistenceException {
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, java.time.Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException {
final String METHODNAME = "reindex";
log.entering(CLASSNAME, METHODNAME);

Expand Down Expand Up @@ -2468,13 +2468,14 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper
// Look up the optional resourceTypeId for the given resourceType parameter
resourceTypeId = cache.getResourceTypeCache().getId(resourceType);
}
int lrIdIndex = 0;

// Need to skip over deleted resources so we have to loop until we find something not
// deleted, or reach the end.
// If list of logicalResourceIds was specified, loop over those. Otherwise, since we skip over
// deleted resources we have to loop until we find something not deleted, or reach the end.
ResourceIndexRecord rir;
do {
long start = System.nanoTime();
rir = reindexDAO.getResourceToReindex(tstamp, resourceTypeId, logicalId);
rir = reindexDAO.getResourceToReindex(tstamp, logicalResourceIds != null ? logicalResourceIds.get(lrIdIndex++) : null, resourceTypeId, logicalId);
long end = System.nanoTime();

if (log.isLoggable(Level.FINER)) {
Expand All @@ -2497,7 +2498,7 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper

// result is only 0 if getResourceToReindex doesn't give us anything because this indicates
// there's nothing left to do
result = 1;
result++;
} else {
// Skip this particular resource because it has been deleted
if (log.isLoggable(Level.FINE)) {
Expand All @@ -2506,7 +2507,7 @@ public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oper
rir.setDeleted(true);
}
}
} while (rir != null && rir.isDeleted());
} while ((logicalResourceIds != null && lrIdIndex < logicalResourceIds.size()) || (logicalResourceIds == null && rir != null && rir.isDeleted()));

} catch(FHIRPersistenceFKVException e) {
getTransaction().setRollbackOnly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public ResourceIndexRecord getNextResource(SecureRandom random, Instant reindexT
// confuse the optimizer.
final String update;
if (resourceTypeId != null && logicalId != null) {
// Limit to one resource
// Just pick the requested resource by resource type and resource ID
update = PICK_SINGLE_RESOURCE;
} else if (resourceTypeId != null) {
// Limit to one type of resource
Expand All @@ -138,7 +138,7 @@ public ResourceIndexRecord getNextResource(SecureRandom random, Instant reindexT

try (PreparedStatement stmt = connection.prepareStatement(update)) {
if (resourceTypeId != null && logicalId != null) {
// specific resource
// specific resource by resource type and resource ID
stmt.setTimestamp(1, Timestamp.from(reindexTstamp));
stmt.setInt(2, resourceTypeId);
stmt.setString(3, logicalId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright IBM Corp. 2020,2021
* (C) Copyright IBM Corp. 2020, 2021
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -456,7 +456,8 @@ public String generateResourceId() {
}

@Override
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, java.time.Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException {
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, java.time.Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,20 @@ default boolean isReindexSupported() {
}

/**
* Initiates reindexing for resources not yet processed. Limits the number of resources
* processed to resourceCount. The number processed is returned in the OperationOutcome.
* Initiates reindexing for either a specified list of logicalResourceIds (primary keys),
* or a randomly chosen resource. The number of resources processed is returned.
* This can be used by a controller to continue processing until everything is complete.
* Increasing resourceCount reduces the number of calls required to reindex an entire
* database, but larger values risk exceeding the transaction timeout. Values around 100
* are a good starting point for most systems.
* @param context the FHIRPersistenceContext instance associated with the current request.
* @param operationOutcomeResult accumulate issues in this {@link Builder}
* @param tstamp reindex any resources with an index_tstamp less than this.
* @param resourceLogicalId optional resourceType/logicalId value to reindex a specific resource
* @param operationOutcomeResult accumulate issues in this {@link OperationOutcome.Builder}
* @param tstamp only reindex resources with a reindex_tstamp less than this
* @param logicalResourceIds list of logical resource IDs (primary keys) of resources to reindex, or null
* @param resourceLogicalId resourceType/logicalId value of a specific resource to reindex, or null;
* this parameter is ignored if the logicalResourceIds parameter value is non-null
* @return count of the number of resources reindexed by this call (0 or 1)
* @throws FHIRPersistenceException
*/
int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId)
throws FHIRPersistenceException;
int reindex(FHIRPersistenceContext context, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException;

/**
* Special function for high speed export of resource payloads. The process
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2017,2021
* (C) Copyright IBM Corp. 2017, 2021
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -77,7 +77,8 @@ public OperationOutcome getHealth() throws FHIRPersistenceException {
}

@Override
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException {
public int reindex(FHIRPersistenceContext context, OperationOutcome.Builder oob, Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.ibm.fhir.server.operation.spi;

import java.time.Instant;
import java.util.List;

import javax.ws.rs.core.MultivaluedMap;

Expand Down Expand Up @@ -309,16 +310,19 @@ Resource doInvoke(FHIROperationContext operationContext, String resourceTypeName
FHIRPersistenceTransaction getTransaction() throws Exception;

/**
* Invoke the FHIR persistence reindex operation for a randomly chosen resource which was
* last reindexed before the given date
* @param operationContext
* @param operationOutcomeResult
* @param tstamp
* @param resourceLogicalId a reference to a resource e.g. "Patient/abc123". Can be null
* @return number of resources reindexed (0 if no resources were found to reindex)
* Invoke the FHIR persistence reindex operation for either a specified list of logicalResourceIds (primary keys),
* or a randomly chosen resource, last reindexed before the given timestamp.
* @param operationContext the operation context
* @param operationOutcomeResult accumulate issues in this {@link OperationOutcome.Builder}
* @param tstamp only reindex resources with a reindex_tstamp less than this
* @param logicalResourceIds list of logical resource IDs (primary keys) of resources to reindex, or null
* @param resourceLogicalId resourceType (e.g. "Patient"), or resourceType/logicalId a specific resource (e.g. "Patient/abc123"), to reindex, or null;
* this parameter is ignored if the logicalResourceIds parameter value is non-null
* @return count of the number of resources reindexed by this call
* @throws Exception
*/
int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) throws Exception;
int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws Exception;

/**
* Invoke the FHIR Persistence erase operation for a specific instance of the erase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3069,7 +3069,8 @@ private void setOperationContextProperties(FHIROperationContext operationContext
}

@Override
public int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, String resourceLogicalId) throws Exception {
public int doReindex(FHIROperationContext operationContext, OperationOutcome.Builder operationOutcomeResult, Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws Exception {
int result = 0;
// handle some retries in case of deadlock exceptions
final int TX_ATTEMPTS = 5;
Expand All @@ -3079,7 +3080,7 @@ public int doReindex(FHIROperationContext operationContext, OperationOutcome.Bui
txn.begin();
try {
FHIRPersistenceContext persistenceContext = null;
result = persistence.reindex(persistenceContext, operationOutcomeResult, tstamp, resourceLogicalId);
result = persistence.reindex(persistenceContext, operationOutcomeResult, tstamp, logicalResourceIds, resourceLogicalId);
attempt = TX_ATTEMPTS; // end the retry loop
} catch (FHIRPersistenceDataAccessException x) {
if (x.isTransactionRetryable() && attempt < TX_ATTEMPTS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public String generateResourceId() {
}

@Override
public int reindex(FHIRPersistenceContext context, Builder operationOutcomeResult, java.time.Instant tstamp, String resourceLogicalId) throws FHIRPersistenceException {
public int reindex(FHIRPersistenceContext context, Builder operationOutcomeResult, java.time.Instant tstamp, List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ public int reindex(
FHIRPersistenceContext context,
Builder operationOutcomeResult,
java.time.Instant tstamp,
List<Long> logicalResourceIds,
String resourceLogicalId) throws FHIRPersistenceException {
throw new UnsupportedOperationException();
}
Expand Down
Loading

0 comments on commit b9019e3

Please sign in to comment.