Actually make use of Error Monitor and add configuration option for it
artem-shelkovnikov committed Jul 4, 2024
1 parent 7bc85bb commit 1dbeb4c
Showing 8 changed files with 98 additions and 43 deletions.
7 changes: 7 additions & 0 deletions connectors/config.py
@@ -87,6 +87,13 @@ def _default_config():
            "preflight_idle": 30,
            "max_errors": 20,
            "max_errors_span": 600,
            "error_monitor": {
                "max_total_errors": 1000,
                "max_consecutive_errors": 10,
                "max_error_rate": 0.15,
                "error_window_size": 100,
                "error_queue_size": 10,
            },
            "max_concurrent_content_syncs": 1,
            "max_concurrent_access_control_syncs": 1,
            "max_file_download_size": DEFAULT_MAX_FILE_SIZE,
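The keys of the new error_monitor block mirror the keyword arguments of ErrorMonitor in connectors/utils.py, so the service layer can pass the dictionary straight into the constructor. A minimal sketch of that flow, with the standalone service_config dict standing in for a parsed configuration (illustrative only, not part of this commit):

from connectors.utils import ErrorMonitor

# Illustrative stand-in for the "service" section of a loaded config;
# the keys match the defaults added above in _default_config().
service_config = {
    "error_monitor": {
        "max_total_errors": 1000,
        "max_consecutive_errors": 10,
        "max_error_rate": 0.15,
        "error_window_size": 100,
        "error_queue_size": 10,
    }
}

# connectors/sync_job_runner.py (further down in this diff) expands the block
# directly into the monitor's constructor; when the block is absent, the
# constructor defaults apply.
error_monitor = ErrorMonitor(**service_config.get("error_monitor", {}))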
77 changes: 50 additions & 27 deletions connectors/es/sink.py
@@ -110,6 +110,10 @@ def __init__(self, cause=None):
        self.__cause__ = cause


class DocumentIngestionError(Exception):
    pass


class Sink:
    """Send bulk operations in batches by consuming a queue.
@@ -136,6 +140,7 @@ def __init__(
        max_concurrency,
        max_retries,
        retry_interval,
        error_monitor,
        logger_=None,
        enable_bulk_operations_logging=False,
    ):
@@ -145,6 +150,7 @@ def __init__(
        self.pipeline = pipeline
        self.chunk_mem_size = chunk_mem_size * 1024 * 1024
        self.bulk_tasks = ConcurrentTasks(max_concurrency=max_concurrency)
        self.error_monitor = error_monitor
        self.max_retires = max_retries
        self.retry_interval = retry_interval
        self.error = None
@@ -272,18 +278,19 @@ async def _process_bulk_response(self, res, ids_to_ops, do_log=False):
            successful_result = result in SUCCESSFUL_RESULTS
            if not successful_result:
                if "error" in item[action_item]:
                    message = f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
                    self.error_monitor.track_error(DocumentIngestionError(message))
                    if do_log:
                        self._logger.debug(
                            f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
                        )
                        self._logger.debug(message)
                    self.counters.increment(RESULT_ERROR, namespace=BULK_RESPONSES)
                else:
                    message = f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
                    self.error_monitor.track_error(DocumentIngestionError(message))
                    if do_log:
                        self._logger.debug(
                            f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
                        )
                        self._logger.debug(message)
                    self.counters.increment(RESULT_UNDEFINED, namespace=BULK_RESPONSES)
            else:
                self.error_monitor.track_success()
                if do_log:
                    self._logger.debug(
                        f"Successfully executed '{action_item}' on document with id '{doc_id}'. Result: {result}"
@@ -424,6 +431,7 @@ def __init__(
        client,
        queue,
        index,
        error_monitor,
        filter_=None,
        sync_rules_enabled=False,
        content_extraction_enabled=True,
@@ -449,27 +457,37 @@ def __init__(
        self.concurrent_downloads = concurrent_downloads
        self._logger = logger_ or logger
        self._canceled = False
        self.error_monitor = error_monitor
        self.skip_unchanged_documents = skip_unchanged_documents

    async def _deferred_index(self, lazy_download, doc_id, doc, operation):
        data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])

        if data is not None:
            self.counters.increment(BIN_DOCS_DOWNLOADED)
            data.pop("_id", None)
            data.pop(TIMESTAMP_FIELD, None)
            doc.update(data)

            doc.pop("_original_filename", None)

        await self.put_doc(
            {
                "_op_type": operation,
                "_index": self.index,
                "_id": doc_id,
                "doc": doc,
            }
        )
        try:
            data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])

            if data is not None:
                self.counters.increment(BIN_DOCS_DOWNLOADED)
                data.pop("_id", None)
                data.pop(TIMESTAMP_FIELD, None)
                doc.update(data)

                doc.pop("_original_filename", None)

            await self.put_doc(
                {
                    "_op_type": operation,
                    "_index": self.index,
                    "_id": doc_id,
                    "doc": doc,
                }
            )
            self.error_monitor.track_success()
        except ForceCanceledError:
            raise
        except Exception as ex:
            self._logger.error(
                f"Failed to do deferred index operation for doc {doc_id}: {ex}"
            )
            self.error_monitor.track_error(ex)

    def force_cancel(self):
        self._canceled = True
@@ -599,6 +617,8 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
                        }
                    )

                lazy_downloads.raise_any_exception()

                await asyncio.sleep(0)
        finally:
            # wait for all downloads to be finished
@@ -803,7 +823,8 @@ class SyncOrchestrator:
    - once they are both over, returns totals
    """

    def __init__(self, elastic_config, logger_=None):
    def __init__(self, elastic_config, error_monitor, logger_=None):
        self.error_monitor = error_monitor
        self._logger = logger_ or logger
        self._logger.debug(f"SyncOrchestrator connecting to {elastic_config['host']}")
        self.es_management_client = ESManagementClient(elastic_config)
@@ -884,7 +905,7 @@ def _extractor_task_running(self):
    async def cancel(self):
        if self._sink_task_running():
            self._logger.info(
                f"Cancling the Sink task: {self._sink_task.name}"  # pyright: ignore
                f"Canceling the Sink task: {self._sink_task.get_name()}"  # pyright: ignore
            )
            self._sink_task.cancel()
        else:
@@ -894,7 +915,7 @@ async def cancel(self):

        if self._extractor_task_running():
            self._logger.info(
                f"Canceling the Extractor task: {self._extractor_task.name}"  # pyright: ignore
                f"Canceling the Extractor task: {self._extractor_task.get_name()}"  # pyright: ignore
            )
            self._extractor_task.cancel()
        else:
@@ -1005,6 +1026,7 @@ async def async_bulk(
            self.es_management_client,
            stream,
            index,
            error_monitor=self.error_monitor,
            filter_=filter_,
            sync_rules_enabled=sync_rules_enabled,
            content_extraction_enabled=content_extraction_enabled,
@@ -1031,6 +1053,7 @@ async def async_bulk(
            max_concurrency=max_concurrency,
            max_retries=max_bulk_retries,
            retry_interval=retry_interval,
            error_monitor=self.error_monitor,
            logger_=self._logger,
            enable_bulk_operations_logging=enable_bulk_operations_logging,
        )
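In the sink, bulk failures arrive as response payloads rather than raised exceptions, so they are wrapped in the new DocumentIngestionError before being handed to the monitor, and every successful item is reported as a success. A minimal, self-contained sketch of that pattern; record_item_result and _PrintMonitor are illustrative helpers, not part of this commit:

class DocumentIngestionError(Exception):
    pass


class _PrintMonitor:
    # Hypothetical stand-in for connectors.utils.ErrorMonitor, only so this
    # sketch runs without the framework installed.
    def track_success(self):
        print("success tracked")

    def track_error(self, error):
        print(f"error tracked: {error!r}")


def record_item_result(error_monitor, action_item, doc_id, item_error):
    # Mirrors the pattern in Sink._process_bulk_response: a successful item
    # counts toward the monitor's success window, a failed one is wrapped so
    # the monitor always receives an exception carrying the document id and
    # the Elasticsearch error payload.
    if item_error is None:
        error_monitor.track_success()
        return
    message = f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item_error}"
    error_monitor.track_error(DocumentIngestionError(message))


monitor = _PrintMonitor()
record_item_result(monitor, "index", "doc-1", None)
record_item_result(monitor, "index", "doc-2", {"type": "mapper_parsing_exception"})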
8 changes: 6 additions & 2 deletions connectors/source.py
@@ -32,7 +32,6 @@
from connectors.logger import logger
from connectors.utils import (
    TIKA_SUPPORTED_FILETYPES,
    ErrorMonitor,
    convert_to_b64,
    epoch_timestamp_zulu,
    get_file_extension,
@@ -410,7 +409,7 @@ def __init__(self, configuration):

        # this will be overwritten by set_framework_config()
        self.framework_config = DataSourceFrameworkConfig.Builder().build()
        self.error_monitor = ErrorMonitor()
        self.error_monitor = None

    def __str__(self):
        return f"Datasource `{self.__class__.name}`"
@@ -419,6 +418,9 @@ def set_logger(self, logger_):
        self._logger = logger_
        self._set_internal_logger()

    def set_error_monitor(self, error_monitor):
        self.error_monitor = error_monitor

    def _set_internal_logger(self):
        # no op for BaseDataSource
        # if there are internal class (e.g. Client class) to which the logger need to be set,
@@ -778,12 +780,14 @@ async def download_and_extract_file(
            doc = await self.handle_file_content_extraction(
                doc, source_filename, temp_filename
            )
            self.error_monitor.track_success()
            return doc
        except Exception as e:
            self._logger.warning(
                f"File download and extraction or conversion for file {source_filename} failed: {e}",
                exc_info=True,
            )
            self.error_monitor.track_error(e)
            if return_doc_if_failed:
                return doc
            else:
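BaseDataSource no longer builds its own monitor: self.error_monitor now starts as None and the framework injects a per-sync instance through the new set_error_monitor() (see connectors/sync_job_runner.py below). A minimal sketch of that contract; BaseDataSource is instantiated directly with an empty configuration purely for illustration, real connectors subclass it:

from connectors.source import BaseDataSource, DataSourceConfiguration
from connectors.utils import ErrorMonitor

source = BaseDataSource(configuration=DataSourceConfiguration({}))
assert source.error_monitor is None  # previously this defaulted to ErrorMonitor()

# The sync job runner now calls this before a sync starts; without it,
# download_and_extract_file() would fail on self.error_monitor being None.
source.set_error_monitor(ErrorMonitor())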
5 changes: 3 additions & 2 deletions connectors/sources/mongo.py
@@ -240,8 +240,9 @@ def _serialize(value):
                value = _serialize(value.as_doc().to_dict())
            return value

        for key, value in doc.items():
            doc[key] = _serialize(value)
        with self.with_error_monitoring():
            for key, value in doc.items():
                doc[key] = _serialize(value)

        return doc

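The serialization loop is now wrapped in with_error_monitoring(), a helper whose implementation is not shown in this diff. A plausible sketch of what such a context manager does, assuming it simply reports the outcome of the wrapped block to the monitor (class name and details here are illustrative):

from contextlib import contextmanager


class ErrorMonitoringExample:
    # Illustrative only; in the connectors framework the monitor lives on the
    # data source and is injected via set_error_monitor().
    def __init__(self, error_monitor):
        self.error_monitor = error_monitor

    @contextmanager
    def with_error_monitoring(self):
        try:
            yield
            # Reached only if the wrapped block finished without raising.
            self.error_monitor.track_success()
        except Exception as e:
            # Record the failure, then re-raise so callers still see it
            # (the monitor itself may abort the sync by raising TooManyErrors).
            self.error_monitor.track_error(e)
            raise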
7 changes: 5 additions & 2 deletions connectors/sync_job_runner.py
@@ -35,7 +35,7 @@
    INDEXED_DOCUMENT_VOLUME,
)
from connectors.source import BaseDataSource
from connectors.utils import truncate_id
from connectors.utils import ErrorMonitor, truncate_id

UTF_8 = "utf-8"

@@ -113,6 +113,8 @@ def __init__(
        self.es_config = es_config
        self.service_config = service_config
        self.sync_orchestrator = None
        error_monitor_config = service_config.get("error_monitor", {})
        self.error_monitor = ErrorMonitor(**error_monitor_config)
        self.job_reporting_task = None
        self.bulk_options = self.es_config.get("bulk", {})
        self._start_time = None
@@ -149,6 +151,7 @@ async def execute(self):
                    configuration=self.sync_job.configuration
                )
                self.data_provider.set_logger(self.sync_job.logger)
                self.data_provider.set_error_monitor(self.error_monitor)
                self.data_provider.set_framework_config(
                    self._data_source_framework_config()
                )
@@ -183,7 +186,7 @@ async def execute(self):
            await self._update_native_connector_authentication()

            self.sync_orchestrator = SyncOrchestrator(
                self.es_config, self.sync_job.logger
                self.es_config, self.error_monitor, self.sync_job.logger
            )

            if job_type in [JobType.INCREMENTAL, JobType.FULL]:
8 changes: 4 additions & 4 deletions connectors/utils.py
@@ -1009,7 +1009,7 @@ def __init__(
        max_consecutive_errors=10,
        max_error_rate=0.15,
        error_window_size=100,
        error_queue_size=20,
        error_queue_size=10,
    ):
        self.max_error_rate = max_error_rate
        self.error_window_size = error_window_size
@@ -1082,13 +1082,13 @@ def _error_window_error_rate(self):

    def _raise_if_necessary(self):
        if self.consecutive_error_count > self.max_consecutive_errors:
            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row"
            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row. Last error: {self.last_error}"
            raise TooManyErrors(msg) from self.last_error
        elif self.total_error_count > self.max_total_errors:
            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors"
            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors. Last error: {self.last_error}"
            raise TooManyErrors(msg) from self.last_error
        elif self.error_window_size > 0:
            error_rate = self._error_window_error_rate()
            if error_rate > self.max_error_rate:
                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations."
                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations. Last error: {self.last_error}"
                raise TooManyErrors(msg) from self.last_error
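A short sketch of the monitor end to end, assuming, as the sink and runner usage implies, that track_error() runs the _raise_if_necessary() checks after recording each failure; the thresholds are lowered here purely for illustration:

from connectors.utils import ErrorMonitor, TooManyErrors

monitor = ErrorMonitor(max_total_errors=100, max_consecutive_errors=3)
monitor.track_success()  # record one successful operation

try:
    for attempt in range(10):
        monitor.track_error(ValueError(f"simulated failure #{attempt}"))
except TooManyErrors as e:
    # With the message change above, the raised error now names the last
    # underlying exception, e.g. "... Last error: simulated failure #3".
    print(e)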
2 changes: 2 additions & 0 deletions tests/sources/support.py
@@ -4,6 +4,7 @@
# you may not use this file except in compliance with the Elastic License 2.0.
#
from contextlib import asynccontextmanager
from unittest.mock import Mock

from connectors.source import DEFAULT_CONFIGURATION, DataSourceConfiguration

@@ -18,6 +19,7 @@ async def create_source(klass, **extras):
        config[k] = DEFAULT_CONFIGURATION.copy() | {"value": v}

    source = klass(configuration=DataSourceConfiguration(config))
    source.set_error_monitor(Mock())
    try:
        yield source
    finally: