Add parameter to /vcf endpoint to force async writes to complete before returning response #75

Merged (8 commits) on Jan 23, 2024
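
For context, a hedged client-side sketch of how the new flag might be exercised once deployed; the "/vcf" route path, the PUT method, and the localhost URL are assumptions taken from the PR title and the annotate_vcf signature rather than from this diff.

# Hedged sketch: route path, HTTP method, and host are assumptions, not shown in this diff.
import requests

with open("sample.vcf", "rb") as vcf_in:
    response = requests.put(
        "http://localhost:8000/vcf",
        params={
            "for_ref": True,
            # default False: block until all VRS objects are written before responding
            "allow_async_write": False,
        },
        files={"vcf": vcf_in},
    )

response.raise_for_status()
with open("sample.annotated.vcf", "wb") as vcf_out:
    # the endpoint streams back the annotated VCF
    vcf_out.write(response.content)

Passing allow_async_write=True lets the response return before the storage backend has flushed its pending writes, which is the behavior this parameter makes opt-in.
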
6 changes: 6 additions & 0 deletions src/anyvar/restapi/main.py
@@ -209,12 +209,16 @@ async def annotate_vcf(
request: Request,
vcf: UploadFile = File(..., description="VCF to register and annotate"),
for_ref: bool = Query(default=True, description="Whether to compute VRS IDs for REF alleles"),
allow_async_write: bool = Query(
default=False, description="Whether to allow asynchronous write of VRS objects to database"
),
):
"""Register alleles from a VCF and return a file annotated with VRS IDs.

:param request: FastAPI request object
:param vcf: incoming VCF file object
:param for_ref: whether to compute VRS IDs for REF alleles
:param allow_async_write: whether to allow async database writes
:return: streamed annotated file
"""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
@@ -234,6 +238,8 @@ async def annotate_vcf(
except ValueError as e:
_logger.error(f"Encountered error during VCF registration: {e}")
return {"error": "Encountered ValueError when registering VCF"}
if not allow_async_write:
av.object_store.wait_for_writes()
return FileResponse(temp_out_file.name)


4 changes: 4 additions & 0 deletions src/anyvar/storage/__init__.py
@@ -35,6 +35,10 @@ def get_variation_count(self, variation_type: VariationStatisticType) -> int:
def wipe_db(self):
"""Empty database of all stored records."""

@abstractmethod
def wait_for_writes(self):
"""Returns once any currently pending database modifications have been completed."""

@abstractmethod
def close(self):
"""Closes the storage integration and cleans up any resources"""
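
Since wait_for_writes is now part of the abstract _Storage interface, every concrete backend has to supply it. A minimal sketch of the contract for a fully synchronous backend follows; the InMemoryObjectStore name and dict backing are hypothetical, not part of this PR, and the other abstract methods are omitted.

# Hypothetical synchronous backend sketch: writes happen inline, so there is
# never anything pending for wait_for_writes() to block on (the Postgres
# implementation below takes the same approach).
class InMemoryObjectStore(dict):
    def wipe_db(self):
        self.clear()

    def wait_for_writes(self):
        # no background writer thread, nothing to wait for
        return

    def close(self):
        # no connection or worker thread to release
        return
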
5 changes: 5 additions & 0 deletions src/anyvar/storage/postgres.py
@@ -127,6 +127,11 @@ def __delitem__(self, name: str) -> None:
cur.execute("DELETE FROM vrs_objects WHERE vrs_id = %s;", [name])
self.conn.commit()

def wait_for_writes(self):
"""Returns once any currently pending database modifications have been completed.
The PostgresObjectStore does not implement async writes, therefore this method is a no-op
and present only to maintain compatibility with the `_Storage` base class"""

def close(self):
"""Terminate connection if necessary."""
if self.conn is not None:
21 changes: 20 additions & 1 deletion src/anyvar/storage/snowflake.py
@@ -202,6 +202,25 @@ def __delitem__(self, name: str) -> None:
cur.execute(f"DELETE FROM {self.table_name} WHERE vrs_id = ?;", [name]) # nosec B608
self.conn.commit()

def wait_for_writes(self):
"""Returns once any currently pending database modifications have been completed."""
if self.batch_thread is not None:
# short circuit if the queue is empty
with self.batch_thread.cond:
if not self.batch_thread.pending_batch_list:
return

# queue an empty batch
batch = []
self.batch_thread.queue_batch(batch)
# wait for the batch to be removed from the pending queue
while True:
with self.batch_thread.cond:
if list(filter(lambda x: x is batch, self.batch_thread.pending_batch_list)):
self.batch_thread.cond.wait()
else:
break

def close(self):
"""Stop the batch thread and wait for it to complete"""
if self.batch_thread is not None:
@@ -420,7 +439,7 @@ def queue_batch(self, batch_insert_values: List[Tuple]):
:param batch_insert_values: list of tuples where each tuple consists of (vrs_id, vrs_object)
"""
with self.cond:
-        if batch_insert_values:
+        if batch_insert_values is not None:
_logger.info("Queueing batch of %s items", len(batch_insert_values))
while len(self.pending_batch_list) >= self.max_pending_batches:
_logger.debug("Pending batch queue is full, waiting for space...")
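
The Snowflake wait_for_writes above works by pushing an empty sentinel batch through the same queue the background writer drains, then sleeping on the shared condition variable until that exact list object leaves pending_batch_list; this is also why queue_batch now checks `is not None` instead of truthiness. A standalone sketch of the pattern follows; the worker loop and timing are illustrative stand-ins, not the real batch thread.

import threading
import time
from typing import List


class BatchWriterSketch:
    """Illustrative writer thread using the sentinel-batch pattern."""

    def __init__(self) -> None:
        self.cond = threading.Condition()
        self.pending_batch_list: List[list] = []
        threading.Thread(target=self._run, daemon=True).start()

    def queue_batch(self, batch: list) -> None:
        # accept empty batches so they can act as sentinels
        with self.cond:
            self.pending_batch_list.append(batch)
            self.cond.notify_all()

    def _run(self) -> None:
        while True:
            with self.cond:
                while not self.pending_batch_list:
                    self.cond.wait()
                batch = self.pending_batch_list[0]
            if batch:
                time.sleep(0.01)  # stand-in for the real batch INSERT
            with self.cond:
                self.pending_batch_list.pop(0)
                self.cond.notify_all()  # wake any wait_for_writes() callers

    def wait_for_writes(self) -> None:
        sentinel: list = []  # empty batch used purely as a marker
        self.queue_batch(sentinel)
        with self.cond:
            # identity check: block until this exact object has been drained
            while any(b is sentinel for b in self.pending_batch_list):
                self.cond.wait()

Waiting on an identity-checked sentinel rather than on an empty queue means each caller waits only for work queued before its own call, so a steady stream of new batches from other threads cannot starve it.
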
18 changes: 16 additions & 2 deletions tests/storage/test_snowflake.py
@@ -219,8 +219,22 @@ def test_batch_mgmt_and_async_write_single_thread(mocker):

sf = SnowflakeObjectStore("snowflake://account/?param=value", 2, "vrs_objects2", 4)
with sf.batch_manager(sf):
for vo in vrs_id_object_pairs:
sf[vo[0]] = vo[1]
sf.wait_for_writes()
assert sf.num_pending_batches() == 0
sf[vrs_id_object_pairs[0][0]] = vrs_id_object_pairs[0][1]
sf[vrs_id_object_pairs[1][0]] = vrs_id_object_pairs[1][1]
assert sf.num_pending_batches() > 0
sf.wait_for_writes()
assert sf.num_pending_batches() == 0
sf[vrs_id_object_pairs[2][0]] = vrs_id_object_pairs[2][1]
sf[vrs_id_object_pairs[3][0]] = vrs_id_object_pairs[3][1]
sf[vrs_id_object_pairs[4][0]] = vrs_id_object_pairs[4][1]
sf[vrs_id_object_pairs[5][0]] = vrs_id_object_pairs[5][1]
sf[vrs_id_object_pairs[6][0]] = vrs_id_object_pairs[6][1]
sf[vrs_id_object_pairs[7][0]] = vrs_id_object_pairs[7][1]
sf[vrs_id_object_pairs[8][0]] = vrs_id_object_pairs[8][1]
sf[vrs_id_object_pairs[9][0]] = vrs_id_object_pairs[9][1]
sf[vrs_id_object_pairs[10][0]] = vrs_id_object_pairs[10][1]

assert sf.num_pending_batches() > 0
sf.close()