Skip to content

Commit

Permalink
Merge pull request #2449 from opencb/TASK-6005
Browse files Browse the repository at this point in the history
TASK-6005 - Error loading variants : "Row length 45522 is > 32767"
  • Loading branch information
j-coll authored May 17, 2024
2 parents 37b1618 + f6b73fe commit 0e5a9e6
Show file tree
Hide file tree
Showing 40 changed files with 491 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.mockito.Mockito;
import org.opencb.biodata.models.variant.metadata.Aggregation;
import org.opencb.biodata.models.variant.metadata.VariantSetStats;
import org.opencb.commons.datastore.core.DataResult;
import org.opencb.commons.datastore.core.Event;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
Expand All @@ -43,8 +42,8 @@
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.file.FileInternalVariantIndex;
import org.opencb.opencga.core.models.file.VariantIndexStatus;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.operations.variant.VariantIndexParams;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.testclassification.duration.MediumTests;
import org.opencb.opencga.core.tools.result.ExecutionResult;
import org.opencb.opencga.storage.core.StorageEngineFactory;
Expand Down Expand Up @@ -186,7 +185,7 @@ public void testDeleteIndexedFile() throws Exception {
Study study = catalogManager.getFileManager().getStudy(inputFile, sessionId);

thrown.expect(CatalogException.class);
thrown.expectMessage("The status is READY");
thrown.expectMessage("Could not unlink file '" + inputFile.getId() + "'");
catalogManager.getFileManager().unlink(study.getFqn(), inputFile.getId(), sessionId);
}

Expand All @@ -200,7 +199,7 @@ public void testDeleteSampleFromIndexedFile() throws Exception {
Query query = new Query(SampleDBAdaptor.QueryParams.ID.key(), inputFile.getSampleIds().get(100));
thrown.expect(CatalogException.class);
thrown.expectMessage("Sample associated to the files");
DataResult delete = catalogManager.getSampleManager().delete(studyFqn, query, null, sessionId);
catalogManager.getSampleManager().delete(studyFqn, query, null, sessionId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.opencb.opencga.app.migrations.v2_12_5.storage;

import org.opencb.opencga.app.migrations.StorageMigrationTool;
import org.opencb.opencga.catalog.migration.Migration;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;

@Migration(id="add_missing_column_to_phoenix_TASK-6005", description = "Add missing ALLELES column to phoenix #TASK-6005",
        version = "2.12.5", domain = Migration.MigrationDomain.STORAGE, date = 20240510
)
public class AddAllelesColumnToPhoenix extends StorageMigrationTool {

    /**
     * Walks every variant-storage project and, for those whose engine id is "hadoop",
     * runs the "add missing columns" task against that engine.
     *
     * @throws Exception if the task class cannot be loaded, instantiated or executed
     */
    @Override
    protected void run() throws Exception {
        for (String project : getVariantStorageProjects()) {
            VariantStorageEngine storageEngine = getVariantStorageEngineByProject(project);
            if (!storageEngine.getStorageEngineId().equals("hadoop")) {
                // Only hadoop-backed projects are handled by this migration.
                continue;
            }
            logger.info("Adding missing columns (if any) for project " + project);
            addMissingColumns(storageEngine);
        }
    }

    /**
     * Loads the column-fixing task by name through reflection, builds it with the given engine
     * through its single-{@code Object} constructor, and runs it.
     * The same task class is reused by both migrations.
     *
     * @param storageEngine engine handed to the task constructor
     * @throws Exception on any reflection or execution failure
     */
    private void addMissingColumns(VariantStorageEngine storageEngine) throws Exception {
        Class<?> taskClass = Class.forName("org.opencb.opencga.storage.hadoop.variant.migration.v2_3_0.AddMissingColumns");
        Object task = taskClass.getConstructor(Object.class).newInstance(storageEngine);
        ((Runnable) task).run();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1832,9 +1832,9 @@ public OpenCGAResult<File> unlink(@Nullable String studyId, String fileId, Strin
study.getUuid(), auditParams, new AuditRecord.Status(AuditRecord.Status.Result.SUCCESS));

return result;
} catch (CatalogException e) {
} catch (Exception e) {
auditManager.audit(userId, Enums.Action.UNLINK, Enums.Resource.FILE, fileId, "", study.getId(), study.getUuid(),
auditParams, new AuditRecord.Status(AuditRecord.Status.Result.ERROR, e.getError()));
auditParams, new AuditRecord.Status(AuditRecord.Status.Result.ERROR, new Error(0, "", e.getMessage())));
throw new CatalogException("Could not unlink file '" + fileId + "'", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opencb.opencga.storage.core.StoragePipelineResult;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQuery;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
Expand All @@ -39,11 +40,14 @@
public abstract class VariantStorageEngineSVTest extends VariantStorageBaseTest {

protected static StudyMetadata studyMetadata;
protected static StudyMetadata studyMetadata2;
protected static boolean loaded = false;
protected static StoragePipelineResult pipelineResult1;
protected static StoragePipelineResult pipelineResult2;
protected static StoragePipelineResult pipelineResult3;
protected static URI input1;
protected static URI input2;
protected static URI input3;

@Before
public void before() throws Exception {
Expand All @@ -66,12 +70,22 @@ protected void loadFiles() throws Exception {
variantStorageEngine.getOptions().append(VariantStorageOptions.ANNOTATOR_CELLBASE_EXCLUDE.key(), "expression,clinical");
pipelineResult1 = runDefaultETL(input1, variantStorageEngine, studyMetadata, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38")
);
input2 = getResourceUri("variant-test-sv_2.vcf");
pipelineResult2 = runDefaultETL(input2, variantStorageEngine, studyMetadata, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38"));

input3 = getResourceUri("variant-test-sv-large.vcf");
studyMetadata2 = new StudyMetadata(2, "s2");
pipelineResult3 = runDefaultETL(input3, variantStorageEngine, studyMetadata2, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38"));

}

@Test
Expand All @@ -88,7 +102,7 @@ public void checkCount() throws Exception {
+ 1
+ 1 // negative cipos
;
int count = variantStorageEngine.getDBAdaptor().count().first().intValue();
int count = variantStorageEngine.count(new VariantQuery().study(studyMetadata.getName())).first().intValue();
assertEquals(expected, count);
}

Expand Down Expand Up @@ -146,7 +160,10 @@ protected void checkCorrectness(URI file) throws StorageEngineException, NonStan

@Test
public void exportVcf() throws Exception {
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null, new Query(VariantQueryParam.UNKNOWN_GENOTYPE.key(), "./."), new QueryOptions(QueryOptions.SORT, true));
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null,
new VariantQuery().unknownGenotype("./.").study(studyMetadata.getName()), new QueryOptions(QueryOptions.SORT, true));
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null,
new VariantQuery().unknownGenotype("./.").study(studyMetadata2.getName()), new QueryOptions(QueryOptions.SORT, true));
}

protected Map<String, Variant> readVariants(URI input) throws StorageEngineException, NonStandardCompliantSampleField {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,12 @@ public Scan parseQuery(VariantQueryProjection selectElements, Query query, Query
//// filters.addFilter(keyOnlyFilter);
// scan.addColumn(genomeHelper.getColumnFamily(), VariantPhoenixHelper.VariantColumn.TYPE.bytes());
// }
if (selectElements.getFields().contains(VariantField.TYPE) || !scan.hasFamilies()) {
scan.addColumn(family, VariantColumn.TYPE.bytes());
}

// Alleles must always be included.
scan.addColumn(family, VariantColumn.ALLELES.bytes());
// Because alleles column may be empty, we must still ensure that we get, at least, one result per row.
// Include "type" column, which is never empty.
scan.addColumn(family, VariantColumn.TYPE.bytes());

// if (!columnPrefixes.isEmpty()) {
// MultipleColumnPrefixFilter columnPrefixFilter = new MultipleColumnPrefixFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import htsjdk.variant.variantcontext.Allele;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.query.QueryConstants;
Expand All @@ -28,6 +30,7 @@
import org.opencb.biodata.models.variant.Variant;
import org.opencb.biodata.models.variant.VariantBuilder;
import org.opencb.biodata.models.variant.avro.*;
import org.opencb.opencga.storage.hadoop.variant.GenomeHelper;

import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -43,12 +46,15 @@
*/
public class VariantPhoenixKeyFactory {

// Number of bytes reported by Phoenix for an unsigned int (the position field of the row key).
public static final Integer UINT_SIZE = PUnsignedInt.INSTANCE.getByteSize();
// Separator placed between the parts of a symbolic alternate, and between reference and alternate in the "alleles" string.
protected static final String SV_ALTERNATE_SEPARATOR = "|";
// Same separator prefixed with a backslash — presumably so it can be used as a regex (e.g. String.split); confirm against callers.
protected static final String SV_ALTERNATE_SEPARATOR_SPLIT = "\\" + SV_ALTERNATE_SEPARATOR;

// Orders chromosome names by the byte order of the row key generated for each one
// (fixed position 1, reference "N", alternate "N"), i.e. the order in which the rows would sort by key.
public static final Comparator<String> HBASE_KEY_CHROMOSOME_COMPARATOR = (c1, c2) -> Bytes.compareTo(
VariantPhoenixKeyFactory.generateSimpleVariantRowKey(c1, 1, "N", "N"),
VariantPhoenixKeyFactory.generateSimpleVariantRowKey(c2, 1, "N", "N"));
// Prefix that marks an allele stored in the row key as a hash instead of the literal allele.
public static final String HASH_PREFIX = "#";
// Byte form of HASH_PREFIX, used to detect hashed alleles directly on raw row keys.
public static final byte[] HASH_PREFIX_BYTES = Bytes.toBytes(HASH_PREFIX);

public static byte[] generateVariantRowKey(String chrom, int position) {
return generateSimpleVariantRowKey(chrom, position, "", "");
Expand All @@ -59,6 +65,26 @@ public static byte[] generateVariantRowKey(Variant var) {
var.getSv());
}

/**
 * Builds a variant row key from the current row of a Phoenix result set.
 * Reads the chromosome, position, reference and alternate columns and delegates to the
 * six-argument key generator, passing {@code null} for the end and the structural variation.
 *
 * @param resultSet result set positioned on the row to read
 * @return the row key bytes
 * @throws IllegalStateException if reading the columns or generating the key fails; the values
 *                               read so far are included in the message
 */
public static byte[] generateVariantRowKey(ResultSet resultSet) {
    // Declared before the try so whatever was read can be reported if a later step fails.
    String chr = null;
    Integer pos = null;
    String ref = null;
    String alt = null;
    try {
        chr = resultSet.getString(VariantPhoenixSchema.VariantColumn.CHROMOSOME.column());
        pos = resultSet.getInt(VariantPhoenixSchema.VariantColumn.POSITION.column());
        ref = resultSet.getString(VariantPhoenixSchema.VariantColumn.REFERENCE.column());
        alt = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALTERNATE.column());

        return generateVariantRowKey(chr, pos, null, ref, alt, null);
    } catch (RuntimeException | SQLException e) {
        String refLabel = ref == null ? "-" : ref;
        String altLabel = alt == null ? "-" : alt;
        throw new IllegalStateException("Fail to generate row key from Phoenix result set: " + chr
                + ':' + pos
                + ':' + refLabel
                + ':' + altLabel, e);
    }
}

public static byte[] generateVariantRowKey(VariantAnnotation variantAnnotation) {
byte[] bytesRowKey = null;
if (variantAnnotation.getAdditionalAttributes() != null) {
Expand Down Expand Up @@ -97,6 +123,12 @@ public static byte[] generateSimpleVariantRowKey(String chrom, int position, Str
return generateVariantRowKey(chrom, position, null, ref, alt, null);
}

/**
 * Tells whether the estimated row key size of the variant exceeds {@link HConstants#MAX_ROW_LENGTH}.
 *
 * @param variant variant to check
 * @return {@code true} if the estimated key size is above the maximum row length
 */
public static boolean mightHashAlleles(Variant variant) {
    return getSize(variant) > HConstants.MAX_ROW_LENGTH;
}


/**
* Generates a Row key based on Chromosome, start, end (optional), ref and alt. <br>
* <ul>
Expand All @@ -114,16 +146,16 @@ public static byte[] generateSimpleVariantRowKey(String chrom, int position, Str
*/
public static byte[] generateVariantRowKey(String chrom, int start, Integer end, String ref, String alt, StructuralVariation sv) {
chrom = Region.normalizeChromosome(chrom);
int size = PVarchar.INSTANCE.estimateByteSizeFromLength(chrom.length())
+ QueryConstants.SEPARATOR_BYTE_ARRAY.length
+ PUnsignedInt.INSTANCE.getByteSize()
+ PVarchar.INSTANCE.estimateByteSizeFromLength(ref.length());
alt = buildSymbolicAlternate(ref, alt, end, sv);
if (!alt.isEmpty()) {
size += QueryConstants.SEPARATOR_BYTE_ARRAY.length
+ PVarchar.INSTANCE.estimateByteSizeFromLength(alt.length());
int size = getSize(chrom, ref, alt);

if (size > HConstants.MAX_ROW_LENGTH) {
// This is a problem. The row key is too long.
// Use hashCode for reference/alternate/SV fields
ref = hashAllele(ref);
alt = hashAllele(alt);
size = getSize(chrom, ref, alt);
}

byte[] rk = new byte[size];

int offset = 0;
Expand All @@ -143,6 +175,31 @@ public static byte[] generateVariantRowKey(String chrom, int start, Integer end,
return rk;
}

/**
 * Estimates the row key size of a variant, using its chromosome, its reference and
 * its symbolic alternate.
 *
 * @param variant variant to measure
 * @return estimated number of bytes of the row key
 */
private static int getSize(Variant variant) {
    return getSize(variant.getChromosome(), variant.getReference(), buildSymbolicAlternate(variant));
}

/**
 * Estimates the row key size for the given fields: chromosome, one separator, the unsigned-int
 * position and the reference. A non-empty alternate adds one more separator plus the alternate.
 *
 * @param chrom chromosome name
 * @param ref   reference allele
 * @param alt   alternate allele; an empty string adds nothing to the size
 * @return estimated number of bytes of the row key
 */
private static int getSize(String chrom, String ref, String alt) {
    int chromBytes = PVarchar.INSTANCE.estimateByteSizeFromLength(chrom.length());
    int positionBytes = PUnsignedInt.INSTANCE.getByteSize();
    int refBytes = PVarchar.INSTANCE.estimateByteSizeFromLength(ref.length());
    int separatorBytes = QueryConstants.SEPARATOR_BYTE_ARRAY.length;

    int total = chromBytes + separatorBytes + positionBytes + refBytes;
    if (alt.isEmpty()) {
        return total;
    }
    return total + separatorBytes + PVarchar.INSTANCE.estimateByteSizeFromLength(alt.length());
}

/**
 * Produces the hashed form of an allele: {@code HASH_PREFIX} followed by the decimal value of
 * the string's {@link String#hashCode()}.
 *
 * @param ref allele to hash
 * @return prefixed hash string
 */
public static String hashAllele(String ref) {
    int alleleHash = ref.hashCode();
    return HASH_PREFIX + alleleHash;
}

/**
 * Builds the "alleles" string of a variant: its reference, the separator, and its symbolic alternate.
 *
 * @param v variant to read
 * @return reference and symbolic alternate joined by {@code SV_ALTERNATE_SEPARATOR}
 */
public static String buildAlleles(Variant v) {
    String reference = v.getReference();
    String symbolicAlternate = buildSymbolicAlternate(v);
    return reference + SV_ALTERNATE_SEPARATOR + symbolicAlternate;
}

/**
 * Builds the symbolic alternate of a variant by passing its reference, alternate, end and
 * structural variation to the four-argument overload.
 *
 * @param v variant to read
 * @return symbolic alternate string
 */
public static String buildSymbolicAlternate(Variant v) {
    String reference = v.getReference();
    String alternate = v.getAlternate();
    return buildSymbolicAlternate(reference, alternate, v.getEnd(), v.getSv());
}
Expand Down Expand Up @@ -215,6 +272,26 @@ public static String extractChrFromVariantRowKey(byte[] variantRowKey, int offse
return (String) PVarchar.INSTANCE.toObject(variantRowKey, offset, chrPosSeparator, PVarchar.INSTANCE);
}

/**
 * Builds a variant from an HBase result.
 * If the reference section of the row key starts with the hash prefix, the alleles are hashed,
 * so the "type" and "alleles" column values are read from the result and passed to the parser.
 * Otherwise the variant is parsed from the row key alone.
 *
 * @param result HBase result holding the row key and, for hashed keys, the type and alleles columns
 * @return the parsed variant
 */
public static Variant extractVariantFromResult(Result result) {
    byte[] rowKey = result.getRow();

    // The reference starts after the chromosome, its zero-byte separator and the position.
    int refStart = ArrayUtils.indexOf(rowKey, (byte) 0) + 1 + UINT_SIZE;
    int prefixLength = HASH_PREFIX_BYTES.length;
    boolean hashedAlleles = rowKey.length > (refStart + prefixLength)
            && Bytes.equals(rowKey, refStart, prefixLength, HASH_PREFIX_BYTES, 0, prefixLength);

    if (!hashedAlleles) {
        return extractVariantFromVariantRowKey(rowKey, null, null);
    }

    // The reference and alternate are hashed: the real values live in the result columns.
    byte[] type = result.getValue(GenomeHelper.COLUMN_FAMILY_BYTES,
            VariantPhoenixSchema.VariantColumn.TYPE.bytes());
    byte[] alleles = result.getValue(GenomeHelper.COLUMN_FAMILY_BYTES,
            VariantPhoenixSchema.VariantColumn.ALLELES.bytes());
    return extractVariantFromVariantRowKey(rowKey, type, alleles);
}

public static Variant extractVariantFromResultSet(ResultSet resultSet) {
String chromosome = null;
Integer start = null;
Expand All @@ -226,9 +303,10 @@ public static Variant extractVariantFromResultSet(ResultSet resultSet) {
reference = resultSet.getString(VariantPhoenixSchema.VariantColumn.REFERENCE.column());
alternate = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALTERNATE.column());

String alleles = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALLELES.column());
String type = resultSet.getString(VariantPhoenixSchema.VariantColumn.TYPE.column());

return buildVariant(chromosome, start, reference, alternate, type);
return buildVariant(chromosome, start, reference, alternate, type, alleles);
} catch (RuntimeException | SQLException e) {
throw new IllegalStateException("Fail to parse variant: " + chromosome
+ ':' + start
Expand All @@ -237,13 +315,12 @@ public static Variant extractVariantFromResultSet(ResultSet resultSet) {
}
}

public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey, byte[] type, byte[] alleles) {
int chrPosSeparator = ArrayUtils.indexOf(variantRowKey, (byte) 0);
String chromosome = (String) PVarchar.INSTANCE.toObject(variantRowKey, 0, chrPosSeparator, PVarchar.INSTANCE);

Integer intSize = PUnsignedInt.INSTANCE.getByteSize();
int position = (Integer) PUnsignedInt.INSTANCE.toObject(variantRowKey, chrPosSeparator + 1, intSize, PUnsignedInt.INSTANCE);
int referenceOffset = chrPosSeparator + 1 + intSize;
int position = (Integer) PUnsignedInt.INSTANCE.toObject(variantRowKey, chrPosSeparator + 1, UINT_SIZE, PUnsignedInt.INSTANCE);
int referenceOffset = chrPosSeparator + 1 + UINT_SIZE;
int refAltSeparator = ArrayUtils.indexOf(variantRowKey, (byte) 0, referenceOffset);
String reference;
String alternate;
Expand All @@ -257,8 +334,16 @@ public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
alternate = (String) PVarchar.INSTANCE.toObject(variantRowKey, refAltSeparator + 1,
variantRowKey.length - (refAltSeparator + 1), PVarchar.INSTANCE);
}
String typeStr = null;
String alleleStr = null;
if (type != null) {
typeStr = (String) PVarchar.INSTANCE.toObject(type);
}
if (alleles != null) {
alleleStr = (String) PVarchar.INSTANCE.toObject(alleles);
}
try {
return buildVariant(chromosome, position, reference, alternate, null);
return buildVariant(chromosome, position, reference, alternate, typeStr, alleleStr);
} catch (RuntimeException e) {
throw new IllegalStateException("Fail to parse variant: " + chromosome
+ ':' + position
Expand All @@ -268,7 +353,17 @@ public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
}
}

public static Variant buildVariant(String chromosome, int start, String reference, String alternate, String type) {
public static Variant buildVariant(String chromosome, int start, String reference, String alternate, String type, String alleles) {
if ((reference != null && reference.startsWith(HASH_PREFIX)) || (alternate != null && alternate.startsWith(HASH_PREFIX))) {
if (StringUtils.isNotEmpty(alleles)) {
int i1 = alleles.indexOf(SV_ALTERNATE_SEPARATOR);
reference = alleles.substring(0, i1);
alternate = alleles.substring(i1 + SV_ALTERNATE_SEPARATOR.length());
} else {
throw new IllegalStateException("Reference and alternate are hashed, but alleles is empty!"
+ " '" + chromosome + "' '" + start + "' '" + reference + "' '" + alternate + "'");
}
}

if (alternate != null && alternate.length() > 5 && alternate.contains(SV_ALTERNATE_SEPARATOR)) {
Integer end = null;
Expand Down
Loading

0 comments on commit 0e5a9e6

Please sign in to comment.