Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live waveform data in Emap #66

Open
wants to merge 108 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 104 commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
be8944d
First pass at adding waveform data to Emap
jeremyestein Jun 10, 2024
7911d23
Write test which checks some of the data and passes
jeremyestein Jun 12, 2024
50d34f8
checkstyle fixes and make test quicker
jeremyestein Jun 13, 2024
36f7a84
Merge branch 'develop' into jeremy/hf-data
jeremyestein Jun 14, 2024
4aff3a9
Clarify validation run docs
jeremyestein Jun 19, 2024
5407881
In new version of checkstyle, the encoding tag is no longer valid, and
jeremyestein Jun 19, 2024
a6e11d9
Remove duplicated file set_mvn_proxy.sh
jeremyestein Jun 20, 2024
39a1fa7
Bare bones waveform-reader service. Docker image builds, but is
jeremyestein Jun 20, 2024
cf27fe5
Initialise the fake UDS properly; add fake UDS and waveform reader to
jeremyestein Jun 25, 2024
ff7db6f
Temporarily break the core proc RabbitMQ listener by only listening to
jeremyestein Jun 25, 2024
d1c1c9c
Make data more realistic in terms of volume. Had to add some layers to
jeremyestein Jun 26, 2024
b7f10f2
Make message IDs more descriptive to enable better performance tracking
jeremyestein Jun 27, 2024
a3db3fc
Ten seconds of data per message
jeremyestein Jun 27, 2024
a16adbf
call saveAll rather than save many times
jeremyestein Jun 27, 2024
a209f90
Batch single-row INSERTs
jeremyestein Jun 27, 2024
9615106
Couple of other tweaks, not sure what it'll do, but need a baseline
jeremyestein Jun 27, 2024
e9bff1f
Add reWriteBatchedInserts option to db connection string so that batched
jeremyestein Jun 27, 2024
3193369
Does this help?
jeremyestein Jun 27, 2024
46ff9a4
Expicitly specify sequence Id generation so we don't have to query the
jeremyestein Jun 27, 2024
5361af2
Can't use existing sequence as it has a different increment value
jeremyestein Jul 1, 2024
e2635c8
Use SQL arrays (of double precision DB type)
jeremyestein Jul 2, 2024
7b9b01a
Fix copy constructor
jeremyestein Jul 3, 2024
182d2c2
Copy constructor checker didn't know about Double[]
jeremyestein Jul 3, 2024
5558e48
Put data from multiple patients to test it doesn't get mixed up. Shuffle
jeremyestein Jul 3, 2024
49d93d9
Make observation time in synthetic data evenly spread out, as real data
jeremyestein Jul 16, 2024
cd5567f
Make number of patients, start date, and warp factor configurable to
jeremyestein Jul 17, 2024
55e0c60
Add indexes to Waveform table
jeremyestein Jul 18, 2024
73619d4
Cassandra is still running out of memory
jeremyestein Jul 18, 2024
1c3cc09
Make configurable the rabbitmq queues that core will listen on,
jeremyestein Jul 19, 2024
52dbc3a
Make the synthetic data values use a variable amount of digits to test
jeremyestein Jul 19, 2024
cd56584
Set an end date time for synthetic data and exit when this is reached,
jeremyestein Jul 22, 2024
3fa518a
Add waveform-reader to validation script
jeremyestein Jul 22, 2024
f5f914b
Fix some copy paste errors
jeremyestein Jul 24, 2024
0b5b576
Schedule a task in the core proc to periodically delete old waveform
jeremyestein Jul 25, 2024
4113ba8
Receive messages on a TCP server port, with source IP filtering
jeremyestein Jul 26, 2024
8a9a2b0
Split into two services, one a fake for generating HL7 messages, and
jeremyestein Jul 26, 2024
1a3442d
Copied from project board
jeremyestein Jul 29, 2024
ba238c2
Merge pull request #42 from UCLH-DHCT/jeremy/hf-data
skeating Jul 30, 2024
1d73817
In a seperate process, generate synthetic messages to simulate an HL7
jeremyestein Jul 30, 2024
51af8d9
Generator does not need interchange
jeremyestein Aug 9, 2024
cf878e6
Remove more unnecessary dependencies
jeremyestein Aug 9, 2024
5eecc32
Merge pull request #22 from UCLH-DHCT/jeremy/hf-data
jeremyestein Aug 12, 2024
25ba727
Infer location visit from time and location string
jeremyestein Aug 13, 2024
27a154a
Add visit observation type field (+ tests) so we know what the
jeremyestein Aug 15, 2024
8bb5ff0
Differentiate between source and mapped location in test
jeremyestein Aug 15, 2024
1dd7fbc
Test that the mapped stream name gets through too
jeremyestein Aug 15, 2024
f8e2840
Update emap-tests.yaml
skeating Aug 15, 2024
945b946
Fix copy constructor
jeremyestein Aug 15, 2024
ddbfc4e
Merge pull request #49 from UCLH-DHCT/jeremy/hf-data
jeremyestein Aug 19, 2024
f62dbad
Generate real HL7 in the synth generator. Parse real HL7 with HAPI in
jeremyestein Aug 19, 2024
52f0742
Use correct line endings for generated HL7 messages
jeremyestein Aug 20, 2024
ee34e0d
Thin out the logging a bit
jeremyestein Aug 20, 2024
e08dcdb
Give the terser explicit indexes rather than building a query string;
jeremyestein Aug 20, 2024
f152dda
Use a simple, ad hoc parser for HL7. It's much quicker.
jeremyestein Aug 21, 2024
ceafdae
Use the correct year in the date format!
jeremyestein Aug 21, 2024
fd45fed
Generate message timestamp as slightly after the observation timestamp
jeremyestein Aug 21, 2024
33abe68
Make HL7 messages much, much smaller, more in line with the real ones.
jeremyestein Aug 21, 2024
3b901f3
Merge pull request #51 from UCLH-DHCT/jeremy/hf-data
jeremyestein Aug 27, 2024
3484c15
First go at collating tiny Waveform messages into bigger messages that
jeremyestein Aug 21, 2024
1b236b3
Use sample count rather than time as the message size target/limit.
jeremyestein Aug 28, 2024
ca110b5
More fine-grained locking, should allow better multithreaded message
jeremyestein Aug 30, 2024
8ed5ae7
Check number of samples in generated messages. Test what happens when
jeremyestein Sep 2, 2024
a96a8b2
Fix magic number in gap checking, tidy up tests and other things
jeremyestein Sep 9, 2024
aa782f9
Bring in metadata from ICU_HL7_Data_Extraction repo commit 05676f0aeb…
jeremyestein Sep 10, 2024
af9dcf7
Read stream metadata from CSV file and use it to describe the streams in
jeremyestein Sep 11, 2024
e434619
Need the stream id to make a unique message ID! Now all the messages get
jeremyestein Sep 17, 2024
2f156df
Remove/downgrade excessive logging
jeremyestein Sep 18, 2024
ee74b95
Add thread pools on the TCP listener and the HL7 message handler to see
jeremyestein Sep 18, 2024
6ed92f2
We need to allow lists of messages to queue to be from different
jeremyestein Sep 19, 2024
ea5098e
Merge pull request #52 from UCLH-DHCT/jeremy/hf-data-collate
jeremyestein Sep 19, 2024
d6fb58c
Define expected location mappings with realistic location strings
jeremyestein Sep 23, 2024
e47325c
Implement location mapping
jeremyestein Sep 23, 2024
50a17a9
Fix Capsule bed names
jeremyestein Sep 25, 2024
2e0c48c
Before looking at the gap validation problem, make the synth data mor…
jeremyestein Sep 23, 2024
5d7f255
Test the created visit_observation_type rows
jeremyestein Sep 25, 2024
390e03c
Use only the unrounded timestamp when detecting gaps. Differentiate
jeremyestein Sep 25, 2024
4673b45
Check that data out of the database resembles the original sine wave,
jeremyestein Sep 26, 2024
6cdd7f5
Add unit to waveform data
jeremyestein Sep 26, 2024
8e7ed08
Message separator is not actually CRLF
jeremyestein Sep 30, 2024
70d8e07
Read from a file containing an HL7 message dump from Smartlinx
jeremyestein Oct 2, 2024
2eba4de
Merge pull request #55 from UCLH-DHCT/jeremy/hf-data-metadata
jeremyestein Oct 3, 2024
bc6132e
Merge pull request #58 from UCLH-DHCT/jeremy/hf-data-location
jeremyestein Oct 3, 2024
a7f769f
Merge pull request #59 from UCLH-DHCT/jeremy/hf-data-gaps
jeremyestein Oct 3, 2024
dfed36c
Need to map the directory containing test data
jeremyestein Oct 3, 2024
360fe56
Python setup script - allow independent switching of hoover, hl7 adt and
jeremyestein Oct 2, 2024
ecba32f
BooleanOptionalAction doesn't exist on python 3.8, do it ourselves
jeremyestein Oct 3, 2024
9902814
synth vs reader confusion
jeremyestein Oct 3, 2024
e178b6d
Don't parse values as numeric unless they are in fact numeric
jeremyestein Oct 3, 2024
387f9d4
Log, skip and continue if HL7 messages cause parsing errors
jeremyestein Oct 4, 2024
c9f49fd
Logic for checking empty queues seems to be inverted
jeremyestein Oct 4, 2024
68b8ad1
microsecond/second mixup meant that timeout was never reached
jeremyestein Oct 4, 2024
21c4259
Handle missing segments without crashing
jeremyestein Oct 4, 2024
c5a2465
String.substring needs to be in bounds
jeremyestein Oct 4, 2024
10474e0
Translate more assorted parsing errors to Hl7ParseException for better
jeremyestein Oct 4, 2024
1501016
Fix ends of files
jeremyestein Oct 7, 2024
4ed7c1a
Merge pull request #61 from UCLH-DHCT/jeremy/hf-data-misc
jeremyestein Oct 7, 2024
4b96678
Don't use waveform generator in validation
jeremyestein Oct 7, 2024
2d5872d
Reproduce issue #25
jeremyestein Oct 7, 2024
e0d9c99
Fix issue #25 - don't delete existing repo without warning when running
jeremyestein Oct 7, 2024
fd9d987
Bring design doc more up to date. Still needs more work though.
jeremyestein Oct 7, 2024
b7819da
Address review feedback
jeremyestein Oct 8, 2024
498a495
Add tests for recently changed code
jeremyestein Oct 8, 2024
d2b0c8f
Update docs and rename property for clarity
jeremyestein Oct 9, 2024
4d87fac
Merge pull request #64 from UCLH-DHCT/jeremy/hf-data-setup
jeremyestein Oct 9, 2024
c635859
Address some review comments
jeremyestein Oct 23, 2024
fa0936b
Remove JPA options relating to insert/update batching, since waveform
jeremyestein Oct 24, 2024
e4e6460
Make cassandra settings configurable with unchanged defaults suitable…
jeremyestein Oct 24, 2024
7c4d9e3
Make timeout configurable as per review feedback
jeremyestein Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion .github/workflows/emap-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main, develop ]
branches: [ main, develop, sk/waveform-dev ]
repository_dispatch:
permissions:
contents: read
Expand All @@ -18,6 +18,7 @@ jobs:
emap-interchange: ${{ steps.filter.outputs.emap-interchange }}
core: ${{ steps.filter.outputs.core }}
hl7-reader: ${{ steps.filter.outputs.hl7-reader }}
waveform-reader: ${{ steps.filter.outputs.waveform-reader }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
Expand All @@ -44,6 +45,11 @@ jobs:
- 'emap-interchange/**'
- 'emap-star/**'
- 'hl7-reader/**'
waveform-reader:
- '.github/**'
- 'emap-checker.xml'
- 'emap-interchange/**'
- 'waveform-reader/**'
emap-star-tests:
needs: [filter]
runs-on: ubuntu-latest
Expand Down Expand Up @@ -138,3 +144,26 @@ jobs:
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
waveform-reader-tests:
needs: [filter]
runs-on: ubuntu-latest
if: needs.filter.outputs.waveform-reader == 'true'
steps:
- uses: actions/checkout@v3
- name: Set up java
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Build emap-interchange
working-directory: emap-interchange
run: mvn clean install
- name: Run waveform-reader tests
working-directory: waveform-reader
run: mvn clean test
- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
2 changes: 1 addition & 1 deletion core/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ SHELL ["/bin/bash", "-c"]
# Install zip & unzip for glowroot
RUN apt update; apt install -yy zip
# Set up the Maven proxy settings
COPY core/set_mvn_proxy.sh /app/
COPY docker/set_mvn_proxy.sh /app/
# Download and extract glowroot
WORKDIR /app/core
RUN curl -s https://api.github.com/repos/glowroot/glowroot/releases/latest \
Expand Down
7 changes: 5 additions & 2 deletions core/docker-compose.fakeuds.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
version: '3.2'
services:
fakeuds:
image: postgres:10.5-alpine
image: postgres:11-alpine
environment:
POSTGRES_DB: fakeuds
# should be from global config?
POSTGRES_USER: inform_user
POSTGRES_PASSWORD: inform
volumes:
- postgres-data-fakeuds:/var/lib/postgresql/data
- ./init/uds:/docker-entrypoint-initdb.d
restart: unless-stopped
ports:
- "${FAKEUDS_PORT}:5432"
Expand Down
19 changes: 14 additions & 5 deletions core/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.2'
services:
core:
build:
Expand All @@ -15,8 +14,10 @@ services:
driver: "json-file"
restart: on-failure
depends_on:
- glowroot-central
- rabbitmq
glowroot-central:
condition: service_started
rabbitmq:
condition: service_healthy
rabbitmq:
image: rabbitmq:management
env_file:
Expand All @@ -25,12 +26,20 @@ services:
- "${RABBITMQ_PORT}:5672"
- "${RABBITMQ_ADMIN_PORT}:15672"
restart: on-failure
healthcheck:
# rabbitmq server crashes if any rabbitmq-diagnostics cmd is run very soon
# after starting, so can't check too aggressively here
test: rabbitmq-diagnostics -q check_running
interval: 30s
timeout: 10s
retries: 3
cassandra:
image: cassandra
restart: on-failure
environment:
- MAX_HEAP_SIZE=4G
- HEAP_NEWSIZE=800M
# if set too high the process will get OOM killed. Need to make configurable.
- MAX_HEAP_SIZE=1G
jeremyestein marked this conversation as resolved.
Show resolved Hide resolved
- HEAP_NEWSIZE=250M
glowroot-central:
build:
context: ..
Expand Down
3 changes: 3 additions & 0 deletions core/init/uds/uds-init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- For initialising a local DB in the case where the UDS/IDS does not exist (eg. Emap in a box),
-- NOT a fake for unit testing
CREATE SCHEMA IF NOT EXISTS uds_schema;
1 change: 0 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@
<version>${checkstyle.plugin.version}</version>
<configuration>
<configLocation>../emap-checker.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableScheduling;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.IdsEffectLogging;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.IdsEffectLoggingRepository;
Expand All @@ -34,6 +35,7 @@
"uk.ac.ucl.rits.inform.datasinks",
"uk.ac.ucl.rits.inform.informdb"})
@EnableCaching
@EnableScheduling
public class App {
private static final Logger logger = LoggerFactory.getLogger(App.class);

Expand Down Expand Up @@ -71,7 +73,7 @@ public static void main(String[] args) {
* @throws IOException if rabbitmq channel has a problem
*/
@Profile("default")
@RabbitListener(queues = {"hl7Queue", "databaseExtracts", "extensionProjects"})
@RabbitListener(queues = "#{'${core.rabbitmq.listen_queues}'.split(',')}")
public void receiveMessage(EmapOperationMessage msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
IdsEffectLogging idsEffectLogging = new IdsEffectLogging();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.FormProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.LabProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.PatientStateProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.WaveformProcessor;
import uk.ac.ucl.rits.inform.interchange.AdvanceDecisionMessage;
import uk.ac.ucl.rits.inform.interchange.ConsultMetadata;
import uk.ac.ucl.rits.inform.interchange.ConsultRequest;
import uk.ac.ucl.rits.inform.interchange.EmapOperationMessageProcessingException;
import uk.ac.ucl.rits.inform.interchange.EmapOperationMessageProcessor;
import uk.ac.ucl.rits.inform.interchange.location.DepartmentMetadata;
import uk.ac.ucl.rits.inform.interchange.location.LocationMetadata;
import uk.ac.ucl.rits.inform.interchange.PatientAllergy;
import uk.ac.ucl.rits.inform.interchange.PatientInfection;
import uk.ac.ucl.rits.inform.interchange.PatientProblem;
Expand All @@ -39,8 +38,11 @@
import uk.ac.ucl.rits.inform.interchange.form.FormQuestionMetadataMsg;
import uk.ac.ucl.rits.inform.interchange.lab.LabMetadataMsg;
import uk.ac.ucl.rits.inform.interchange.lab.LabOrderMsg;
import uk.ac.ucl.rits.inform.interchange.location.DepartmentMetadata;
import uk.ac.ucl.rits.inform.interchange.location.LocationMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.Flowsheet;
import uk.ac.ucl.rits.inform.interchange.visit_observations.FlowsheetMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import javax.annotation.PostConstruct;
import java.time.Instant;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class InformDbOperations implements EmapOperationMessageProcessor {
private AdvanceDecisionProcessor advanceDecisionProcessor;
@Autowired
private FormProcessor formProcessor;
@Autowired
private WaveformProcessor waveformProcessor;

@Value("${features.sde:false}")
private boolean sdeFeatureEnabled;
Expand Down Expand Up @@ -103,6 +107,12 @@ public void processMessage(PatientAllergy msg) throws EmapOperationMessageProces
patientStateProcessor.processMessage(msg, storedFrom);
}

@Override
public void processMessage(WaveformMessage msg) throws EmapOperationMessageProcessingException {
Instant storedFrom = Instant.now();
waveformProcessor.processMessage(msg, storedFrom);
}

/**
* @param msg the ADT message to process
* @throws EmapOperationMessageProcessingException if message cannot be processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uk.ac.ucl.rits.inform.informdb.visit_recordings.VisitObservationTypeAudit;
import uk.ac.ucl.rits.inform.interchange.visit_observations.Flowsheet;
import uk.ac.ucl.rits.inform.interchange.visit_observations.FlowsheetMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import javax.annotation.Resource;
import java.time.Instant;
Expand Down Expand Up @@ -122,6 +123,22 @@ public void processFlowsheet(Flowsheet msg, HospitalVisit visit, Instant storedF
}
}

/**
* Process the parts of a waveform message that relate to its visit observation type.
* @param msg waveform msg from which to extract VisitObservationType data
* @param storedFrom stored from
* @return new or existing VisitObservationType
*/
public VisitObservationType getOrCreateFromWaveform(WaveformMessage msg, Instant storedFrom) {
// RowState<VisitObservationType, VisitObservationTypeAudit> observationType = getOrCreateObservationTypeState(
jeremyestein marked this conversation as resolved.
Show resolved Hide resolved
VisitObservationType observationType = cache.getOrCreatePersistedObservationType(
msg.getSourceStreamId(), msg.getSourceStreamId(), "waveform", msg.getObservationTime(), storedFrom);
observationType.setName(msg.getMappedStreamDescription());
observationType.setIsRealTime(true);
// XXX: what about updates if the description changes over time?
jeremyestein marked this conversation as resolved.
Show resolved Hide resolved
return observationType;
}

/**
* If mapping between internal id and interface id already exists, nothing (?) needs to change.
* @param interfaceId Identifier of observation type.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package uk.ac.ucl.rits.inform.datasinks.emapstar.controllers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.LocationVisitRepository;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.visit_observations.WaveformRepository;
import uk.ac.ucl.rits.inform.informdb.movement.LocationVisit;
import uk.ac.ucl.rits.inform.informdb.visit_recordings.VisitObservationType;
import uk.ac.ucl.rits.inform.informdb.visit_recordings.Waveform;
import uk.ac.ucl.rits.inform.interchange.InterchangeValue;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

/**
* Controller for Waveform specific information.
* @author Jeremy Stein
*/
@Component
public class WaveformController {
private final Logger logger = LoggerFactory.getLogger(getClass());

private final WaveformRepository waveformRepository;
private final LocationVisitRepository locationVisitRepository;

WaveformController(
WaveformRepository waveformRepository,
LocationVisitRepository locationVisitRepository
) {
this.waveformRepository = waveformRepository;
this.locationVisitRepository = locationVisitRepository;
}

/**
* Process waveform data message.
* @param msg the interchange message
* @param visitObservationType to associate with this waveform data
* @param storedFrom stored from timestamp
* @throws MessageIgnoredException if message not processed
*/
@Transactional
public void processWaveform(
WaveformMessage msg,
VisitObservationType visitObservationType,
Instant storedFrom) throws MessageIgnoredException {
InterchangeValue<List<Double>> interchangeValue = msg.getNumericValues();
if (!interchangeValue.isSave()) {
throw new MessageIgnoredException("Updating/deleting waveform data is not supported");
}
// All given values are put into one new row. It's the responsibility of whoever is
// generating the message to choose an appropriate size of array.
List<Double> numericValues = interchangeValue.get();
Instant observationTime = msg.getObservationTime();
// Try to find the visit. We don't have enough information to create the visit if it doesn't already exist.
Optional<LocationVisit> inferredLocationVisit =
locationVisitRepository.findLocationVisitByLocationAndTime(observationTime, msg.getMappedLocationString());
// XXX: will have to do some sanity checks here to be sure that the HL7 feed hasn't gone down.
// See issue #36, and here for discussion:
// https://github.com/UCLH-DHCT/emap/blob/jeremy/hf-data/docs/dev/features/waveform_hf_data.md#core-processor-logic-orphan-data-problem
Waveform dataRow = new Waveform(
observationTime,
observationTime,
storedFrom);
inferredLocationVisit.ifPresent(dataRow::setLocationVisitId);
Double[] valuesAsArray = numericValues.toArray(new Double[0]);
dataRow.setSamplingRate(msg.getSamplingRate());
dataRow.setSourceLocation(msg.getSourceLocationString());
dataRow.setVisitObservationTypeId(visitObservationType);
dataRow.setUnit(msg.getUnit());
dataRow.setValuesArray(valuesAsArray);
waveformRepository.save(dataRow);
}

/**
* Delete waveform data before the cutoff date.
* @param olderThanCutoff cutoff date
* @return number of rows deleted
*/
@Transactional
public int deleteOldWaveformData(Instant olderThanCutoff) {
return waveformRepository.deleteAllInBatchByObservationDatetimeBefore(olderThanCutoff);
}

/**
* @return Return observation datetime of most recent waveform data.
*/
public Instant mostRecentObservationDatatime() {
return waveformRepository.mostRecentObservationDatatime();
}
}
Loading
Loading