Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 70 additions & 51 deletions backend/python/app/connectors/core/base/sync_service/sync_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,84 @@ def __init__(self, logger, celery_app: CeleryApp, arango_service) -> None:
raise ValueError("Celery app is not initialized")

# Check if celery has task decorator
if not hasattr(self.celery, 'task'):
if not hasattr(self.celery, "task"):
self.logger.error("❌ Celery app does not have 'task' attribute!")
self.logger.error(f"Celery app type: {type(self.celery)}")
self.logger.error(f"Celery app attributes: {dir(self.celery)}")
raise AttributeError("Celery app does not have 'task' decorator")

self._setup_tasks_cached = False
self.setup_tasks()

def setup_tasks(self) -> None:
"""Setup Celery task decorators"""
self.logger.info("🔄 Starting task registration")

# Get the Celery app instance - it might be wrapped
celery_instance = self.celery
# To improve performance, avoid repeated costly log events/task registrations if called multiple times
if getattr(self, "_setup_tasks_cached", False):
return

# If CeleryApp is a wrapper, get the actual Celery instance
if hasattr(self.celery, 'app'):
celery_instance = self.celery.app
elif hasattr(self.celery, 'celery'):
celery_instance = self.celery.celery
logger = self.logger # local var for reduced attribute lookups
celery = self.celery

self.logger.info(f"📌 Using celery instance of type: {type(celery_instance)}")
logger.info("🔄 Starting task registration")

# Define the task using the actual Celery instance
@celery_instance.task(
name="app.connectors.core.base.sync_service.sync_tasks.schedule_next_changes_watch",
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
max_retries=5,
)
def schedule_next_changes_watch() -> None:
"""Renew watches for all services"""
try:
self.logger.info("🔄 Starting scheduled watch renewal cycle")
self.logger.info("📅 Current execution time: %s", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# Efficiently get the Celery app instance
celery_instance = getattr(celery, "app", getattr(celery, "celery", celery))

# Create event loop for async operations
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info(f"📌 Using celery instance of type: {type(celery_instance)}")

try:
# Create and run the coroutine
loop.run_until_complete(self._async_schedule_next_changes_watch())
finally:
loop.close()
# Use locals to avoid lookups in innermost Celery task function
arango_service = self.arango_service

self.logger.info("✅ Watch renewal cycle completed")
# Only define and register the task if not already set
if not hasattr(self, "schedule_next_changes_watch"):

except Exception as e:
self.logger.error(f"❌ Critical error in watch renewal cycle: {str(e)}")
self.logger.exception("Detailed error information:")
# Only retry for specific exceptions that warrant retries
if isinstance(e, (ConnectionError, TimeoutError)):
raise
return # Don't retry for other exceptions
@celery_instance.task(
name="app.connectors.core.base.sync_service.sync_tasks.schedule_next_changes_watch",
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
max_retries=5,
)
def schedule_next_changes_watch() -> None:
"""Renew watches for all services"""

# Store the task as an instance attribute
self.schedule_next_changes_watch = schedule_next_changes_watch
self.logger.info("✅ Watch renewal task registered successfully")
try:
logger.info("🔄 Starting scheduled watch renewal cycle")
logger.info(
"📅 Current execution time: %s",
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)

# Create event loop for async operations
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
self._async_schedule_next_changes_watch()
)
finally:
loop.close()

logger.info("✅ Watch renewal cycle completed")
except Exception as e:
logger.error(f"❌ Critical error in watch renewal cycle: {str(e)}")
logger.exception("Detailed error information:")
if isinstance(e, (ConnectionError, TimeoutError)):
raise
return # Don't retry for other exceptions

self.schedule_next_changes_watch = schedule_next_changes_watch

logger.info("✅ Watch renewal task registered successfully")
self._setup_tasks_cached = True

def register_connector_sync_control(
self,
connector_name: str,
sync_control_method: Callable,
task_name: Optional[str] = None
task_name: Optional[str] = None,
) -> None:
"""
Register a connector's manual sync control method
Expand All @@ -108,9 +119,9 @@ def register_connector_sync_control(

# Get the Celery app instance
celery_instance = self.celery
if hasattr(self.celery, 'app'):
if hasattr(self.celery, "app"):
celery_instance = self.celery.app
elif hasattr(self.celery, 'celery'):
elif hasattr(self.celery, "celery"):
celery_instance = self.celery.celery

# Create the Celery task
Expand All @@ -136,22 +147,28 @@ def manual_sync_control_task(action: str, org_id: str) -> Dict[str, Any]:

try:
# Call the registered sync control method
result = loop.run_until_complete(sync_control_method(action, org_id))
result = loop.run_until_complete(
sync_control_method(action, org_id)
)
return result
finally:
loop.close()

except Exception as e:
self.logger.error(f"Error in manual sync control for {connector_name}: {str(e)}")
self.logger.error(
f"Error in manual sync control for {connector_name}: {str(e)}"
)
return {"status": "error", "message": str(e)}

# Store the task and method
self.registered_connectors[connector_name] = {
"task": manual_sync_control_task,
"method": sync_control_method
"method": sync_control_method,
}

self.logger.info(f"✅ Registered sync control task for connector: {connector_name}")
self.logger.info(
f"✅ Registered sync control task for connector: {connector_name}"
)

async def _async_schedule_next_changes_watch(self) -> None:
"""Async implementation of watch renewal - to be overridden by connectors"""
Expand All @@ -174,7 +191,9 @@ async def _async_schedule_next_changes_watch(self) -> None:
try:
await self._renew_user_watches(email)
except Exception as e:
self.logger.error(f"Failed to renew watches for user {email}: {str(e)}")
self.logger.error(
f"Failed to renew watches for user {email}: {str(e)}"
)
continue

async def _renew_user_watches(self, email: str) -> None:
Expand Down