Skip to content

Commit

Permalink
feat(ingest): measure sink bottlenecking (#10628)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jun 4, 2024
1 parent 8f40229 commit fcab544
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import concurrent.futures
import contextlib
import dataclasses
import functools
import logging
import uuid
from dataclasses import dataclass
from enum import auto
from typing import Optional, Union

Expand All @@ -29,6 +29,7 @@
MetadataChangeProposal,
)
from datahub.utilities.advanced_thread_executor import PartitionExecutor
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.server_config_util import set_gms_config

logger = logging.getLogger(__name__)
Expand All @@ -44,15 +45,17 @@ class DatahubRestSinkConfig(DatahubClientConfig):

# These only apply in async mode.
max_threads: int = 15
max_pending_requests: int = 500
max_pending_requests: int = 2000


@dataclass
@dataclasses.dataclass
class DataHubRestSinkReport(SinkReport):
max_threads: int = -1
gms_version: str = ""
max_threads: Optional[int] = None
gms_version: Optional[str] = None
pending_requests: int = 0

main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)

def compute_stats(self) -> None:
super().compute_stats()

Expand Down Expand Up @@ -105,7 +108,7 @@ def __post_init__(self) -> None:
self.report.gms_version = (
gms_config.get("versions", {})
.get("acryldata/datahub", {})
.get("version", "")
.get("version", None)
)
self.report.max_threads = self.config.max_threads
logger.debug("Setting env variables to override config")
Expand Down Expand Up @@ -189,25 +192,28 @@ def write_record_async(
],
write_callback: WriteCallback,
) -> None:
record = record_envelope.record
if self.config.mode == SyncOrAsync.ASYNC:
partition_key = _get_partition_key(record_envelope)
self.executor.submit(
partition_key,
self._emit_wrapper,
record,
done_callback=functools.partial(
self._write_done_callback, record_envelope, write_callback
),
)
self.report.pending_requests += 1
else:
# execute synchronously
try:
self._emit_wrapper(record)
write_callback.on_success(record_envelope, success_metadata={})
except Exception as e:
write_callback.on_failure(record_envelope, e, failure_metadata={})
# Because the default is async mode and most sources are slower than the sink, this
# should only have a high value if the sink is actually a bottleneck.
with self.report.main_thread_blocking_timer:
record = record_envelope.record
if self.config.mode == SyncOrAsync.ASYNC:
partition_key = _get_partition_key(record_envelope)
self.executor.submit(
partition_key,
self._emit_wrapper,
record,
done_callback=functools.partial(
self._write_done_callback, record_envelope, write_callback
),
)
self.report.pending_requests += 1
else:
# execute synchronously
try:
self._emit_wrapper(record)
write_callback.on_success(record_envelope, success_metadata={})
except Exception as e:
write_callback.on_failure(record_envelope, e, failure_metadata={})

def emit_async(
self,
Expand Down

0 comments on commit fcab544

Please sign in to comment.