Skip to content

Commit

Permalink
Merge pull request #2519 from IBM/robin-perf-eval
Browse files Browse the repository at this point in the history
Issues 1921, 1923, 2348, and 2499 - whole-system search updates and related changes
  • Loading branch information
punktilious authored Jun 25, 2021
2 parents 553df99 + ac75a32 commit 583b798
Show file tree
Hide file tree
Showing 133 changed files with 5,924 additions and 1,288 deletions.
37 changes: 34 additions & 3 deletions fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
import com.ibm.fhir.task.api.ITaskGroup;
import com.ibm.fhir.task.core.service.TaskService;

/**
* The fhir-bucket application for loading data from COS into a FHIR server
* and tracking the returned ids along with response times.
*/
public class Main {
private static final Logger logger = Logger.getLogger(Main.class.getName());
private static final int DEFAULT_CONNECTION_POOL_SIZE = 10;
Expand Down Expand Up @@ -191,6 +195,12 @@ public class Main {
// How many reindex calls should we run in parallel
private int reindexConcurrentRequests = 1;

// The number of patients to fetch into the buffer
private int patientBufferSize = 500000;

// How many times should we cycle through the patient buffer before refilling
private int bufferRecycleCount = 1;

/**
* Parse command line arguments
* @param args
Expand All @@ -209,6 +219,13 @@ public void parseArgs(String[] args) {
case "--create-schema":
this.createSchema = true;
break;
case "--schema-name":
if (i < args.length + 1) {
this.schemaName = args[++i];
} else {
throw new IllegalArgumentException("missing value for --schema-name");
}
break;
case "--cos-properties":
if (i < args.length + 1) {
loadCosProperties(args[++i]);
Expand Down Expand Up @@ -356,6 +373,20 @@ public void parseArgs(String[] args) {
throw new IllegalArgumentException("missing value for --max-resources-per-bundle");
}
break;
case "--patient-buffer-size":
if (i < args.length + 1) {
this.patientBufferSize = Integer.parseInt(args[++i]);
} else {
throw new IllegalArgumentException("missing value for --patient-buffer-size");
}
break;
case "--buffer-recycle-count":
if (i < args.length + 1) {
this.bufferRecycleCount = Integer.parseInt(args[++i]);
} else {
throw new IllegalArgumentException("missing value for --buffer-recycle-count");
}
break;
case "--incremental":
this.incremental = true;
break;
Expand Down Expand Up @@ -653,10 +684,10 @@ public void bootstrapDb() {

if (adapter.getTranslator().getType() == DbType.POSTGRESQL) {
// Postgres doesn't support batched merges, so we go with a simpler UPSERT
MergeResourceTypesPostgres mrt = new MergeResourceTypesPostgres(resourceTypes);
MergeResourceTypesPostgres mrt = new MergeResourceTypesPostgres(schemaName, resourceTypes);
adapter.runStatement(mrt);
} else {
MergeResourceTypes mrt = new MergeResourceTypes(resourceTypes);
MergeResourceTypes mrt = new MergeResourceTypes(schemaName, resourceTypes);
adapter.runStatement(mrt);
}
} catch (Exception x) {
Expand Down Expand Up @@ -825,7 +856,7 @@ protected void scanAndLoad() {
if (this.concurrentPayerRequests > 0 && fhirClient != null) {
// set up the CMS payer thread to add some read-load to the system
InteropScenario scenario = new InteropScenario(this.fhirClient);
cmsPayerWorkload = new InteropWorkload(dataAccess, scenario, concurrentPayerRequests, 500000);
cmsPayerWorkload = new InteropWorkload(dataAccess, scenario, concurrentPayerRequests, this.patientBufferSize, this.bufferRecycleCount);
cmsPayerWorkload.init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,52 +32,62 @@ public class InteropWorkload {

// The scenario we use to process each randomly picked patient
private final IPatientScenario patientScenario;

// the maximum number of requests we permit
private final int maxConcurrentRequests;

private final Lock lock = new ReentrantLock();
private final Condition capacityCondition = lock.newCondition();

private volatile int runningRequests;

private volatile boolean running = true;

// The thread running the main loop
private Thread thread;

// How many patients should we load into the buffer
private final int patientBufferSize;


// How many times should we use the same set of patient ids?
private final int bufferRecycleCount;

// Access to the FHIRBATCH schema
private final DataAccess dataAccess;

// thread pool for processing requests
private final ExecutorService pool = Executors.newCachedThreadPool();

// for picking random patient ids
private final SecureRandom random = new SecureRandom();

private long statsResetTime = -1;
private final AtomicInteger fhirRequests = new AtomicInteger();
private final AtomicLong fhirRequestTime = new AtomicLong();
private final AtomicInteger resourceCount = new AtomicInteger();

// how many nanoseconds between stats reports
private static final long STATS_REPORT_TIME = 10L * 1000000000L;

/**
* Public constructor
* @param client
* @param dataAccess
* @param patientScenario
* @param maxConcurrentRequests
* @param patientBufferSize
* @param bufferRecycleCount
*/
public InteropWorkload(DataAccess dataAccess, IPatientScenario patientScenario, int maxConcurrentRequests, int patientBufferSize) {
public InteropWorkload(DataAccess dataAccess, IPatientScenario patientScenario, int maxConcurrentRequests, int patientBufferSize, int bufferRecycleCount) {
if (bufferRecycleCount < 1) {
throw new IllegalArgumentException("bufferRecycleCount must be >= 1");
}
this.dataAccess = dataAccess;
this.patientScenario = patientScenario;
this.maxConcurrentRequests = maxConcurrentRequests;
this.patientBufferSize = patientBufferSize;
this.bufferRecycleCount = bufferRecycleCount;
}


/**
* Start the main loop
Expand All @@ -90,49 +100,49 @@ public void init() {
thread = new Thread(() -> mainLoop());
thread.start();
}

public void signalStop() {
this.running = false;

lock.lock();
try {
// wake up the thread if it's waiting on the capacity condition
capacityCondition.signalAll();
} finally {
lock.unlock();
}


// try to break into any IO operation for a quicker exit
if (thread != null) {
thread.interrupt();
}

// make sure the pool doesn't start new work
pool.shutdown();
}

/**
* Wait until things are stopped
*/
public void waitForStop() {
if (this.running) {
signalStop();
}

try {
pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException x) {
logger.warning("Wait for pool shutdown interrupted");
}
}

/**
* The main loop in this object which starts when {@link #init()} is called
* and will run until {@link #shutdown()}.
*/
protected void mainLoop() {

// How many samples have we taken from the current buffer?
int samples = 0;
// The list of patientIds we process
Expand All @@ -142,13 +152,13 @@ protected void mainLoop() {
while (this.running) {

try {
if (patientIdBuffer.isEmpty() || samples > patientIdBuffer.size()) {
if (patientIdBuffer.isEmpty() || samples > patientIdBuffer.size() * this.bufferRecycleCount) {
// Refill the buffer of patient ids. There might be more available now
patientIdBuffer.clear();
samples = 0;
dataAccess.selectRandomPatientIds(patientIdBuffer, this.patientBufferSize);
}

// calculate how many requests we want to submit from the buffer, based
// on the maxConcurrentRequests.
int batchSize = 0;
Expand All @@ -157,8 +167,8 @@ protected void mainLoop() {
while (running && runningRequests == maxConcurrentRequests) {
capacityCondition.await(5, TimeUnit.SECONDS);
}
// Submit as many requests as we have available. If we have a small

// Submit as many requests as we have available. If we have a small
// patient buffer, then patients are more likely to be picked more than once
int freeCapacity = maxConcurrentRequests - runningRequests;
batchSize = Math.min(patientIdBuffer.size(), freeCapacity);
Expand All @@ -171,7 +181,7 @@ protected void mainLoop() {
} finally {
lock.unlock();
}

// Submit a request for each allocated patient to the thread pool
for (int i=0; i<batchSize && running; i++) {
// pick a random patient id in the buffer
Expand All @@ -180,7 +190,7 @@ protected void mainLoop() {
samples++; // track how many times we've sampled from the buffer
pool.submit(() -> processPatientThr(patientId));
}

long now = System.nanoTime();
if (now >= nextStatsReport) {
// Time to report average throughput stats
Expand All @@ -191,10 +201,10 @@ protected void mainLoop() {
if (this.fhirRequests.get() > 0) {
avgResponseTime = this.fhirRequestTime.get() / 1e9 / this.fhirRequests.get();
}
logger.info(String.format("STATS: FHIR=%7.1f calls/s, rate=%7.1f resources/s, response time=%5.3f s",

logger.info(String.format("STATS: FHIR=%7.1f calls/s, rate=%7.1f resources/s, response time=%5.3f s",
avgCallPerSecond, avgResourcesPerSecond, avgResponseTime));

// Reset the stats for the next report window
statsStartTime = now;
nextStatsReport = now + STATS_REPORT_TIME;
Expand Down Expand Up @@ -231,7 +241,7 @@ private void processPatientThr(String patientId) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Processing patient: '" + patientId + "'");
}

patientScenario.process(patientId, fhirRequests, fhirRequestTime, resourceCount);
} catch (Exception x) {
logger.log(Level.SEVERE, "Processing patient '" + patientId + "'" , x);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2020
* (C) Copyright IBM Corp. 2020, 2021
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand All @@ -16,6 +16,7 @@

import com.ibm.fhir.database.utils.api.IDatabaseSupplier;
import com.ibm.fhir.database.utils.api.IDatabaseTranslator;
import com.ibm.fhir.database.utils.common.DataDefinitionUtil;
import com.ibm.fhir.database.utils.model.DbType;

/**
Expand All @@ -25,18 +26,24 @@
public class AddBucketPath implements IDatabaseSupplier<Long> {
private static final Logger logger = Logger.getLogger(RegisterLoaderInstance.class.getName());

// The name of the schema holding the FHIRBUCKET tables
private final String schemaName;

// The name of the bucket
private final String bucketName;

// The path in which the item resides...basically the key up and including the last /
private final String bucketPath;


/**
* Public constructor
* @param schemaName
* @param bucketName
* @param bucketPath
*/
public AddBucketPath(String bucketName, String bucketPath) {
public AddBucketPath(String schemaName, String bucketName, String bucketPath) {
this.schemaName = schemaName;
this.bucketName = bucketName;
this.bucketPath = bucketPath;
}
Expand All @@ -48,15 +55,16 @@ public Long run(IDatabaseTranslator translator, Connection c) {
// MERGE and upsert-like tricks don't appear to work with Derby
// when using autogenerated identity columns. So we have to
// try the old-fashioned way and handle duplicate key
String dml;
final String bucketPaths = DataDefinitionUtil.getQualifiedName(schemaName, "bucket_paths");
final String dml;
if (translator.getType() == DbType.POSTGRESQL) {
// For POSTGRES, if a statement fails it causes the whole transaction
// to fail, so we need turn this into an UPSERT
dml = "INSERT INTO bucket_paths (bucket_name, bucket_path) VALUES (?,?) ON CONFLICT(bucket_name, bucket_path) DO NOTHING";
dml = "INSERT INTO " + bucketPaths + "(bucket_name, bucket_path) VALUES (?,?) ON CONFLICT(bucket_name, bucket_path) DO NOTHING";
} else {
dml = "INSERT INTO bucket_paths (bucket_name, bucket_path) VALUES (?,?)";
dml = "INSERT INTO " + bucketPaths + "(bucket_name, bucket_path) VALUES (?,?)";
}

try (PreparedStatement ps = c.prepareStatement(dml, Statement.RETURN_GENERATED_KEYS)) {
ps.setString(1, bucketName);
ps.setString(2, bucketPath);
Expand All @@ -77,11 +85,11 @@ public Long run(IDatabaseTranslator translator, Connection c) {
throw translator.translate(x);
}
}

// If we didn't create a new record, fetch the id of the existing record (we don't delete these
// records, so no chance of a race condition
if (bucketPathId == null) {
final String SQL = "SELECT bucket_path_id FROM bucket_paths WHERE bucket_name = ? AND bucket_path = ?";
final String SQL = "SELECT bucket_path_id FROM " + bucketPaths + " WHERE bucket_name = ? AND bucket_path = ?";
try (PreparedStatement ps = c.prepareStatement(SQL)) {
ps.setString(1, bucketName);
ps.setString(2, bucketPath);
Expand All @@ -96,7 +104,7 @@ public Long run(IDatabaseTranslator translator, Connection c) {
throw translator.translate(x);
}
}

return bucketPathId;
}
}
Loading

0 comments on commit 583b798

Please sign in to comment.