Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Replace usages of 'elastic_server' with 'sync_orchestrator' #2076

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions connectors/sync_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(
self.sync_job = sync_job
self.connector = connector
self.es_config = es_config
self.elastic_server = None
self.sync_orchestrator = None
self.job_reporting_task = None
self.bulk_options = self.es_config.get("bulk", {})
self._start_time = None
Expand Down Expand Up @@ -132,7 +132,9 @@ async def execute(self):
bulk_options = self.bulk_options.copy()
self.data_provider.tweak_bulk_options(bulk_options)

self.elastic_server = SyncOrchestrator(self.es_config, self.sync_job.logger)
self.sync_orchestrator = SyncOrchestrator(
self.es_config, self.sync_job.logger
)

if job_type in [JobType.INCREMENTAL, JobType.FULL]:
self.sync_job.log_info(f"Executing {job_type.value} sync")
Expand All @@ -147,10 +149,10 @@ async def execute(self):
self.update_ingestion_stats(JOB_REPORTING_INTERVAL)
)

while not self.elastic_server.done():
while not self.sync_orchestrator.done():
await self.check_job()
await asyncio.sleep(JOB_CHECK_INTERVAL)
fetch_error = self.elastic_server.fetch_error()
fetch_error = self.sync_orchestrator.fetch_error()
sync_status = (
JobStatus.COMPLETED if fetch_error is None else JobStatus.ERROR
)
Expand All @@ -164,8 +166,8 @@ async def execute(self):
await self._sync_done(sync_status=JobStatus.ERROR, sync_error=e)
finally:
self.running = False
if self.elastic_server is not None:
await self.elastic_server.close()
if self.sync_orchestrator is not None:
await self.sync_orchestrator.close()
if self.data_provider is not None:
await self.data_provider.close()

Expand All @@ -174,14 +176,16 @@ async def _execute_access_control_sync_job(self, job_type, bulk_options):
(
is_platinum_license_enabled,
license_enabled,
) = await self.elastic_server.has_active_license_enabled(License.PLATINUM)
) = await self.sync_orchestrator.has_active_license_enabled(
License.PLATINUM
)

if not is_platinum_license_enabled:
raise InsufficientESLicenseError(
required_license=License.PLATINUM, actual_license=license_enabled
)

await self.elastic_server.async_bulk(
await self.sync_orchestrator.async_bulk(
self.sync_job.index_name,
self.generator(),
self.sync_job.pipeline,
Expand All @@ -196,7 +200,7 @@ async def _execute_content_sync_job(self, job_type, bulk_options):

logger.debug("Preparing the content index")

await self.elastic_server.prepare_content_index(
await self.sync_orchestrator.prepare_content_index(
index=self.sync_job.index_name, language_code=self.sync_job.language
)

Expand All @@ -205,7 +209,7 @@ async def _execute_content_sync_job(self, job_type, bulk_options):
or self.sync_job.pipeline["extract_binary_content"]
)

await self.elastic_server.async_bulk(
await self.sync_orchestrator.async_bulk(
self.sync_job.index_name,
self.prepare_docs(),
self.sync_job.pipeline,
Expand All @@ -217,8 +221,8 @@ async def _execute_content_sync_job(self, job_type, bulk_options):
)

async def _sync_done(self, sync_status, sync_error=None):
if self.elastic_server is not None and not self.elastic_server.done():
await self.elastic_server.cancel()
if self.sync_orchestrator is not None and not self.sync_orchestrator.done():
await self.sync_orchestrator.cancel()
if self.job_reporting_task is not None and not self.job_reporting_task.done():
self.job_reporting_task.cancel()
try:
Expand All @@ -227,7 +231,9 @@ async def _sync_done(self, sync_status, sync_error=None):
self.sync_job.log_debug("Job reporting task is stopped.")

result = (
{} if self.elastic_server is None else self.elastic_server.ingestion_stats()
{}
if self.sync_orchestrator is None
else self.sync_orchestrator.ingestion_stats()
)
ingestion_stats = {
"indexed_document_count": result.get("indexed_document_count", 0),
Expand Down Expand Up @@ -361,7 +367,7 @@ async def update_ingestion_stats(self, interval):
if not await self.reload_sync_job():
break

result = self.elastic_server.ingestion_stats()
result = self.sync_orchestrator.ingestion_stats()
ingestion_stats = {
"indexed_document_count": result.get("indexed_document_count", 0),
"indexed_document_volume": result.get("indexed_document_volume", 0),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ async def test_batch_bulk_with_retry():
],
)
@pytest.mark.asyncio
async def test_elastic_server_done(
async def test_sync_orchestrator_done(
extractor_task, extractor_task_done, sink_task, sink_task_done, expected_result
):
if extractor_task is not None:
Expand Down
Loading
Loading