[Connector APIs] Connector update last sync info, status, error #2641

Open
wants to merge 7 commits into base: main
30 changes: 30 additions & 0 deletions connectors/es/index.py
@@ -34,6 +34,22 @@ async def connector_check_in(self, connector_id):
headers={"accept": "application/json"},
)

async def connector_update_error(self, connector_id, error):
await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_error",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"error": error},
)

async def connector_update_last_sync_info(self, connector_id, last_sync_info):
await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_last_sync",
headers={"accept": "application/json", "Content-Type": "application/json"},
body=last_sync_info,
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
@@ -98,6 +114,20 @@ async def connector_check_in(self, connector_id):
partial(self._api_wrapper.connector_check_in, connector_id)
)

async def connector_update_error(self, connector_id, error):
await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_update_error, connector_id, error)
)

async def connector_update_last_sync_info(self, connector_id, last_sync_info):
await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_update_last_sync_info,
connector_id,
last_sync_info,
)
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
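The two wrapper methods added above map onto the Connector API's `_error` and `_last_sync` endpoints. Below is a minimal, self-contained sketch of the request shape they produce; the `StubTransport` class, the connector id, and the example payload field are illustrative assumptions, not part of this PR, and the production path goes through the API wrapper with `_retrier.execute_with_retry` as shown in the diff.

import asyncio


class StubTransport:
    """Illustrative stand-in for the Elasticsearch client used by the wrapper."""

    async def perform_request(self, method, path, headers=None, body=None):
        # A real client would send the HTTP request; here we only echo it.
        print(f"{method} {path} -> {body}")


class ConnectorApiSketch:
    """Mirrors the two methods added to connectors/es/index.py."""

    def __init__(self, client):
        self.client = client

    async def connector_update_error(self, connector_id, error):
        await self.client.perform_request(
            "PUT",
            f"/_connector/{connector_id}/_error",
            headers={"accept": "application/json", "Content-Type": "application/json"},
            body={"error": error},
        )

    async def connector_update_last_sync_info(self, connector_id, last_sync_info):
        await self.client.perform_request(
            "PUT",
            f"/_connector/{connector_id}/_last_sync",
            headers={"accept": "application/json", "Content-Type": "application/json"},
            body=last_sync_info,
        )


async def main():
    api = ConnectorApiSketch(StubTransport())
    # Clear any previous error, then report sync bookkeeping
    # (the connector id and field name below are made up for the example).
    await api.connector_update_error("my-connector-id", None)
    await api.connector_update_last_sync_info(
        "my-connector-id", {"last_sync_status": "in_progress"}
    )


asyncio.run(main())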
73 changes: 49 additions & 24 deletions connectors/protocol/connectors.py
@@ -707,12 +707,17 @@ def next_sync(self, job_type, now):
return next_run(scheduling_property.get("interval"), now)

async def _update_datetime(self, field, new_ts):
await self.index.update(
doc_id=self.id,
doc={field: iso_utc(new_ts)},
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info={field: iso_utc(new_ts)}
)
else:
await self.index.update(
doc_id=self.id,
doc={field: iso_utc(new_ts)},
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)

async def update_last_sync_scheduled_at_by_job_type(self, job_type, new_ts):
match job_type:
@@ -745,24 +750,37 @@ async def sync_starts(self, job_type):
msg = f"Unknown job type: {job_type}"
raise ValueError(msg)

doc = {
"status": Status.CONNECTED.value,
"error": None,
} | last_sync_information
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_error(
connector_id=self.id, error=None
)
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info=last_sync_information
)
else:
doc = {
"status": Status.CONNECTED.value,
"error": None,
} | last_sync_information

await self.index.update(
doc_id=self.id,
doc=doc,
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)
await self.index.update(
doc_id=self.id,
doc=doc,
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)

async def error(self, error):
doc = {
"status": Status.ERROR.value,
"error": str(error),
}
await self.index.update(doc_id=self.id, doc=doc)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_error(
connector_id=self.id, error=error
)
else:
doc = {
"status": Status.ERROR.value,
"error": str(error),
}
await self.index.update(doc_id=self.id, doc=doc)

async def sync_done(self, job, cursor=None):
job_status = JobStatus.ERROR if job is None else job.status
@@ -801,8 +819,6 @@ async def sync_done(self, job, cursor=None):

doc = {
"last_synced": iso_utc(),
"status": connector_status.value,
"error": job_error,
} | last_sync_information

# only update sync cursor after a successful content sync job
@@ -813,7 +829,16 @@
doc["last_indexed_document_count"] = job.indexed_document_count
doc["last_deleted_document_count"] = job.deleted_document_count

await self.index.update(doc_id=self.id, doc=doc)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_error(
connector_id=self.id, error=job_error
)
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info=last_sync_information
)
else:
doc = doc | {"status": connector_status.value, "error": job_error}
await self.index.update(doc_id=self.id, doc=doc)

@with_concurrency_control()
async def prepare(self, config, sources):
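Every call site changed in connectors/protocol/connectors.py follows the same gating pattern: when the index has `feature_use_connectors_api` enabled, status, error, and last-sync writes go through the new Connector API methods; otherwise they fall back to the existing document update with optimistic concurrency. A condensed, runnable sketch of that pattern is below; `FakeIndex`, `FakeApi`, the connector id, and the example field are stand-ins invented for illustration.

import asyncio
from datetime import datetime, timezone


def iso_utc(when=None):
    # Simplified stand-in for connectors.utils.iso_utc.
    return (when or datetime.now(timezone.utc)).isoformat()


class FakeApi:
    async def connector_update_last_sync_info(self, connector_id, last_sync_info):
        print(f"PUT /_connector/{connector_id}/_last_sync {last_sync_info}")


class FakeIndex:
    feature_use_connectors_api = True  # flip to False to exercise the legacy path
    api = FakeApi()

    async def update(self, doc_id, doc, if_seq_no=None, if_primary_term=None):
        print(f"legacy doc update on {doc_id}: {doc}")


class ConnectorSketch:
    def __init__(self, index, connector_id):
        self.index = index
        self.id = connector_id
        self._seq_no = 1
        self._primary_term = 1

    async def _update_datetime(self, field, new_ts):
        # Same branch structure as the PR's _update_datetime.
        if self.index.feature_use_connectors_api:
            await self.index.api.connector_update_last_sync_info(
                connector_id=self.id, last_sync_info={field: iso_utc(new_ts)}
            )
        else:
            await self.index.update(
                doc_id=self.id,
                doc={field: iso_utc(new_ts)},
                if_seq_no=self._seq_no,
                if_primary_term=self._primary_term,
            )


asyncio.run(
    ConnectorSketch(FakeIndex(), "my-connector-id")._update_datetime(
        "last_sync_scheduled_at", datetime.now(timezone.utc)
    )
)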
2 changes: 1 addition & 1 deletion connectors/utils.py
@@ -457,7 +457,7 @@ def _callback(self, task):
)
elif task.exception():
logger.error(
f"Exception found for task {task.get_name()}: {task.exception()}",
f"Exception found for task {task.get_name()}: {task.exception()} {task}"
)

def _add_task(self, coroutine, name=None):