
Cohort demographics results are available for visualization when explicitly requested during execution #2402

Open
wants to merge 6 commits into base: master
1 change: 1 addition & 0 deletions src/main/java/org/ohdsi/webapi/Constants.java
@@ -82,6 +82,7 @@ interface Params {
String EXECUTABLE_FILE_NAME = "executableFilename";
String GENERATION_ID = "generation_id";
String DESIGN_HASH = "design_hash";
String DEMOGRAPHIC_STATS = "demographic_stats";
}

interface Variables {
@@ -1,6 +1,7 @@
package org.ohdsi.webapi.cohortcharacterization;

import org.ohdsi.webapi.cohortdefinition.CohortDefinition;
import org.ohdsi.webapi.cohortdefinition.CohortDefinitionDetails;
import org.ohdsi.webapi.cohortdefinition.CohortGenerationRequestBuilder;
import org.ohdsi.webapi.cohortdefinition.CohortGenerationUtils;
import org.ohdsi.webapi.generationcache.GenerationCacheHelper;
@@ -32,6 +33,7 @@

import static org.ohdsi.webapi.Constants.Params.SOURCE_ID;
import static org.ohdsi.webapi.Constants.Params.TARGET_TABLE;
import static org.ohdsi.webapi.Constants.Params.DEMOGRAPHIC_STATS;

public class GenerateLocalCohortTasklet implements StoppableTasklet {

@@ -89,14 +91,14 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
if (useAsyncCohortGeneration) {
List<CompletableFuture> executions = cohortDefinitions.stream()
.map(cd ->
CompletableFuture.supplyAsync(() -> generateCohort(cd, source, resultSchema, targetTable),
Executors.newSingleThreadExecutor()
)
).collect(Collectors.toList());
CompletableFuture.allOf(executions.toArray(new CompletableFuture[]{})).join();
} else {
CompletableFuture.runAsync(() ->
cohortDefinitions.stream().forEach(cd -> generateCohort(cd, source, resultSchema, targetTable)),
Executors.newSingleThreadExecutor()
).join();
}
@@ -113,8 +115,8 @@ private Object generateCohort(CohortDefinition cd, Source source, String resultS
sessionId,
resultSchema
);

CohortDefinitionDetails details = cd.getDetails();
int designHash = this.generationCacheHelper.computeHash(details.getExpression());
CohortGenerationUtils.insertInclusionRules(cd, source, designHash, resultSchema, sessionId, cancelableJdbcTemplate);

try {
@@ -2,7 +2,6 @@

import com.odysseusinc.arachne.commons.utils.ConverterUtils;
import org.apache.commons.lang3.StringUtils;
import org.ohdsi.analysis.CohortMetadata;
import org.ohdsi.analysis.Utils;
import org.ohdsi.analysis.cohortcharacterization.design.CcResultType;
import org.ohdsi.webapi.cohortcharacterization.domain.CcStrataConceptSetEntity;
@@ -18,7 +17,6 @@
import org.ohdsi.webapi.tag.domain.Tag;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -84,6 +84,21 @@ public CohortGenerationInfo(CohortDefinition definition, Integer sourceId)
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "created_by_id")
private UserEntity createdBy;

@Column(name = "cc_generate_id")
private Long ccGenerateId;
Contributor Author:
This identifier is necessary to correlate a cohort execution with its associated demographics Cohort Characterization run.
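For context, a minimal sketch of how this correlation might be consumed; the service and method names here are hypothetical, not part of this PR:

// Sketch: given a generation info row, fetch the demographics results
// produced by the same job execution. ccGenerateId holds the Spring Batch
// JobExecution id (see afterJob further below), so it can key into the
// cohort characterization results.
public List<Report> findDemographicsFor(CohortGenerationInfo info) {
    Long ccGenerateId = info.getCcGenerateId();
    if (ccGenerateId == null) {
        return Collections.emptyList(); // demographics were not requested
    }
    // ccResultService is an assumed collaborator with an assumed signature.
    return ccResultService.findResultsByGenerationId(ccGenerateId);
}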


// If true, demographic statistics have been selected for this generation.
@Column(name = "is_choose_demographic")
private boolean isChooseDemographic;

public boolean isChooseDemographic() {
return isChooseDemographic;
}

public void setIsChooseDemographic(boolean isChooseDemographic) {
this.isChooseDemographic = isChooseDemographic;
}
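A naming note, offered as a suggestion: pairing the getter isChooseDemographic() with setIsChooseDemographic(...) makes bean introspection derive two different property names (chooseDemographic vs. isChooseDemographic), which can confuse JPA providers and JSON mappers. A conventional pairing would look like this sketch:

// Sketch: conventional JavaBean naming for a boolean property.
@Column(name = "is_choose_demographic")
private boolean chooseDemographic;

public boolean isChooseDemographic() {
    return chooseDemographic;
}

public void setChooseDemographic(boolean chooseDemographic) {
    this.chooseDemographic = chooseDemographic;
}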

public CohortGenerationInfoId getId() {
return id;
@@ -187,4 +202,13 @@ public void setCreatedBy(UserEntity createdBy) {
public UserEntity getCreatedBy() {
return createdBy;
}

public Long getCcGenerateId() {
return ccGenerateId;
}

public void setCcGenerateId(Long ccGenerateId) {
this.ccGenerateId = ccGenerateId;
}

}
@@ -11,7 +11,8 @@ public class CohortGenerationRequest {
private String targetSchema;
private Integer targetId;

public CohortGenerationRequest(CohortExpression expression, Source source, String sessionId, Integer targetId,
String targetSchema) {

this.expression = expression;
this.source = source;
@@ -70,7 +70,7 @@ public static String[] buildGenerationSql(CohortGenerationRequest request) {
"results_database_schema.cohort_inclusion_stats",
"results_database_schema.cohort_summary_stats",
"results_database_schema.cohort_censor_stats",
"results_database_schema.cohort_inclusion"
"results_database_schema.cohort_inclusion"
},
new String[] {
COHORT_CACHE,
@@ -16,21 +16,38 @@
package org.ohdsi.webapi.cohortdefinition;

import org.ohdsi.circe.helper.ResourceHelper;
import org.ohdsi.cohortcharacterization.CCQueryBuilder;
import org.ohdsi.sql.BigQuerySparkTranslate;
import org.ohdsi.sql.SqlRender;
import org.ohdsi.sql.SqlSplit;
import org.ohdsi.sql.SqlTranslate;
import org.ohdsi.webapi.cohortcharacterization.domain.CohortCharacterizationEntity;
import org.ohdsi.webapi.common.generation.CancelableTasklet;
import org.ohdsi.webapi.common.generation.GenerationUtils;
import org.ohdsi.webapi.feanalysis.domain.FeAnalysisEntity;
import org.ohdsi.webapi.feanalysis.repository.FeAnalysisEntityRepository;
import org.ohdsi.webapi.generationcache.GenerationCacheHelper;
import org.ohdsi.webapi.shiro.Entities.UserRepository;
import org.ohdsi.webapi.source.Source;
import org.ohdsi.webapi.source.SourceService;
import org.ohdsi.webapi.util.CancelableJdbcTemplate;
import org.ohdsi.webapi.util.SessionUtils;
import org.ohdsi.webapi.util.SourceUtils;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionTemplate;

import com.google.common.collect.ImmutableList;
import com.odysseusinc.arachne.commons.types.DBMSType;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.ohdsi.webapi.Constants.Params.*;

@@ -44,54 +61,148 @@ public class GenerateCohortTasklet extends CancelableTasklet implements Stoppabl
private final GenerationCacheHelper generationCacheHelper;
private final CohortDefinitionRepository cohortDefinitionRepository;
private final SourceService sourceService;
private final FeAnalysisEntityRepository feAnalysisRepository;

public GenerateCohortTasklet(final CancelableJdbcTemplate jdbcTemplate, final TransactionTemplate transactionTemplate,
final GenerationCacheHelper generationCacheHelper,
final CohortDefinitionRepository cohortDefinitionRepository, final SourceService sourceService) {
super(LoggerFactory.getLogger(GenerateCohortTasklet.class), jdbcTemplate, transactionTemplate);
this.generationCacheHelper = generationCacheHelper;
this.cohortDefinitionRepository = cohortDefinitionRepository;
this.sourceService = sourceService;
this.feAnalysisRepository = null;
}

public GenerateCohortTasklet(
final CancelableJdbcTemplate jdbcTemplate,
final TransactionTemplate transactionTemplate,
final GenerationCacheHelper generationCacheHelper,
final CohortDefinitionRepository cohortDefinitionRepository,
final SourceService sourceService, final FeAnalysisEntityRepository feAnalysisRepository
) {
super(LoggerFactory.getLogger(GenerateCohortTasklet.class), jdbcTemplate, transactionTemplate);
this.generationCacheHelper = generationCacheHelper;
this.cohortDefinitionRepository = cohortDefinitionRepository;
this.sourceService = sourceService;
this.feAnalysisRepository = feAnalysisRepository;
}

@Override
protected String[] prepareQueries(ChunkContext chunkContext, CancelableJdbcTemplate jdbcTemplate) {
Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters();

Boolean demographicStat = jobParams.get(DEMOGRAPHIC_STATS) == null ? null
: Boolean.valueOf((String) jobParams.get(DEMOGRAPHIC_STATS));

if (demographicStat != null && demographicStat.booleanValue()) {
return prepareQueriesDemographic(chunkContext, jdbcTemplate);
}

return prepareQueriesDefault(jobParams, jdbcTemplate);
}
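For context, DEMOGRAPHIC_STATS arrives as a String job parameter and is parsed with Boolean.valueOf above, so a caller opts in roughly as in this sketch; jobLauncher and generateCohortJob stand in for the existing Spring Batch wiring, and checked exceptions are omitted:

// Sketch: explicitly requesting demographics when launching a generation.
JobParameters params = new JobParametersBuilder()
        .addString(COHORT_DEFINITION_ID, cohortDefinitionId.toString())
        .addString(SOURCE_ID, sourceId.toString())
        .addString(TARGET_DATABASE_SCHEMA, targetSchema)
        .addString(DEMOGRAPHIC_STATS, Boolean.TRUE.toString())
        .toJobParameters();
JobExecution execution = jobLauncher.run(generateCohortJob, params);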

private String[] prepareQueriesDemographic(ChunkContext chunkContext, CancelableJdbcTemplate jdbcTemplate) {
Contributor Author:
This logic was taken from the Cohort Characterization implementation.

Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters();
CohortCharacterizationEntity cohortCharacterization = new CohortCharacterizationEntity();

Integer cohortDefinitionId = Integer.valueOf(jobParams.get(COHORT_DEFINITION_ID).toString());
CohortDefinition cohortDefinition = cohortDefinitionRepository.findOneWithDetail(cohortDefinitionId);

cohortCharacterization.setCohortDefinitions(new HashSet<>(Arrays.asList(cohortDefinition)));

// Get the demographic feature analyses (Gender, Age, Race, ...)
Set<FeAnalysisEntity> feAnalysis = feAnalysisRepository.findByListIds(Arrays.asList(70, 72, 74, 77));

// Set<CcFeAnalysisEntity> ccFeAnalysis = feAnalysis.stream().map(a -> {
Contributor Author:
This relates to https://github.com/OHDSI/WebAPI/pull/2388/files and can be enabled once that PR is merged to 'master'.

// CcFeAnalysisEntity ccA = new CcFeAnalysisEntity();
// ccA.setCohortCharacterization(cohortCharacterization);
// ccA.setFeatureAnalysis(a);
// return ccA;
// }).collect(Collectors.toSet());

cohortCharacterization.setFeatureAnalyses(feAnalysis);

final Long jobId = chunkContext.getStepContext().getStepExecution().getJobExecution().getId();

final Integer sourceId = Integer.valueOf(jobParams.get(SOURCE_ID).toString());
final Source source = sourceService.findBySourceId(sourceId);

final String cohortTable = jobParams.get(TARGET_TABLE).toString();
final String sessionId = jobParams.get(SESSION_ID).toString();

final String tempSchema = SourceUtils.getTempQualifier(source);

boolean includeAnnual = false;
boolean includeTemporal = false;

CCQueryBuilder ccQueryBuilder = new CCQueryBuilder(cohortCharacterization, cohortTable, sessionId,
SourceUtils.getCdmQualifier(source), SourceUtils.getResultsQualifier(source),
SourceUtils.getVocabularyQualifier(source), tempSchema, jobId);
String sql = ccQueryBuilder.build();

/*
* There is an issue with temp tables on SQL Server: a temp table's scope is
* the session or stored procedure. To execute a PreparedStatement, SQL Server
* uses the stored procedure <i>sp_executesql</i>, which is why multiple
* PreparedStatements cannot share the same local temporary table.
*
* On the other hand, a temp table cannot be re-used within the same
* PreparedStatement, e.g. it cannot be created, used, dropped and created
* again in a single PreparedStatement, because the SQL optimizer detects
* that the object already exists and fails. When a temp table must be
* re-used, the work has to be split across several PreparedStatements.
*
* Using global temp tables is not an option either, since they may be
* unsupported or disabled.
*
* Therefore, there are two ways: either group the SQL into statements so
* that temp tables aren't re-used within a single statement,
* or use ‘permanent temporary tables’.
*
* The second option looks better, since such SQL can be exported and
* executed manually, which is not the case with the first option.
*/
if (ImmutableList.of(DBMSType.MS_SQL_SERVER.getOhdsiDB(), DBMSType.PDW.getOhdsiDB())
.contains(source.getSourceDialect())) {
sql = sql.replaceAll("#", tempSchema + "." + sessionId + "_").replaceAll("tempdb\\.\\.", "");
}
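// Editor's note, illustration only: with a temp qualifier of "scratch" and a
// session id of "abc123" (sample values), the rewrite above turns
//   SELECT * INTO #cohort_stage FROM tempdb..src;
// into
//   SELECT * INTO scratch.abc123_cohort_stage FROM src;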
if (source.getSourceDialect().equals("spark")) {
try {
sql = BigQuerySparkTranslate.sparkHandleInsert(sql, source.getSourceConnection());
} catch (SQLException e) {
e.printStackTrace();
Contributor Author:
A log entry should be written here instead of printing the stack trace, both in this copy and in the source code it was copied from (see the sketch after this method).

}
}

final String translatedSql = SqlTranslate.translateSql(sql, source.getSourceDialect(), sessionId, tempSchema);
return SqlSplit.splitSql(translatedSql);
}
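Picking up the printStackTrace note above, a minimal sketch of the suggested logging fix, assuming a class-level SLF4J logger (declare one if the parent class does not expose its own):

// Sketch: log with context instead of writing the stack trace to stderr.
private static final Logger LOG = LoggerFactory.getLogger(GenerateCohortTasklet.class);

// ... in prepareQueriesDemographic:
try {
    sql = BigQuerySparkTranslate.sparkHandleInsert(sql, source.getSourceConnection());
} catch (SQLException e) {
    LOG.error("Failed to handle Spark INSERT translation", e);
}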

private String[] prepareQueriesDefault(Map<String, Object> jobParams, CancelableJdbcTemplate jdbcTemplate) {
Integer cohortDefinitionId = Integer.valueOf(jobParams.get(COHORT_DEFINITION_ID).toString());
Integer sourceId = Integer.parseInt(jobParams.get(SOURCE_ID).toString());
String targetSchema = jobParams.get(TARGET_DATABASE_SCHEMA).toString();
String sessionId = jobParams.getOrDefault(SESSION_ID, SessionUtils.sessionId()).toString();

CohortDefinition cohortDefinition = cohortDefinitionRepository.findOneWithDetail(cohortDefinitionId);
Source source = sourceService.findBySourceId(sourceId);

CohortGenerationRequestBuilder generationRequestBuilder = new CohortGenerationRequestBuilder(sessionId,
targetSchema);

int designHash = this.generationCacheHelper.computeHash(cohortDefinition.getDetails().getExpression());
CohortGenerationUtils.insertInclusionRules(cohortDefinition, source, designHash, targetSchema, sessionId,
jdbcTemplate);

GenerationCacheHelper.CacheResult res = generationCacheHelper.computeCacheIfAbsent(cohortDefinition, source,
generationRequestBuilder,
(resId, sqls) -> generationCacheHelper.runCancelableCohortGeneration(jdbcTemplate, stmtCancel, sqls));

String sql = SqlRender.renderSql(copyGenerationIntoCohortTableSql,
new String[] { RESULTS_DATABASE_SCHEMA, COHORT_DEFINITION_ID, DESIGN_HASH },
new String[] { targetSchema, cohortDefinition.getId().toString(), res.getIdentifier().toString() });
sql = SqlTranslate.translateSql(sql, source.getSourceDialect());
return SqlSplit.splitSql(sql);
}
}
@@ -88,6 +88,7 @@ public void afterJob(JobExecution je) {
CohortGenerationInfo info = findBySourceId(df, sourceId);
setExecutionDurationIfPossible(je, info);
info.setStatus(GenerationStatus.COMPLETE);
info.setCcGenerateId(je.getId());

if (je.getStatus() == BatchStatus.FAILED || je.getStatus() == BatchStatus.STOPPED) {
info.setIsValid(false);
@@ -17,6 +17,8 @@

import java.util.List;

import org.ohdsi.webapi.cohortcharacterization.report.Report;

/**
*
* @author Chris Knoll <cknoll@ohdsi.org>
@@ -42,5 +44,10 @@ public static class InclusionRuleStatistic
public Summary summary;
public List<InclusionRuleStatistic> inclusionRuleStats;
public String treemapData;
public List<Report> demographicsStats;
Contributor Author:
We should find a way to avoid extending the existing InclusionRuleReport class, since the two don't relate to each other.
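One possible direction for that concern, sketched with hypothetical names: keep InclusionRuleReport untouched and return a small composite that pairs it with the demographics reports only when they were requested.

// Sketch: composition instead of extending InclusionRuleReport.
// The class and field names here are illustrative, not part of this PR.
public class CohortGenerationResults {
    public InclusionRuleReport inclusionReport;
    public List<Report> demographicsStats; // empty unless explicitly requested
}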


public Float prevalenceThreshold = 0.01f;
public Boolean showEmptyResults = false;
public int count = 0;

}
@@ -22,6 +22,8 @@ public CohortGenerationInfoDTO convert(CohortGenerationInfo info) {
dto.setStartTime(info.getStartTime());
dto.setStatus(info.getStatus());
dto.setIsValid(info.isIsValid());
dto.setCcGenerateId(info.getCcGenerateId());
dto.setIsChooseDemographic(info.isChooseDemographic());

return dto;
}