Skip to content

Commit

Permalink
kafka_robustness
Browse files Browse the repository at this point in the history
  • Loading branch information
JKL98ISR committed Sep 15, 2024
1 parent 3f842ed commit 50d286b
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions backend/deepchecks_monitoring/logic/data_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,16 @@ async def _send_with_retry(
results = await asyncio.gather(*send_futures, return_exceptions=True)

failed_messages = [
(key, message) for ((key, message), result) in zip(messages, results)
((key, message), result) for ((key, message), result) in zip(messages, results)
if isinstance(result, Exception)
]

for ((key, _), result) in failed_messages:
self.logger.warning(f"Failed message key: {key.decode() if key else 'None'} with error {str(result)}")

if failed_messages:
self.logger.warning(f"Retry {retry_count + 1}: {len(failed_messages)} messages failed to send.")
messages = failed_messages
messages = [msg for msg, _ in failed_messages]
retry_count += 1

if retry_count < max_retries:
Expand All @@ -385,8 +388,6 @@ async def _send_with_retry(
f"Failed to send {len(failed_messages)} "
f"messages after {max_retries} retries out of original {total_msgs}."
)
for key, _ in failed_messages:
self.logger.error(f"Failed message key: {key.decode() if key else 'None'}")

async def log_samples(
self,
Expand Down

0 comments on commit 50d286b

Please sign in to comment.