Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter to /vcf endpoint to force async writes to complete before returning response #75

Merged
merged 8 commits into from
Jan 23, 2024
9 changes: 8 additions & 1 deletion src/anyvar/restapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,17 @@ def register_vrs_object(
tags=[EndpointTag.VARIATIONS],
)
async def annotate_vcf(
request: Request, vcf: UploadFile = File(..., description="VCF to register and annotate")
request: Request,
vcf: UploadFile = File(..., description="VCF to register and annotate"),
allow_async_write: bool = Query(
default=False, description="Whether to allow asynchronous write of VRS objects to database"
),
):
"""Register alleles from a VCF and return a file annotated with VRS IDs.

:param request: FastAPI request object
:param vcf: incoming VCF file object
:param allow_async_write: whether to allow async database writes
:return: streamed annotated file
"""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
Expand All @@ -217,6 +222,8 @@ async def annotate_vcf(
except ValueError as e:
_logger.error(f"Encountered error during VCF registration: {e}")
return {"error": "Encountered ValueError when registering VCF"}
if not allow_async_write:
av.object_store.wait_for_writes()
return FileResponse(temp_out_file.name)


Expand Down
4 changes: 4 additions & 0 deletions src/anyvar/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def get_variation_count(self, variation_type: VariationStatisticType) -> int:
def wipe_db(self):
    """Empty the database of all stored records (destructive; removes every stored VRS object)."""

@abstractmethod
def wait_for_writes(self):
    """Block until any currently pending database modifications have been completed.

    Returns ``None``.  Backends that write synchronously may implement this as
    a no-op; backends that queue writes to a background thread must not return
    until every batch pending at call time has been flushed.
    """


class _BatchManager(AbstractContextManager):
"""Base context management class for batch writing.
Expand Down
4 changes: 4 additions & 0 deletions src/anyvar/storage/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ def __delitem__(self, name: str) -> None:
cur.execute("DELETE FROM vrs_objects WHERE vrs_id = %s;", [name])
self.conn.commit()

def wait_for_writes(self):
    """No-op stub retained for compatibility with the ``_Storage`` base class.

    The Postgres backend commits each write synchronously, so there are never
    pending asynchronous modifications to wait for.  The method must still
    exist here because callers (e.g. the ``/vcf`` REST endpoint) invoke it
    through the base-class interface without knowing which backend is in use.
    Implement real waiting only if queued/async writes are ever added to this
    backend.
    """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the pass is necessary. Is there an existing issue to do this for postgres backend?

Suggested change
pass

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we shouldn't worry about implementing it PG-side until there's a use case/demand for doing something like queueing. I think there's a world in which it could be preferable to retain the PG implementation as a more streamlined read/write option.

That said, it might be good to note in the docstring that this method is just a stub to retain compatibility with the base class, but shouldn't do anything unless we decide to implement queueing logic for the postgres writer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I don't think it should be an abstractmethod in the _Storage class and should only exist in the snowflake backend

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the wait_for_writes() method should remain on _Storage because it is called from anyvar.restapi.main, which does not know which backend it is using. The REST API method implementations only call methods on the _Storage base class. Without it on the base class, calling /vcf with allow_async_write=no results in a 500 error if the backend store is Postgres.

I cleaned up the comments and removed the pass line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ehclark that makes sense. Thanks


def close(self):
"""Terminate connection if necessary."""
if self.conn is not None:
Expand Down
26 changes: 25 additions & 1 deletion src/anyvar/storage/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,30 @@ def __delitem__(self, name: str) -> None:
cur.execute(f"DELETE FROM {self.table_name} WHERE vrs_id = ?;", [name]) # nosec B608
self.conn.commit()

def wait_for_writes(self):
    """Block until all batch writes pending at call time have been flushed.

    Returns ``None`` (the previous docstring incorrectly claimed a boolean
    return).  Strategy: queue an empty sentinel batch behind the pending
    work, then wait on the batch thread's condition variable until that
    sentinel has left the pending queue — at which point every batch queued
    before it has also been processed.  The sentinel is matched by identity
    (``is``): batch objects are never copied or replaced while queued, so
    the identity check is reliable.
    """
    # No background batch thread means writes are synchronous; nothing to wait for.
    if self.batch_thread is None:
        return

    # Short-circuit if the queue is already empty.
    with self.batch_thread.cond:
        if not self.batch_thread.pending_batch_list:
            return

    # Queue an empty sentinel batch behind all currently pending batches.
    sentinel = []
    self.batch_thread.queue_batch(sentinel)

    # Wait until the sentinel has been removed from the pending queue.
    while True:
        with self.batch_thread.cond:
            if any(x is sentinel for x in self.batch_thread.pending_batch_list):
                self.batch_thread.cond.wait()
            else:
                return

def close(self):
"""Stop the batch thread and wait for it to complete"""
if self.batch_thread is not None:
Expand Down Expand Up @@ -420,7 +444,7 @@ def queue_batch(self, batch_insert_values: List[Tuple]):
:param batch_insert_values: list of tuples where each tuple consists of (vrs_id, vrs_object)
"""
with self.cond:
if batch_insert_values:
if batch_insert_values is not None:
_logger.info("Queueing batch of %s items", len(batch_insert_values))
while len(self.pending_batch_list) >= self.max_pending_batches:
_logger.debug("Pending batch queue is full, waiting for space...")
Expand Down
18 changes: 16 additions & 2 deletions tests/storage/test_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,22 @@ def test_batch_mgmt_and_async_write_single_thread(mocker):

sf = SnowflakeObjectStore("snowflake://account/?param=value", 2, "vrs_objects2", 4)
with sf.batch_manager(sf):
for vo in vrs_id_object_pairs:
sf[vo[0]] = vo[1]
sf.wait_for_writes()
assert sf.num_pending_batches() == 0
sf[vrs_id_object_pairs[0][0]] = vrs_id_object_pairs[0][1]
sf[vrs_id_object_pairs[1][0]] = vrs_id_object_pairs[1][1]
assert sf.num_pending_batches() > 0
sf.wait_for_writes()
assert sf.num_pending_batches() == 0
sf[vrs_id_object_pairs[2][0]] = vrs_id_object_pairs[2][1]
sf[vrs_id_object_pairs[3][0]] = vrs_id_object_pairs[3][1]
sf[vrs_id_object_pairs[4][0]] = vrs_id_object_pairs[4][1]
sf[vrs_id_object_pairs[5][0]] = vrs_id_object_pairs[5][1]
sf[vrs_id_object_pairs[6][0]] = vrs_id_object_pairs[6][1]
sf[vrs_id_object_pairs[7][0]] = vrs_id_object_pairs[7][1]
sf[vrs_id_object_pairs[8][0]] = vrs_id_object_pairs[8][1]
sf[vrs_id_object_pairs[9][0]] = vrs_id_object_pairs[9][1]
sf[vrs_id_object_pairs[10][0]] = vrs_id_object_pairs[10][1]

assert sf.num_pending_batches() > 0
sf.close()
Expand Down