Skip to content

Commit

Permalink
issue #3437 use identity cache for more ids
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed Jun 1, 2022
1 parent 24bb883 commit 3e6699d
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public ConfigureConnectionDAO() {

@Override
public void run(IDatabaseTranslator translator, Connection c) {
final String SQL = "SET LOCAL citus.multi_shard_modify_mode TO 'sequential'";
// we need this behavior for all transactions on this connection, so
// we use SET SESSION instead of SET LOCAL
final String SQL = "SET SESSION citus.multi_shard_modify_mode TO 'sequential'";

try (Statement s = c.createStatement()) {
s.executeUpdate(SQL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,51 @@ public interface IdentityCache {
*/
Long getCommonCanonicalValueId(short shardKey, String url);

/**
* Add the common canonical value to the cache
* @param shardKey
* @param url
* @param commonCanonicalValueId
*/
void addCommonCanonicalValue(short shardKey, String url, long commonCanonicalValueId);

/**
* Add the common token value to the cache
* @param shardKey
* @param codeSystem
* @param tokenValue
* @param commonTokenValueId
*/
void addCommonTokenValue(short shardKey, String codeSystem, String tokenValue, long commonTokenValueId);

/**
* Add the code system value to the cache
* @param codeSystem
* @param codeSystemId
*/
void addCodeSystem(String codeSystem, int codeSystemId);

/**
* Get the database resource_type_id value for the given resourceType value
* @param resourceType
* @return
* @throws IllegalArgumentException if resourceType is not a valid resource type name
*/
int getResourceTypeId(String resourceType);

/**
* Get the database logical_resource_id for the given resourceType/logicalId tuple.
* @param resourceType
* @param logicalId
* @return
*/
Long getLogicalResourceIdentId(String resourceType, String logicalId);

/**
* Add the logical_resource_ident mapping to the cache
* @param resourceType
* @param logicalId
* @param logicalResourceId
*/
void addLogicalResourceIdent(String resourceType, String logicalId, long logicalResourceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,13 @@ public void run() throws FHIRPersistenceException {
private void initIdentityCache() throws FHIRPersistenceException {
logger.info("Initializing identity cache");
identityCache = new IdentityCacheImpl(
1000, Duration.ofSeconds(3600),
10000, Duration.ofSeconds(3600),
1000, Duration.ofSeconds(3600));
1000, Duration.ofSeconds(86400), // code systems
10000, Duration.ofSeconds(86400), // common token values
1000, Duration.ofSeconds(86400), // common canonical values
100000, Duration.ofSeconds(86400)); // logical resource idents
CacheLoader loader = new CacheLoader(identityCache);

// prefill the cache
try (Connection connection = connectionProvider.getConnection()) {
loader.apply(connection);
connection.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.ibm.fhir.remote.index.api.IdentityCache;
import com.ibm.fhir.remote.index.database.CommonCanonicalValueKey;
import com.ibm.fhir.remote.index.database.CommonTokenValueKey;
import com.ibm.fhir.remote.index.database.LogicalResourceIdentKey;
import com.ibm.fhir.remote.index.database.ResourceTypeValue;

/**
Expand All @@ -27,6 +28,7 @@ public class IdentityCacheImpl implements IdentityCache {
private final Cache<String, Integer> codeSystemCache;
private final Cache<CommonTokenValueKey, Long> commonTokenValueCache;
private final Cache<CommonCanonicalValueKey, Long> commonCanonicalValueCache;
private final Cache<LogicalResourceIdentKey, Long> logicalResourceIdentCache;
private static final Integer NULL_INT = null;
private static final Long NULL_LONG = null;

Expand All @@ -35,7 +37,8 @@ public class IdentityCacheImpl implements IdentityCache {
*/
public IdentityCacheImpl(int maxCodeSystemCacheSize, Duration codeSystemCacheDuration,
long maxCommonTokenCacheSize, Duration commonTokenCacheDuration,
long maxCommonCanonicalCacheSize, Duration commonCanonicalCacheDuration) {
long maxCommonCanonicalCacheSize, Duration commonCanonicalCacheDuration,
long maxLogicalResourceIdentCacheSize, Duration logicalResourceIdentCacheDuration) {
codeSystemCache = Caffeine.newBuilder()
.maximumSize(maxCodeSystemCacheSize)
.expireAfterWrite(codeSystemCacheDuration)
Expand All @@ -48,6 +51,10 @@ public IdentityCacheImpl(int maxCodeSystemCacheSize, Duration codeSystemCacheDur
.maximumSize(maxCommonCanonicalCacheSize)
.expireAfterWrite(commonCanonicalCacheDuration)
.build();
logicalResourceIdentCache = Caffeine.newBuilder()
.maximumSize(maxLogicalResourceIdentCacheSize)
.expireAfterWrite(logicalResourceIdentCacheDuration)
.build();
}

/**
Expand Down Expand Up @@ -95,4 +102,29 @@ public int getResourceTypeId(String resourceType) {
}
return resourceTypeId;
}

@Override
public void addCommonCanonicalValue(short shardKey, String url, long commonCanonicalValueId) {
this.commonCanonicalValueCache.put(new CommonCanonicalValueKey(shardKey, url), commonCanonicalValueId);
}

@Override
public void addCommonTokenValue(short shardKey, String codeSystem, String tokenValue, long commonTokenValueId) {
this.commonTokenValueCache.put(new CommonTokenValueKey(shardKey, codeSystem, tokenValue), commonTokenValueId);
}

@Override
public void addCodeSystem(String codeSystem, int codeSystemId) {
this.codeSystemCache.put(codeSystem, codeSystemId);
}

@Override
public Long getLogicalResourceIdentId(String resourceType, String logicalId) {
return logicalResourceIdentCache.get(new LogicalResourceIdentKey(resourceType, logicalId), k -> NULL_LONG);
}

@Override
public void addLogicalResourceIdent(String resourceType, String logicalId, long logicalResourceId) {
logicalResourceIdentCache.put(new LogicalResourceIdentKey(resourceType, logicalId), logicalResourceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void endTransaction() throws FHIRPersistenceException {
// any values from parameter_names, code_systems and common_token_values
// are now committed to the database, so we can publish their record ids
// to the shared cache which makes them accessible from other threads
publishCachedValues();
publishValuesToCache();
} else {
// something went wrong...try to roll back the transaction before we close
// everything
Expand All @@ -188,28 +188,41 @@ protected void endTransaction() throws FHIRPersistenceException {
} catch (SQLException x) {
throw new FHIRPersistenceException("commit failed", x);
} finally {
if (!committed) {
// The maps may contain ids that were not committed to the database so
// we should clean them out in case we decide to reuse this consumer
this.logicalResourceIdentMap.clear();
this.parameterNameMap.clear();
this.codeSystemValueMap.clear();
this.commonTokenValueMap.clear();
this.commonCanonicalValueMap.clear();
}
// always clear these maps because otherwise they could grow unbounded. Values
// are cached by the identityCache
this.logicalResourceIdentMap.clear();
this.parameterNameMap.clear();
this.codeSystemValueMap.clear();
this.commonTokenValueMap.clear();
this.commonCanonicalValueMap.clear();
}
}

/**
* After the transaction has been committed, we can publish certain values to the
* shared identity caches
* shared identity caches allowing them to be used by other threads
*/
private void publishCachedValues() {
// all the unresolvedParameterNames should be resolved at this point
private void publishValuesToCache() {
for (ParameterNameValue pnv: this.unresolvedParameterNames) {
logger.fine(() -> "Adding parameter-name to cache: '" + pnv.getParameterName() + "' -> " + pnv.getParameterNameId());
identityCache.addParameterName(pnv.getParameterName(), pnv.getParameterNameId());
}

for (CommonCanonicalValue value: this.unresolvedCanonicalValues) {
identityCache.addCommonCanonicalValue(FIXED_SHARD, value.getUrl(), value.getCanonicalId());
}

for (CodeSystemValue value: this.unresolvedSystemValues) {
identityCache.addCodeSystem(value.getCodeSystem(), value.getCodeSystemId());
}

for (CommonTokenValue value: this.unresolvedTokenValues) {
identityCache.addCommonTokenValue(FIXED_SHARD, value.getCodeSystemValue().getCodeSystem(), value.getTokenValue(), value.getCommonTokenValueId());
}

for (LogicalResourceIdentValue value: this.unresolvedLogicalResourceIdents) {
identityCache.addLogicalResourceIdent(value.getResourceType(), value.getLogicalId(), value.getLogicalResourceId());
}
}

@Override
Expand Down Expand Up @@ -386,7 +399,7 @@ private CommonTokenValue lookupCommonTokenValue(String codeSystem, String tokenV
}

/**
* Get the LogicalReosurceIdentValue we've assigned for the given (resourceType, logicalId)
* Get the LogicalResourceIdentValue we've assigned for the given (resourceType, logicalId)
* tuple. The returned value may not yet have the actual logical_resource_id yet - we fetch
* these values later and create new database records as necessary
* @param resourceType
Expand All @@ -403,7 +416,15 @@ private LogicalResourceIdentValue lookupLogicalResourceIdentValue(String resourc
.withLogicalId(logicalId)
.build();
this.logicalResourceIdentMap.put(key, result);
this.unresolvedLogicalResourceIdents.add(result);

// see if we can find the logical_resource_id from the cache
Long logicalResourceId = identityCache.getLogicalResourceIdentId(resourceType, logicalId);
if (logicalResourceId != null) {
result.setLogicalResourceId(logicalResourceId);
} else {
// Add to the unresolved list to look up later
this.unresolvedLogicalResourceIdents.add(result);
}
}
return result;
}
Expand Down

0 comments on commit 3e6699d

Please sign in to comment.