From 6149f4274d3ba5082e3ae0c49815f22d7f3a4677 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Tue, 30 Jul 2024 16:57:22 -0700 Subject: [PATCH 1/2] refactor: process annotations after spans --- src/phoenix/db/bulk_inserter.py | 8 ++++---- src/phoenix/db/insertion/constants.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/phoenix/db/bulk_inserter.py b/src/phoenix/db/bulk_inserter.py index d0f914c59e..a1abc3da8f 100644 --- a/src/phoenix/db/bulk_inserter.py +++ b/src/phoenix/db/bulk_inserter.py @@ -154,10 +154,6 @@ async def _bulk_insert(self) -> None: or self._spans or self._evaluations ): - if not self._queue_inserters.empty: - if inserted_ids := await self._queue_inserters.insert(): - for project_rowid in await self._get_project_rowids(inserted_ids): - self._last_updated_at_by_project[project_rowid] = datetime.now(timezone.utc) if self._operations.empty() and not (self._spans or self._evaluations): await asyncio.sleep(self._sleep) continue @@ -200,6 +196,10 @@ async def _bulk_insert(self) -> None: evaluations_buffer = None for project_rowid in transaction_result.updated_project_rowids: self._last_updated_at_by_project[project_rowid] = datetime.now(timezone.utc) + if not self._queue_inserters.empty: + if inserted_ids := await self._queue_inserters.insert(): + for project_rowid in await self._get_project_rowids(inserted_ids): + self._last_updated_at_by_project[project_rowid] = datetime.now(timezone.utc) await asyncio.sleep(self._sleep) async def _insert_spans(self, spans: List[Tuple[Span, str]]) -> TransactionResult: diff --git a/src/phoenix/db/insertion/constants.py b/src/phoenix/db/insertion/constants.py index 4462df5d3f..6028ca3718 100644 --- a/src/phoenix/db/insertion/constants.py +++ b/src/phoenix/db/insertion/constants.py @@ -1,2 +1,2 @@ -DEFAULT_RETRY_DELAY_SEC: float = 60 -DEFAULT_RETRY_ALLOWANCE: int = 10 +DEFAULT_RETRY_DELAY_SEC: float = 10 +DEFAULT_RETRY_ALLOWANCE: int = 60 From 89fb7ef4d7214ca93847237fe66c34d5da2b2a17 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Tue, 30 Jul 2024 17:10:27 -0700 Subject: [PATCH 2/2] clean up --- src/phoenix/db/bulk_inserter.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/phoenix/db/bulk_inserter.py b/src/phoenix/db/bulk_inserter.py index a1abc3da8f..1a0f2f39ab 100644 --- a/src/phoenix/db/bulk_inserter.py +++ b/src/phoenix/db/bulk_inserter.py @@ -154,7 +154,12 @@ async def _bulk_insert(self) -> None: or self._spans or self._evaluations ): - if self._operations.empty() and not (self._spans or self._evaluations): + if ( + self._queue_inserters.empty + and self._operations.empty() + and not self._spans + and not self._evaluations + ): await asyncio.sleep(self._sleep) continue ops_remaining, events = self._max_ops_per_transaction, []