diff --git a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/ConfigureConnectionDAO.java b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/ConfigureConnectionDAO.java index c578a9f7c06..039c07682b0 100644 --- a/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/ConfigureConnectionDAO.java +++ b/fhir-database-utils/src/main/java/com/ibm/fhir/database/utils/citus/ConfigureConnectionDAO.java @@ -35,7 +35,9 @@ public ConfigureConnectionDAO() { @Override public void run(IDatabaseTranslator translator, Connection c) { - final String SQL = "SET LOCAL citus.multi_shard_modify_mode TO 'sequential'"; + // we need this behavior for all transactions on this connection, so + // we use SET SESSION instead of SET LOCAL + final String SQL = "SET SESSION citus.multi_shard_modify_mode TO 'sequential'"; try (Statement s = c.createStatement()) { s.executeUpdate(SQL); diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/api/IdentityCache.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/api/IdentityCache.java index 1f731792db3..786a3e952d5 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/api/IdentityCache.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/api/IdentityCache.java @@ -49,6 +49,30 @@ public interface IdentityCache { */ Long getCommonCanonicalValueId(short shardKey, String url); + /** + * Add the common canonical value to the cache + * @param shardKey + * @param url + * @param commonCanonicalValueId + */ + void addCommonCanonicalValue(short shardKey, String url, long commonCanonicalValueId); + + /** + * Add the common token value to the cache + * @param shardKey + * @param codeSystem + * @param tokenValue + * @param commonTokenValueId + */ + void addCommonTokenValue(short shardKey, String codeSystem, String tokenValue, long commonTokenValueId); + + /** + * Add the code system value to the cache + * @param codeSystem + * @param codeSystemId + */ + void addCodeSystem(String codeSystem, int codeSystemId); + /** * Get the database resource_type_id value for the given resourceType value * @param resourceType @@ -56,4 +80,20 @@ public interface IdentityCache { * @throws IllegalArgumentException if resourceType is not a valid resource type name */ int getResourceTypeId(String resourceType); + + /** + * Get the database logical_resource_id for the given resourceType/logicalId tuple. + * @param resourceType + * @param logicalId + * @return + */ + Long getLogicalResourceIdentId(String resourceType, String logicalId); + + /** + * Add the logical_resource_ident mapping to the cache + * @param resourceType + * @param logicalId + * @param logicalResourceId + */ + void addLogicalResourceIdent(String resourceType, String logicalId, long logicalResourceId); } diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java index 6225ba5dedf..6f66f037a2f 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/app/Main.java @@ -262,10 +262,13 @@ public void run() throws FHIRPersistenceException { private void initIdentityCache() throws FHIRPersistenceException { logger.info("Initializing identity cache"); identityCache = new IdentityCacheImpl( - 1000, Duration.ofSeconds(3600), - 10000, Duration.ofSeconds(3600), - 1000, Duration.ofSeconds(3600)); + 1000, Duration.ofSeconds(86400), // code systems + 10000, Duration.ofSeconds(86400), // common token values + 1000, Duration.ofSeconds(86400), // common canonical values + 100000, Duration.ofSeconds(86400)); // logical resource idents CacheLoader loader = new CacheLoader(identityCache); + + // prefill the cache try (Connection connection = connectionProvider.getConnection()) { loader.apply(connection); connection.commit(); diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/cache/IdentityCacheImpl.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/cache/IdentityCacheImpl.java index a0a43164daf..9b73330e0f8 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/cache/IdentityCacheImpl.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/cache/IdentityCacheImpl.java @@ -15,6 +15,7 @@ import com.ibm.fhir.remote.index.api.IdentityCache; import com.ibm.fhir.remote.index.database.CommonCanonicalValueKey; import com.ibm.fhir.remote.index.database.CommonTokenValueKey; +import com.ibm.fhir.remote.index.database.LogicalResourceIdentKey; import com.ibm.fhir.remote.index.database.ResourceTypeValue; /** @@ -27,6 +28,7 @@ public class IdentityCacheImpl implements IdentityCache { private final Cache codeSystemCache; private final Cache commonTokenValueCache; private final Cache commonCanonicalValueCache; + private final Cache logicalResourceIdentCache; private static final Integer NULL_INT = null; private static final Long NULL_LONG = null; @@ -35,7 +37,8 @@ public class IdentityCacheImpl implements IdentityCache { */ public IdentityCacheImpl(int maxCodeSystemCacheSize, Duration codeSystemCacheDuration, long maxCommonTokenCacheSize, Duration commonTokenCacheDuration, - long maxCommonCanonicalCacheSize, Duration commonCanonicalCacheDuration) { + long maxCommonCanonicalCacheSize, Duration commonCanonicalCacheDuration, + long maxLogicalResourceIdentCacheSize, Duration logicalResourceIdentCacheDuration) { codeSystemCache = Caffeine.newBuilder() .maximumSize(maxCodeSystemCacheSize) .expireAfterWrite(codeSystemCacheDuration) @@ -48,6 +51,10 @@ public IdentityCacheImpl(int maxCodeSystemCacheSize, Duration codeSystemCacheDur .maximumSize(maxCommonCanonicalCacheSize) .expireAfterWrite(commonCanonicalCacheDuration) .build(); + logicalResourceIdentCache = Caffeine.newBuilder() + .maximumSize(maxLogicalResourceIdentCacheSize) + .expireAfterWrite(logicalResourceIdentCacheDuration) + .build(); } /** @@ -95,4 +102,29 @@ public int getResourceTypeId(String resourceType) { } return resourceTypeId; } + + @Override + public void addCommonCanonicalValue(short shardKey, String url, long commonCanonicalValueId) { + this.commonCanonicalValueCache.put(new CommonCanonicalValueKey(shardKey, url), commonCanonicalValueId); + } + + @Override + public void addCommonTokenValue(short shardKey, String codeSystem, String tokenValue, long commonTokenValueId) { + this.commonTokenValueCache.put(new CommonTokenValueKey(shardKey, codeSystem, tokenValue), commonTokenValueId); + } + + @Override + public void addCodeSystem(String codeSystem, int codeSystemId) { + this.codeSystemCache.put(codeSystem, codeSystemId); + } + + @Override + public Long getLogicalResourceIdentId(String resourceType, String logicalId) { + return logicalResourceIdentCache.get(new LogicalResourceIdentKey(resourceType, logicalId), k -> NULL_LONG); + } + + @Override + public void addLogicalResourceIdent(String resourceType, String logicalId, long logicalResourceId) { + logicalResourceIdentCache.put(new LogicalResourceIdentKey(resourceType, logicalId), logicalResourceId); + } } \ No newline at end of file diff --git a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainPostgresMessageHandler.java b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainPostgresMessageHandler.java index 1e13867e30b..ec707641708 100644 --- a/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainPostgresMessageHandler.java +++ b/fhir-remote-index/src/main/java/com/ibm/fhir/remote/index/database/PlainPostgresMessageHandler.java @@ -170,7 +170,7 @@ protected void endTransaction() throws FHIRPersistenceException { // any values from parameter_names, code_systems and common_token_values // are now committed to the database, so we can publish their record ids // to the shared cache which makes them accessible from other threads - publishCachedValues(); + publishValuesToCache(); } else { // something went wrong...try to roll back the transaction before we close // everything @@ -188,28 +188,41 @@ protected void endTransaction() throws FHIRPersistenceException { } catch (SQLException x) { throw new FHIRPersistenceException("commit failed", x); } finally { - if (!committed) { - // The maps may contain ids that were not committed to the database so - // we should clean them out in case we decide to reuse this consumer - this.logicalResourceIdentMap.clear(); - this.parameterNameMap.clear(); - this.codeSystemValueMap.clear(); - this.commonTokenValueMap.clear(); - this.commonCanonicalValueMap.clear(); - } + // always clear these maps because otherwise they could grow unbounded. Values + // are cached by the identityCache + this.logicalResourceIdentMap.clear(); + this.parameterNameMap.clear(); + this.codeSystemValueMap.clear(); + this.commonTokenValueMap.clear(); + this.commonCanonicalValueMap.clear(); } } /** * After the transaction has been committed, we can publish certain values to the - * shared identity caches + * shared identity caches allowing them to be used by other threads */ - private void publishCachedValues() { - // all the unresolvedParameterNames should be resolved at this point + private void publishValuesToCache() { for (ParameterNameValue pnv: this.unresolvedParameterNames) { logger.fine(() -> "Adding parameter-name to cache: '" + pnv.getParameterName() + "' -> " + pnv.getParameterNameId()); identityCache.addParameterName(pnv.getParameterName(), pnv.getParameterNameId()); } + + for (CommonCanonicalValue value: this.unresolvedCanonicalValues) { + identityCache.addCommonCanonicalValue(FIXED_SHARD, value.getUrl(), value.getCanonicalId()); + } + + for (CodeSystemValue value: this.unresolvedSystemValues) { + identityCache.addCodeSystem(value.getCodeSystem(), value.getCodeSystemId()); + } + + for (CommonTokenValue value: this.unresolvedTokenValues) { + identityCache.addCommonTokenValue(FIXED_SHARD, value.getCodeSystemValue().getCodeSystem(), value.getTokenValue(), value.getCommonTokenValueId()); + } + + for (LogicalResourceIdentValue value: this.unresolvedLogicalResourceIdents) { + identityCache.addLogicalResourceIdent(value.getResourceType(), value.getLogicalId(), value.getLogicalResourceId()); + } } @Override @@ -386,7 +399,7 @@ private CommonTokenValue lookupCommonTokenValue(String codeSystem, String tokenV } /** - * Get the LogicalReosurceIdentValue we've assigned for the given (resourceType, logicalId) + * Get the LogicalResourceIdentValue we've assigned for the given (resourceType, logicalId) * tuple. The returned value may not yet have the actual logical_resource_id yet - we fetch * these values later and create new database records as necessary * @param resourceType @@ -403,7 +416,15 @@ private LogicalResourceIdentValue lookupLogicalResourceIdentValue(String resourc .withLogicalId(logicalId) .build(); this.logicalResourceIdentMap.put(key, result); - this.unresolvedLogicalResourceIdents.add(result); + + // see if we can find the logical_resource_id from the cache + Long logicalResourceId = identityCache.getLogicalResourceIdentId(resourceType, logicalId); + if (logicalResourceId != null) { + result.setLogicalResourceId(logicalResourceId); + } else { + // Add to the unresolved list to look up later + this.unresolvedLogicalResourceIdents.add(result); + } } return result; }