Skip to content

Commit

Permalink
issue #3437 remote index kafka consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <robin.arnold@ibm.com>
  • Loading branch information
punktilious committed May 10, 2022
1 parent 68819ab commit 0752bbb
Show file tree
Hide file tree
Showing 26 changed files with 2,225 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public class SearchParameterValue {
// The composite id used to tie together values belonging to the same composite parameter. Null for ordinary params.
private Integer compositeId;

// True if this parameter should also be stored at the whole-system level
private Boolean wholeSystem;

/**
* @return the name
*/
Expand Down Expand Up @@ -46,4 +49,26 @@ public Integer getCompositeId() {
public void setCompositeId(Integer compositeId) {
this.compositeId = compositeId;
}

/**
* @return the wholeSystem
*/
public Boolean getWholeSystem() {
return wholeSystem;
}

/**
* Returns true iff the wholeSystem property is not null and true
* @return
*/
public boolean isSystemParam() {
return this.wholeSystem != null && this.wholeSystem.booleanValue();
}

/**
* @param wholeSystem the wholeSystem to set
*/
public void setWholeSystem(Boolean wholeSystem) {
this.wholeSystem = wholeSystem;
}
}
4 changes: 4 additions & 0 deletions fhir-remote-index/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.remote.index.api;

import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.persistence.index.DateParameter;
import com.ibm.fhir.persistence.index.LocationParameter;
import com.ibm.fhir.persistence.index.NumberParameter;
import com.ibm.fhir.persistence.index.QuantityParameter;
import com.ibm.fhir.persistence.index.StringParameter;
import com.ibm.fhir.persistence.index.TokenParameter;
import com.ibm.fhir.remote.index.database.CodeSystemValue;
import com.ibm.fhir.remote.index.database.CommonTokenValue;
import com.ibm.fhir.remote.index.database.ParameterNameValue;

/**
* Processes batched parameters
*/
public interface BatchParameterProcessor {
/**
* Compute the shard key value use to distribute resources among nodes
* of the database
* @param resourceType
* @param logicalId
* @return
*/
short encodeShardKey(String resourceType, String logicalId);

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, StringParameter parameter) throws FHIRPersistenceException;

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, NumberParameter parameter) throws FHIRPersistenceException;

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, QuantityParameter parameter, CodeSystemValue codeSystemValue) throws FHIRPersistenceException;

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, LocationParameter parameter) throws FHIRPersistenceException;

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, DateParameter parameter) throws FHIRPersistenceException;

/**
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
* @param parameter
*/
void process(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue, TokenParameter parameter, CommonTokenValue commonTokenValue) throws FHIRPersistenceException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.remote.index.api;

import com.ibm.fhir.persistence.exception.FHIRPersistenceException;
import com.ibm.fhir.remote.index.database.ParameterNameValue;

/**
* A parameter value batched for later processing
*/
public abstract class BatchParameterValue {
protected final ParameterNameValue parameterNameValue;
protected final String resourceType;
protected final String logicalId;
protected final long logicalResourceId;

/**
* Protected constructor
* @param resourceType
* @param logicalId
* @param logicalResourceId
* @param parameterNameValue
*/
protected BatchParameterValue(String resourceType, String logicalId, long logicalResourceId, ParameterNameValue parameterNameValue) {
this.resourceType = resourceType;
this.logicalId = logicalId;
this.logicalResourceId = logicalResourceId;
this.parameterNameValue = parameterNameValue;
}

/**
* Apply this parameter value to the target processor
* @param processor
*/
public abstract void apply(BatchParameterProcessor processor) throws FHIRPersistenceException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@

import java.util.List;

import com.ibm.fhir.persistence.exception.FHIRPersistenceException;

/**
* Our interface for handling messages received by the consumer. Used
* to decouple the Kafka consumer from the database persistence logic
*/
public interface IMessageHandler {

void process(List<String> messages);
void process(List<String> messages) throws FHIRPersistenceException;

/**
* Close any resources held by the handler
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.remote.index.api;


/**
* Interface to hides the implementation of various caches we use during
* ingestion persistence
*/
public interface IdentityCache {
/**
* Get the parameter_name_id value for the given parameterName
* @param parameterName
* @return the parameter_name_id or null if the value is not found in the cache
*/
Integer getParameterNameId(String parameterName);

/**
* Get the code_system_id value for the given codeSystem value
* @param codeSystem
* @return the code_system_id or null if the value is not found in the cache
*/
Integer getCodeSystemId(String codeSystem);

/**
* Get the common_token_value_id for the given codeSystem and tokenValue
* @param shardKey
* @param codeSystem
* @param tokenValue
* @return the common_token_value_id or null if the value is not found in the cache
*/
Long getCommonTokenValueId(short shardKey, String codeSystem, String tokenValue);

/**
* Add the given parameterName to parameterNameId mapping to the cache
* @param parameterName
* @param parameterNameId
*/
void addParameterName(String parameterName, int parameterNameId);
}
Loading

0 comments on commit 0752bbb

Please sign in to comment.