Skip to content

Commit

Permalink
issue #3437 initial derby unit test for fhir-remote-index
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed Jun 3, 2022
1 parent fcf2c1e commit f2687cc
Show file tree
Hide file tree
Showing 13 changed files with 1,965 additions and 1,293 deletions.
23 changes: 23 additions & 0 deletions fhir-remote-index/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,29 @@
<artifactId>derbytools</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>fhir-persistence-schema</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>fhir-model</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.parsson</groupId>
<artifactId>jakarta.json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.ibm.fhir.remote.index.cache.IdentityCacheImpl;
import com.ibm.fhir.remote.index.database.CacheLoader;
import com.ibm.fhir.remote.index.database.DistributedPostgresMessageHandler;
import com.ibm.fhir.remote.index.database.PlainDerbyMessageHandler;
import com.ibm.fhir.remote.index.database.PlainPostgresMessageHandler;
import com.ibm.fhir.remote.index.kafka.RemoteIndexConsumer;
import com.ibm.fhir.remote.index.sharded.ShardedPostgresMessageHandler;
Expand Down Expand Up @@ -332,7 +333,11 @@ private IMessageHandler buildHandler() throws FHIRPersistenceException {
case SHARDED:
return new ShardedPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs);
case PLAIN:
return new PlainPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs);
if (dbType == DbType.DERBY) {
return new PlainDerbyMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs);
} else {
return new PlainPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs);
}
case DISTRIBUTED:
return new DistributedPostgresMessageHandler(c, getSchemaName(), identityCache, maxReadyTimeMs);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import com.ibm.fhir.database.utils.postgres.PostgresTranslator;
import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.remote.index.api.IdentityCache;

Expand All @@ -22,7 +23,7 @@
* by a sequence, which means a slightly different INSERT statement
* in certain cases
*/
public class DistributedPostgresMessageHandler extends PlainPostgresMessageHandler {
public class DistributedPostgresMessageHandler extends PlainMessageHandler {
private static final Logger logger = Logger.getLogger(DistributedPostgresMessageHandler.class.getName());

/**
Expand All @@ -33,7 +34,12 @@ public class DistributedPostgresMessageHandler extends PlainPostgresMessageHandl
* @param maxReadyTimeMs
*/
public DistributedPostgresMessageHandler(Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) {
super(connection, schemaName, cache, maxReadyTimeMs);
super(new PostgresTranslator(), connection, schemaName, cache, maxReadyTimeMs);
}

@Override
protected String onConflict() {
return "ON CONFLICT DO NOTHING";
}

@Override
Expand All @@ -47,7 +53,8 @@ protected void addMissingCommonTokenValues(List<CommonTokenValue> missing) throw
insert.append(" OVERRIDING SYSTEM VALUE "); // we want to use our sequence number
insert.append(" VALUES (?,?,");
insert.append(nextVal); // next sequence value
insert.append(") ON CONFLICT DO NOTHING");
insert.append(") ");
insert.append(onConflict());

try (PreparedStatement ps = connection.prepareStatement(insert.toString())) {
int count = 0;
Expand Down Expand Up @@ -82,7 +89,8 @@ protected void addMissingCommonCanonicalValues(List<CommonCanonicalValue> missin
insert.append(" OVERRIDING SYSTEM VALUE "); // we want to use our sequence number
insert.append(" VALUES (?,");
insert.append(nextVal); // next sequence value
insert.append(") ON CONFLICT DO NOTHING");
insert.append(") ");
insert.append(onConflict());

final String DML = insert.toString();
if (logger.isLoggable(Level.FINE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public void process(String requestShard, String resourceType, String logicalId,
systemDao.addDate(logicalResourceId, parameterNameValue.getParameterNameId(), valueDateStart, valueDateEnd, p.getCompositeId());
}
} catch (SQLException x) {
logger.log(Level.SEVERE, "DateParameter", x);
throw new FHIRPersistenceException("Failed inserting date params for '" + resourceType + "'");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.remote.index.database;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.ibm.fhir.database.utils.derby.DerbyTranslator;
import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.remote.index.api.IdentityCache;

/**
* Derby variant of the plain schema message handler which is needed because Derby
* needs slightly different syntax for some queries
*/
public class PlainDerbyMessageHandler extends PlainMessageHandler {
private static final Logger logger = Logger.getLogger(PlainDerbyMessageHandler.class.getName());

/**
* Public constructor
* @param connection
* @param schemaName
* @param cache
* @param maxReadyTimeMs
*/
public PlainDerbyMessageHandler(Connection connection, String schemaName, IdentityCache cache, long maxReadyTimeMs) {
super(new DerbyTranslator(), connection, schemaName, cache, maxReadyTimeMs);
}

@Override
protected String onConflict() {
return "";
}

@Override
protected PreparedStatement buildLogicalResourceIdentSelectStatement(List<LogicalResourceIdentValue> values) throws SQLException {
StringBuilder query = new StringBuilder();
query.append("SELECT rt.resource_type, lri.logical_id, lri.logical_resource_id ");
query.append(" FROM logical_resource_ident AS lri ");
query.append(" JOIN resource_types AS rt ON (rt.resource_type_id = lri.resource_type_id)");
query.append(" WHERE ");
for (int i=0; i<values.size(); i++) {
if (i > 0) {
query.append(" OR ");
}
query.append("(lri.resource_type_id = ? AND lri.logical_id = ?)");
}
PreparedStatement ps = connection.prepareStatement(query.toString());
// bind the parameter values
int param = 1;
for (LogicalResourceIdentValue val: values) {
ps.setInt(param++, val.getResourceTypeId());
ps.setString(param++, val.getLogicalId());
}
logger.fine(() -> "logicalResourceIdents: " + query.toString());
return ps;
}

@Override
protected Integer createParameterName(String parameterName) throws SQLException {
Integer parameterNameId = getNextRefId();
final String insertParameterName = ""
+ "INSERT INTO parameter_names (parameter_name_id, parameter_name) "
+ " VALUES (?, ?)";
try (PreparedStatement stmt = connection.prepareStatement(insertParameterName)) {
stmt.setInt(1, parameterNameId);
stmt.setString(2, parameterName);
stmt.execute();
}

return parameterNameId;
}

@Override
protected PreparedStatementWrapper buildCommonTokenValueSelectStatement(List<CommonTokenValue> values) throws SQLException {
StringBuilder query = new StringBuilder();
// need the code_system name - so we join back to the code_systems table as well
query.append("SELECT cs.code_system_name, c.token_value, c.common_token_value_id ");
query.append(" FROM common_token_values c");
query.append(" JOIN code_systems cs ON (cs.code_system_id = c.code_system_id)");
query.append(" WHERE ");

// Create a (codeSystem, tokenValue) tuple for each of the CommonTokenValue records
boolean first = true;
for (CommonTokenValue ctv: values) {
if (first) {
first = false;
} else {
query.append(" OR ");
}
query.append("(c.code_system_id = ");
query.append(ctv.getCodeSystemValue().getCodeSystemId()); // literal for code_system_id
query.append(" AND c.token_value = ?)");
}

// Create the prepared statement and bind the values
final String statementText = query.toString();
PreparedStatement ps = connection.prepareStatement(statementText);

// bind the parameter values
int param = 1;
for (CommonTokenValue ctv: values) {
ps.setString(param++, ctv.getTokenValue());
}
return new PreparedStatementWrapper(statementText, ps);
}
@Override
protected void addMissingCommonCanonicalValues(List<CommonCanonicalValue> missing) throws FHIRPersistenceException {

final String nextVal = translator.nextValue(schemaName, "fhir_sequence");
StringBuilder insert = new StringBuilder();
insert.append("INSERT INTO common_canonical_values (url, canonical_id) ");
insert.append(" VALUES (?,");
insert.append(nextVal); // next sequence value
insert.append(") ");

final String DML = insert.toString();
if (logger.isLoggable(Level.FINE)) {
logger.fine("addMissingCanonicalIds: " + DML);
}
try (PreparedStatement ps = connection.prepareStatement(DML)) {
int count = 0;
for (CommonCanonicalValue ctv: missing) {
logger.finest(() -> "Adding canonical value [" + ctv.toString() + "]");
ps.setString(1, ctv.getUrl());
ps.addBatch();
if (++count == this.maxCommonCanonicalValuesPerStatement) {
// not too many statements in a single batch
ps.executeBatch();
count = 0;
}
}
if (count > 0) {
// final batch
ps.executeBatch();
}
} catch (SQLException x) {
logger.log(Level.SEVERE, "failed: " + insert.toString(), x);
throw new FHIRPersistenceException("failed inserting new common canonical values");
}
}

}
Loading

0 comments on commit f2687cc

Please sign in to comment.