From c40eefa903ce3b80d120fc4694824a0d20f56252 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Mon, 9 Sep 2024 11:49:23 +0545 Subject: [PATCH 01/11] Introduce telemetry in datachain --- pyproject.toml | 3 +- src/datachain/lib/data_model.py | 2 + src/datachain/lib/dc.py | 23 +++++++++ src/datachain/lib/listing.py | 2 + src/datachain/lib/udf.py | 3 ++ src/datachain/progress.py | 14 +----- src/datachain/telemetry.py | 88 +++++++++++++++++++++++++++++++++ src/datachain/utils.py | 11 +++++ 8 files changed, 133 insertions(+), 13 deletions(-) create mode 100644 src/datachain/telemetry.py diff --git a/pyproject.toml b/pyproject.toml index 222ddb0b7..4daf6b77d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,8 @@ dependencies = [ "Pillow>=10.0.0,<11", "msgpack>=1.0.4,<2", "psutil", - "huggingface_hub" + "huggingface_hub", + "iterative-telemetry>=0.0.8" ] [project.optional-dependencies] diff --git a/src/datachain/lib/data_model.py b/src/datachain/lib/data_model.py index 09a92cc60..46b062f2b 100644 --- a/src/datachain/lib/data_model.py +++ b/src/datachain/lib/data_model.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, create_model from datachain.lib.model_store import ModelStore +from datachain.telemetry import api_telemetry StandardType = Union[ type[int], @@ -32,6 +33,7 @@ def __pydantic_init_subclass__(cls): ModelStore.register(cls) @staticmethod + @api_telemetry def register(models: Union[DataType, Sequence[DataType]]): """For registering classes manually. It accepts a single class or a sequence of classes.""" diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py index 70057e29a..baadcdc71 100644 --- a/src/datachain/lib/dc.py +++ b/src/datachain/lib/dc.py @@ -58,6 +58,7 @@ ) from datachain.query.schema import DEFAULT_DELIMITER, Column, DatasetRow from datachain.sql.functions import path as pathfunc +from datachain.telemetry import api_telemetry from datachain.utils import inside_notebook if TYPE_CHECKING: @@ -348,6 +349,7 @@ def add_schema(self, signals_schema: SignalSchema) -> "Self": # noqa: D102 return self @classmethod + @api_telemetry def from_storage( cls, uri, @@ -424,6 +426,7 @@ def from_storage( return ls(dc, list_path, recursive=recursive, object_name=object_name) @classmethod + @api_telemetry def from_dataset( cls, name: str, @@ -445,6 +448,7 @@ def from_dataset( return DataChain(name=name, version=version, session=session, settings=settings) @classmethod + @api_telemetry def from_json( cls, path, @@ -510,6 +514,7 @@ def jmespath_to_name(s: str): return chain.gen(**signal_dict) # type: ignore[misc, arg-type] @classmethod + @api_telemetry def from_jsonl( cls, path, @@ -570,6 +575,7 @@ def jmespath_to_name(s: str): return chain.gen(**signal_dict) # type: ignore[misc, arg-type] @classmethod + @api_telemetry def datasets( cls, session: Optional[Session] = None, @@ -684,6 +690,7 @@ def print_jsonl_schema( # type: ignore[override] output=str, ) + @api_telemetry def save( # type: ignore[override] self, name: Optional[str] = None, version: Optional[int] = None, **kwargs ) -> "Self": @@ -697,6 +704,7 @@ def save( # type: ignore[override] schema = self.signals_schema.clone_without_sys_signals().serialize() return super().save(name=name, version=version, feature_schema=schema, **kwargs) + @api_telemetry def apply(self, func, *args, **kwargs): """Apply any function to the chain. @@ -719,6 +727,7 @@ def parse_stem(chain): """ return func(self, *args, **kwargs) + @api_telemetry def map( self, func: Optional[Callable] = None, @@ -768,6 +777,7 @@ def map( return chain.add_schema(udf_obj.output).reset_settings(self._settings) + @api_telemetry def gen( self, func: Optional[Callable] = None, @@ -804,6 +814,7 @@ def gen( return chain.reset_schema(udf_obj.output).reset_settings(self._settings) + @api_telemetry def agg( self, func: Optional[Callable] = None, @@ -845,6 +856,7 @@ def agg( return chain.reset_schema(udf_obj.output).reset_settings(self._settings) + @api_telemetry def batch_map( self, func: Optional[Callable] = None, @@ -960,6 +972,7 @@ def select_except(self, *args: str) -> "Self": chain.signals_schema = new_schema return chain + @api_telemetry @detach def mutate(self, **kwargs) -> "Self": """Create new signals based on existing signals. @@ -1089,6 +1102,7 @@ def collect(self, col: str) -> Iterator[DataType]: ... # type: ignore[overload- @overload def collect(self, *cols: str) -> Iterator[tuple[DataType, ...]]: ... + @api_telemetry def collect(self, *cols: str) -> Iterator[Union[DataType, tuple[DataType, ...]]]: # type: ignore[overload-overlap,misc] """Yields rows of values, optionally limited to the specified columns. @@ -1171,6 +1185,7 @@ def remove_file_signals(self) -> "Self": # noqa: D102 schema = self.signals_schema.clone_without_file_signals() return self.select(*schema.values.keys()) + @api_telemetry @detach def merge( self, @@ -1341,6 +1356,7 @@ def subtract( # type: ignore[override] return super()._subtract(other, signals) # type: ignore[arg-type] @classmethod + @api_telemetry def from_values( cls, ds_name: str = "", @@ -1374,6 +1390,7 @@ def _func_fr() -> Iterator[tuple_type]: # type: ignore[valid-type] return chain.gen(_func_fr, output=output) @classmethod + @api_telemetry def from_pandas( # type: ignore[override] cls, df: "pd.DataFrame", @@ -1417,6 +1434,7 @@ def from_pandas( # type: ignore[override] **fr_map, ) + @api_telemetry def to_pandas(self, flatten=False) -> "pd.DataFrame": """Return a pandas DataFrame from the chain. @@ -1431,6 +1449,7 @@ def to_pandas(self, flatten=False) -> "pd.DataFrame": return pd.DataFrame.from_records(self.results(), columns=columns) + @api_telemetry def show( self, limit: int = 20, @@ -1485,6 +1504,7 @@ def show( print(f"\n[Limited by {len(df)} rows]") @classmethod + @api_telemetry def from_hf( cls, dataset: Union[str, "HFDatasetType"], @@ -1616,6 +1636,7 @@ def parse_tabular( ) @classmethod + @api_telemetry def from_csv( cls, path, @@ -1703,6 +1724,7 @@ def from_csv( ) @classmethod + @api_telemetry def from_parquet( cls, path, @@ -1771,6 +1793,7 @@ def to_parquet( ) @classmethod + @api_telemetry def from_records( cls, to_insert: Optional[Union[dict, list[dict]]], diff --git a/src/datachain/lib/listing.py b/src/datachain/lib/listing.py index ad8f92ff6..5ba6de67c 100644 --- a/src/datachain/lib/listing.py +++ b/src/datachain/lib/listing.py @@ -11,6 +11,7 @@ from datachain.lib.file import File from datachain.query.schema import Column from datachain.sql.functions import path as pathfunc +from datachain.telemetry import telemetry from datachain.utils import uses_glob if TYPE_CHECKING: @@ -79,6 +80,7 @@ def parse_listing_uri(uri: str, cache, client_config) -> tuple[str, str, str]: """ client = Client.get_client(uri, cache, **client_config) storage_uri, path = Client.parse_url(uri) + telemetry.log_param("client", client.PREFIX) # clean path without globs lst_uri_path = ( diff --git a/src/datachain/lib/udf.py b/src/datachain/lib/udf.py index 46a13b218..e30daaedc 100644 --- a/src/datachain/lib/udf.py +++ b/src/datachain/lib/udf.py @@ -17,6 +17,7 @@ from datachain.query.schema import ColumnParameter from datachain.query.udf import UDFBase as _UDFBase from datachain.query.udf import UDFProperties +from datachain.telemetry import api_telemetry if TYPE_CHECKING: from collections.abc import Iterable, Iterator, Sequence @@ -42,6 +43,7 @@ def __init__( self.inner = inner super().__init__(properties) + @api_telemetry def run( self, udf_fields: "Sequence[str]", @@ -69,6 +71,7 @@ def run( if hasattr(self.inner, "teardown") and callable(self.inner.teardown): self.inner.teardown() + @api_telemetry def run_once( self, catalog: "Catalog", diff --git a/src/datachain/progress.py b/src/datachain/progress.py index 99ed0218c..5507742ff 100644 --- a/src/datachain/progress.py +++ b/src/datachain/progress.py @@ -1,8 +1,6 @@ """Manages progress bars.""" import logging -import os -import re import sys from threading import RLock from typing import Any, ClassVar @@ -10,20 +8,12 @@ from fsspec.callbacks import TqdmCallback from tqdm import tqdm +from datachain.utils import env2bool + logger = logging.getLogger(__name__) tqdm.set_lock(RLock()) -def env2bool(var, undefined=False): - """ - undefined: return value if env var is unset - """ - var = os.getenv(var, None) - if var is None: - return undefined - return bool(re.search("1|y|yes|true", var, flags=re.IGNORECASE)) - - class Tqdm(tqdm): """ maximum-compatibility tqdm-based progressbars diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py new file mode 100644 index 000000000..0a6ac7739 --- /dev/null +++ b/src/datachain/telemetry.py @@ -0,0 +1,88 @@ +import logging +from functools import wraps +from importlib.metadata import PackageNotFoundError, version + +from iterative_telemetry import IterativeTelemetryLogger + +from datachain.config import read_config +from datachain.utils import DataChainDir, env2bool + +logger = logging.getLogger(__name__) + + +def is_enabled(): + """ + Determine if telemetry is enabled based on environment variables and configuration. + """ + # Disable telemetry if running in test mode + if env2bool("DATACHAIN_TEST"): + return False + + # Check if telemetry is disabled by environment variable + disabled = env2bool("DATACHAIN_NO_ANALYTICS") + if disabled: + logger.debug("Telemetry is disabled by environment variable.") + return False + + # Check if telemetry is disabled by configuration file + config = read_config(DataChainDir.find().root) + if config and config.get("core", {}).get("no_analytics", False): + logger.debug("Telemetry is disabled by configuration.") + return False + + logger.debug("Telemetry is enabled.") + return True + + +# Try to get the version of the datachain package +try: + __version__ = version("datachain") +except PackageNotFoundError: + __version__ = "unknown" + +# Initialize telemetry logger +telemetry = IterativeTelemetryLogger("datachain", __version__, is_enabled) +_is_api_running = False +_pass_params = True + + +def api_telemetry(f): + """ + Decorator to add telemetry logging to API functions. + """ + + @wraps(f) + def wrapper(*args, **kwargs): + # Use global variables to track state + global _is_api_running, _pass_params # noqa: PLW0603 + # Check if an API call is already running + is_nested = _is_api_running + # Check if parameters should be passed to telemetry + pass_params = _pass_params + # Reset global variables + _pass_params = False + _is_api_running = True + try: + # Start telemetry event scope + with telemetry.event_scope("api", f.__name__) as event: + try: + return f(*args, **kwargs) + except Exception as exc: + event.error = exc.__class__.__name__ + raise + finally: + if not is_nested: + # Send telemetry event if not nested + telemetry.send_event( + event.interface, event.action, event.error, **event.kwargs + ) + + finally: + if pass_params: + # Log parameters if pass_params flag is set + for key, value in event.kwargs.items(): + telemetry.log_param(key, value) + _is_api_running = is_nested # Restore previous API running state + _pass_params = pass_params # Restore previous pass_params state + + return wrapper diff --git a/src/datachain/utils.py b/src/datachain/utils.py index 3a06683bc..77e927d06 100644 --- a/src/datachain/utils.py +++ b/src/datachain/utils.py @@ -5,6 +5,7 @@ import os import os.path as osp import random +import re import stat import sys import time @@ -450,3 +451,13 @@ def get_datachain_executable() -> list[str]: def uses_glob(path: str) -> bool: """Checks if some URI path has glob syntax in it""" return glob.has_magic(os.path.basename(os.path.normpath(path))) + + +def env2bool(var, undefined=False): + """ + undefined: return value if env var is unset + """ + var = os.getenv(var, None) + if var is None: + return undefined + return bool(re.search("1|y|yes|true", var, flags=re.IGNORECASE)) From 3a026f49466052cf1c1634b384e15f01ac5b54be Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Mon, 9 Sep 2024 19:40:34 +0545 Subject: [PATCH 02/11] Add test and implement telemetry to api call --- src/datachain/catalog/catalog.py | 8 + src/datachain/cli.py | 279 ++++++++++++++++--------------- src/datachain/telemetry.py | 13 ++ tests/conftest.py | 5 + tests/test_telemetry.py | 24 +++ 5 files changed, 196 insertions(+), 133 deletions(-) create mode 100644 tests/test_telemetry.py diff --git a/src/datachain/catalog/catalog.py b/src/datachain/catalog/catalog.py index 2dbf74b68..2375855f5 100644 --- a/src/datachain/catalog/catalog.py +++ b/src/datachain/catalog/catalog.py @@ -64,6 +64,7 @@ from datachain.remote.studio import StudioClient from datachain.sql.types import JSON, Boolean, DateTime, Int64, SQLType, String from datachain.storage import Storage, StorageStatus, StorageURI +from datachain.telemetry import api_telemetry from datachain.utils import ( DataChainDir, batched, @@ -1415,6 +1416,7 @@ def remove_dataset( version, ) + @api_telemetry def edit_dataset( self, name: str, @@ -1524,6 +1526,7 @@ def ls( for source in data_sources: # type: ignore [union-attr] yield source, source.ls(fields) + @api_telemetry def pull_dataset( self, dataset_uri: str, @@ -1681,6 +1684,7 @@ def _instantiate_dataset(): _instantiate_dataset() + @api_telemetry def clone( self, sources: list[str], @@ -1731,6 +1735,7 @@ def clone( output, sources, client_config=client_config, recursive=recursive ) + @api_telemetry def apply_udf( self, udf_location: str, @@ -1831,6 +1836,7 @@ def query( return_code=proc.returncode, ) + @api_telemetry def cp( self, sources: list[str], @@ -1938,6 +1944,7 @@ def du_dirs(src, node, subdepth): for src in sources: yield from du_dirs(src, src.node, depth) + @api_telemetry def find( self, sources, @@ -2023,5 +2030,6 @@ def index( only_index=True, ) + @api_telemetry def find_stale_storages(self) -> None: self.metastore.find_stale_storages() diff --git a/src/datachain/cli.py b/src/datachain/cli.py index 406efffa9..d6288cc25 100644 --- a/src/datachain/cli.py +++ b/src/datachain/cli.py @@ -15,6 +15,7 @@ from datachain import utils from datachain.cli_utils import BooleanOptionalAction, CommaSeparatedArgs, KeyValueArgs from datachain.lib.dc import DataChain +from datachain.telemetry import api_telemetry, telemetry, track_cli_telemetry from datachain.utils import DataChainDir if TYPE_CHECKING: @@ -691,6 +692,7 @@ def ls_remote( first = False +@api_telemetry def ls( sources, long: bool = False, @@ -703,6 +705,7 @@ def ls( config = get_remote_config(read_config(DataChainDir.find().root), remote=remote) remote_type = config["type"] + telemetry.log_param("remote_type", remote_type) if remote_type == "local": ls_local(sources, long=long, **kwargs) else: @@ -715,12 +718,14 @@ def ls( ) +@api_telemetry def ls_datasets(catalog: "Catalog"): for d in catalog.ls_datasets(): for v in d.versions: print(f"{d.name} (v{v.version})") +@api_telemetry def rm_dataset( catalog: "Catalog", name: str, @@ -730,6 +735,7 @@ def rm_dataset( catalog.remove_dataset(name, version=version, force=force) +@api_telemetry def dataset_stats( catalog: "Catalog", name: str, @@ -747,6 +753,7 @@ def dataset_stats( print(f"Total objects size: {utils.sizeof_fmt(stats.size, si=si): >7}") +@api_telemetry def du(catalog: "Catalog", sources, show_bytes=False, si=False, **kwargs): for path, size in catalog.du(sources, **kwargs): if show_bytes: @@ -755,6 +762,7 @@ def du(catalog: "Catalog", sources, show_bytes=False, si=False, **kwargs): print(f"{utils.sizeof_fmt(size, si=si): >7} {path}") +@api_telemetry def index( catalog: "Catalog", sources, @@ -763,6 +771,7 @@ def index( catalog.index(sources, **kwargs) +@api_telemetry def show( catalog: "Catalog", name: str, @@ -794,6 +803,7 @@ def show( dc.print_schema() +@api_telemetry def query( catalog: "Catalog", script: str, @@ -841,10 +851,12 @@ def query( catalog.metastore.set_job_status(job_id, JobStatus.COMPLETE) +@api_telemetry def clear_cache(catalog: "Catalog"): catalog.cache.clear() +@api_telemetry def garbage_collect(catalog: "Catalog"): temp_tables = catalog.get_temp_table_names() if not temp_tables: @@ -895,139 +907,140 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09 try: catalog = get_catalog(client_config=client_config) - if args.command == "cp": - catalog.cp( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - edatachain_file=None, - edatachain_only=False, - no_edatachain_file=True, - no_glob=args.no_glob, - ttl=args.ttl, - ) - elif args.command == "clone": - catalog.clone( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - no_glob=args.no_glob, - ttl=args.ttl, - no_cp=args.no_cp, - edatachain=args.edatachain, - edatachain_file=args.edatachain_file, - ) - elif args.command == "pull": - catalog.pull_dataset( - args.dataset, - args.output, - no_cp=args.no_cp, - force=bool(args.force), - edatachain=args.edatachain, - edatachain_file=args.edatachain_file, - ) - elif args.command == "edit-dataset": - catalog.edit_dataset( - args.name, - description=args.description, - new_name=args.new_name, - labels=args.labels, - ) - elif args.command == "ls": - ls( - args.sources, - long=bool(args.long), - remote=args.remote, - ttl=args.ttl, - update=bool(args.update), - client_config=client_config, - ) - elif args.command == "ls-datasets": - ls_datasets(catalog) - elif args.command == "show": - show( - catalog, - args.name, - args.version, - limit=args.limit, - offset=args.offset, - columns=args.columns, - no_collapse=args.no_collapse, - schema=args.schema, - ) - elif args.command == "rm-dataset": - rm_dataset(catalog, args.name, version=args.version, force=args.force) - elif args.command == "dataset-stats": - dataset_stats( - catalog, - args.name, - args.version, - show_bytes=args.bytes, - si=args.si, - ) - elif args.command == "du": - du( - catalog, - args.sources, - show_bytes=args.bytes, - depth=args.depth, - si=args.si, - ttl=args.ttl, - update=bool(args.update), - client_config=client_config, - ) - elif args.command == "find": - results_found = False - for result in catalog.find( - args.sources, - ttl=args.ttl, - update=bool(args.update), - names=args.name, - inames=args.iname, - paths=args.path, - ipaths=args.ipath, - size=args.size, - typ=args.type, - columns=args.columns, - ): - print(result) - results_found = True - if not results_found: - print("No results") - elif args.command == "index": - index( - catalog, - args.sources, - ttl=args.ttl, - update=bool(args.update), - ) - elif args.command == "completion": - print(completion(args.shell)) - elif args.command == "find-stale-storages": - catalog.find_stale_storages() - elif args.command == "query": - query( - catalog, - args.script, - parallel=args.parallel, - params=args.param, - ) - elif args.command == "apply-udf": - catalog.apply_udf( - args.udf, args.source, args.target, args.parallel, args.udf_params - ) - elif args.command == "clear-cache": - clear_cache(catalog) - elif args.command == "gc": - garbage_collect(catalog) - else: - print(f"invalid command: {args.command}", file=sys.stderr) - return 1 - return 0 + with track_cli_telemetry(args.command): + if args.command == "cp": + catalog.cp( + args.sources, + args.output, + force=bool(args.force), + update=bool(args.update), + recursive=bool(args.recursive), + edatachain_file=None, + edatachain_only=False, + no_edatachain_file=True, + no_glob=args.no_glob, + ttl=args.ttl, + ) + elif args.command == "clone": + catalog.clone( + args.sources, + args.output, + force=bool(args.force), + update=bool(args.update), + recursive=bool(args.recursive), + no_glob=args.no_glob, + ttl=args.ttl, + no_cp=args.no_cp, + edatachain=args.edatachain, + edatachain_file=args.edatachain_file, + ) + elif args.command == "pull": + catalog.pull_dataset( + args.dataset, + args.output, + no_cp=args.no_cp, + force=bool(args.force), + edatachain=args.edatachain, + edatachain_file=args.edatachain_file, + ) + elif args.command == "edit-dataset": + catalog.edit_dataset( + args.name, + description=args.description, + new_name=args.new_name, + labels=args.labels, + ) + elif args.command == "ls": + ls( + args.sources, + long=bool(args.long), + remote=args.remote, + ttl=args.ttl, + update=bool(args.update), + client_config=client_config, + ) + elif args.command == "ls-datasets": + ls_datasets(catalog) + elif args.command == "show": + show( + catalog, + args.name, + args.version, + limit=args.limit, + offset=args.offset, + columns=args.columns, + no_collapse=args.no_collapse, + schema=args.schema, + ) + elif args.command == "rm-dataset": + rm_dataset(catalog, args.name, version=args.version, force=args.force) + elif args.command == "dataset-stats": + dataset_stats( + catalog, + args.name, + args.version, + show_bytes=args.bytes, + si=args.si, + ) + elif args.command == "du": + du( + catalog, + args.sources, + show_bytes=args.bytes, + depth=args.depth, + si=args.si, + ttl=args.ttl, + update=bool(args.update), + client_config=client_config, + ) + elif args.command == "find": + results_found = False + for result in catalog.find( + args.sources, + ttl=args.ttl, + update=bool(args.update), + names=args.name, + inames=args.iname, + paths=args.path, + ipaths=args.ipath, + size=args.size, + typ=args.type, + columns=args.columns, + ): + print(result) + results_found = True + if not results_found: + print("No results") + elif args.command == "index": + index( + catalog, + args.sources, + ttl=args.ttl, + update=bool(args.update), + ) + elif args.command == "completion": + print(completion(args.shell)) + elif args.command == "find-stale-storages": + catalog.find_stale_storages() + elif args.command == "query": + query( + catalog, + args.script, + parallel=args.parallel, + params=args.param, + ) + elif args.command == "apply-udf": + catalog.apply_udf( + args.udf, args.source, args.target, args.parallel, args.udf_params + ) + elif args.command == "clear-cache": + clear_cache(catalog) + elif args.command == "gc": + garbage_collect(catalog) + else: + print(f"invalid command: {args.command}", file=sys.stderr) + return 1 + return 0 except BrokenPipeError: # Python flushes standard streams on exit; redirect remaining output # to devnull to avoid another BrokenPipeError at shutdown diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py index 0a6ac7739..b15a40f5a 100644 --- a/src/datachain/telemetry.py +++ b/src/datachain/telemetry.py @@ -1,3 +1,4 @@ +import contextlib import logging from functools import wraps from importlib.metadata import PackageNotFoundError, version @@ -46,6 +47,18 @@ def is_enabled(): _pass_params = True +@contextlib.contextmanager +def track_cli_telemetry(command: str): + global _pass_params # noqa: PLW0603 + pass_params = _pass_params + try: + telemetry.log_param("cli_command", command) + _pass_params = True + yield + finally: + _pass_params = pass_params + + def api_telemetry(f): """ Decorator to add telemetry logging to API functions. diff --git a/tests/conftest.py b/tests/conftest.py index 2d052048b..f847d629b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,6 +33,11 @@ collect_ignore = ["setup.py"] +@pytest.fixture(scope="session", autouse=True) +def add_test_env(): + os.environ["DATACHAIN_TEST"] = "true" + + @pytest.fixture(autouse=True) def virtual_memory(mocker): class VirtualMemory(NamedTuple): diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 000000000..837b4e8b0 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,24 @@ +import os + +from datachain.lib.dc import DataChain +from datachain.telemetry import is_enabled + + +def test_is_enabled(): + assert not is_enabled() + + +def test_telemetry_api_call(mocker, tmp_dir): + patch_send = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send") + os.environ["DATACHAIN_TEST"] = "true" + + DataChain.from_storage(tmp_dir.as_uri()) + assert patch_send.call_count == 1 + assert patch_send.call_args_list[0].args == ( + { + "interface": "api", + "action": "from_storage", + "error": None, + "extra": {"client": "file://"}, + }, + ) From a3e8e4593dd91c59a857ad8e0803017b37ae580d Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Mon, 9 Sep 2024 20:11:19 +0545 Subject: [PATCH 03/11] Env --- tests/test_telemetry.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index 837b4e8b0..bf8de4083 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -1,5 +1,3 @@ -import os - from datachain.lib.dc import DataChain from datachain.telemetry import is_enabled @@ -10,7 +8,6 @@ def test_is_enabled(): def test_telemetry_api_call(mocker, tmp_dir): patch_send = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send") - os.environ["DATACHAIN_TEST"] = "true" DataChain.from_storage(tmp_dir.as_uri()) assert patch_send.call_count == 1 From 17a48734374cccf19a94e264254282b722575e28 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Tue, 10 Sep 2024 21:23:22 +0545 Subject: [PATCH 04/11] Try new alternative --- src/datachain/catalog/catalog.py | 8 - src/datachain/cli.py | 286 +++++++++++++++---------------- src/datachain/lib/data_model.py | 2 - src/datachain/lib/dc.py | 26 +-- src/datachain/lib/udf.py | 3 - src/datachain/telemetry.py | 61 +------ tests/test_telemetry.py | 14 +- 7 files changed, 155 insertions(+), 245 deletions(-) diff --git a/src/datachain/catalog/catalog.py b/src/datachain/catalog/catalog.py index 2375855f5..2dbf74b68 100644 --- a/src/datachain/catalog/catalog.py +++ b/src/datachain/catalog/catalog.py @@ -64,7 +64,6 @@ from datachain.remote.studio import StudioClient from datachain.sql.types import JSON, Boolean, DateTime, Int64, SQLType, String from datachain.storage import Storage, StorageStatus, StorageURI -from datachain.telemetry import api_telemetry from datachain.utils import ( DataChainDir, batched, @@ -1416,7 +1415,6 @@ def remove_dataset( version, ) - @api_telemetry def edit_dataset( self, name: str, @@ -1526,7 +1524,6 @@ def ls( for source in data_sources: # type: ignore [union-attr] yield source, source.ls(fields) - @api_telemetry def pull_dataset( self, dataset_uri: str, @@ -1684,7 +1681,6 @@ def _instantiate_dataset(): _instantiate_dataset() - @api_telemetry def clone( self, sources: list[str], @@ -1735,7 +1731,6 @@ def clone( output, sources, client_config=client_config, recursive=recursive ) - @api_telemetry def apply_udf( self, udf_location: str, @@ -1836,7 +1831,6 @@ def query( return_code=proc.returncode, ) - @api_telemetry def cp( self, sources: list[str], @@ -1944,7 +1938,6 @@ def du_dirs(src, node, subdepth): for src in sources: yield from du_dirs(src, src.node, depth) - @api_telemetry def find( self, sources, @@ -2030,6 +2023,5 @@ def index( only_index=True, ) - @api_telemetry def find_stale_storages(self) -> None: self.metastore.find_stale_storages() diff --git a/src/datachain/cli.py b/src/datachain/cli.py index d6288cc25..e5f5638a7 100644 --- a/src/datachain/cli.py +++ b/src/datachain/cli.py @@ -15,7 +15,7 @@ from datachain import utils from datachain.cli_utils import BooleanOptionalAction, CommaSeparatedArgs, KeyValueArgs from datachain.lib.dc import DataChain -from datachain.telemetry import api_telemetry, telemetry, track_cli_telemetry +from datachain.telemetry import telemetry from datachain.utils import DataChainDir if TYPE_CHECKING: @@ -692,7 +692,6 @@ def ls_remote( first = False -@api_telemetry def ls( sources, long: bool = False, @@ -718,14 +717,12 @@ def ls( ) -@api_telemetry def ls_datasets(catalog: "Catalog"): for d in catalog.ls_datasets(): for v in d.versions: print(f"{d.name} (v{v.version})") -@api_telemetry def rm_dataset( catalog: "Catalog", name: str, @@ -735,7 +732,6 @@ def rm_dataset( catalog.remove_dataset(name, version=version, force=force) -@api_telemetry def dataset_stats( catalog: "Catalog", name: str, @@ -753,7 +749,6 @@ def dataset_stats( print(f"Total objects size: {utils.sizeof_fmt(stats.size, si=si): >7}") -@api_telemetry def du(catalog: "Catalog", sources, show_bytes=False, si=False, **kwargs): for path, size in catalog.du(sources, **kwargs): if show_bytes: @@ -762,7 +757,6 @@ def du(catalog: "Catalog", sources, show_bytes=False, si=False, **kwargs): print(f"{utils.sizeof_fmt(size, si=si): >7} {path}") -@api_telemetry def index( catalog: "Catalog", sources, @@ -771,7 +765,6 @@ def index( catalog.index(sources, **kwargs) -@api_telemetry def show( catalog: "Catalog", name: str, @@ -803,7 +796,6 @@ def show( dc.print_schema() -@api_telemetry def query( catalog: "Catalog", script: str, @@ -851,12 +843,10 @@ def query( catalog.metastore.set_job_status(job_id, JobStatus.COMPLETE) -@api_telemetry def clear_cache(catalog: "Catalog"): catalog.cache.clear() -@api_telemetry def garbage_collect(catalog: "Catalog"): temp_tables = catalog.get_temp_table_names() if not temp_tables: @@ -905,150 +895,152 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09 # This also sets this environment variable for any subprocesses os.environ["DEBUG_SHOW_SQL_QUERIES"] = "True" + error = None try: catalog = get_catalog(client_config=client_config) - with track_cli_telemetry(args.command): - if args.command == "cp": - catalog.cp( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - edatachain_file=None, - edatachain_only=False, - no_edatachain_file=True, - no_glob=args.no_glob, - ttl=args.ttl, - ) - elif args.command == "clone": - catalog.clone( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - no_glob=args.no_glob, - ttl=args.ttl, - no_cp=args.no_cp, - edatachain=args.edatachain, - edatachain_file=args.edatachain_file, - ) - elif args.command == "pull": - catalog.pull_dataset( - args.dataset, - args.output, - no_cp=args.no_cp, - force=bool(args.force), - edatachain=args.edatachain, - edatachain_file=args.edatachain_file, - ) - elif args.command == "edit-dataset": - catalog.edit_dataset( - args.name, - description=args.description, - new_name=args.new_name, - labels=args.labels, - ) - elif args.command == "ls": - ls( - args.sources, - long=bool(args.long), - remote=args.remote, - ttl=args.ttl, - update=bool(args.update), - client_config=client_config, - ) - elif args.command == "ls-datasets": - ls_datasets(catalog) - elif args.command == "show": - show( - catalog, - args.name, - args.version, - limit=args.limit, - offset=args.offset, - columns=args.columns, - no_collapse=args.no_collapse, - schema=args.schema, - ) - elif args.command == "rm-dataset": - rm_dataset(catalog, args.name, version=args.version, force=args.force) - elif args.command == "dataset-stats": - dataset_stats( - catalog, - args.name, - args.version, - show_bytes=args.bytes, - si=args.si, - ) - elif args.command == "du": - du( - catalog, - args.sources, - show_bytes=args.bytes, - depth=args.depth, - si=args.si, - ttl=args.ttl, - update=bool(args.update), - client_config=client_config, - ) - elif args.command == "find": - results_found = False - for result in catalog.find( - args.sources, - ttl=args.ttl, - update=bool(args.update), - names=args.name, - inames=args.iname, - paths=args.path, - ipaths=args.ipath, - size=args.size, - typ=args.type, - columns=args.columns, - ): - print(result) - results_found = True - if not results_found: - print("No results") - elif args.command == "index": - index( - catalog, - args.sources, - ttl=args.ttl, - update=bool(args.update), - ) - elif args.command == "completion": - print(completion(args.shell)) - elif args.command == "find-stale-storages": - catalog.find_stale_storages() - elif args.command == "query": - query( - catalog, - args.script, - parallel=args.parallel, - params=args.param, - ) - elif args.command == "apply-udf": - catalog.apply_udf( - args.udf, args.source, args.target, args.parallel, args.udf_params - ) - elif args.command == "clear-cache": - clear_cache(catalog) - elif args.command == "gc": - garbage_collect(catalog) - else: - print(f"invalid command: {args.command}", file=sys.stderr) - return 1 - return 0 - except BrokenPipeError: + if args.command == "cp": + catalog.cp( + args.sources, + args.output, + force=bool(args.force), + update=bool(args.update), + recursive=bool(args.recursive), + edatachain_file=None, + edatachain_only=False, + no_edatachain_file=True, + no_glob=args.no_glob, + ttl=args.ttl, + ) + elif args.command == "clone": + catalog.clone( + args.sources, + args.output, + force=bool(args.force), + update=bool(args.update), + recursive=bool(args.recursive), + no_glob=args.no_glob, + ttl=args.ttl, + no_cp=args.no_cp, + edatachain=args.edatachain, + edatachain_file=args.edatachain_file, + ) + elif args.command == "pull": + catalog.pull_dataset( + args.dataset, + args.output, + no_cp=args.no_cp, + force=bool(args.force), + edatachain=args.edatachain, + edatachain_file=args.edatachain_file, + ) + elif args.command == "edit-dataset": + catalog.edit_dataset( + args.name, + description=args.description, + new_name=args.new_name, + labels=args.labels, + ) + elif args.command == "ls": + ls( + args.sources, + long=bool(args.long), + remote=args.remote, + ttl=args.ttl, + update=bool(args.update), + client_config=client_config, + ) + elif args.command == "ls-datasets": + ls_datasets(catalog) + elif args.command == "show": + show( + catalog, + args.name, + args.version, + limit=args.limit, + offset=args.offset, + columns=args.columns, + no_collapse=args.no_collapse, + schema=args.schema, + ) + elif args.command == "rm-dataset": + rm_dataset(catalog, args.name, version=args.version, force=args.force) + elif args.command == "dataset-stats": + dataset_stats( + catalog, + args.name, + args.version, + show_bytes=args.bytes, + si=args.si, + ) + elif args.command == "du": + du( + catalog, + args.sources, + show_bytes=args.bytes, + depth=args.depth, + si=args.si, + ttl=args.ttl, + update=bool(args.update), + client_config=client_config, + ) + elif args.command == "find": + results_found = False + for result in catalog.find( + args.sources, + ttl=args.ttl, + update=bool(args.update), + names=args.name, + inames=args.iname, + paths=args.path, + ipaths=args.ipath, + size=args.size, + typ=args.type, + columns=args.columns, + ): + print(result) + results_found = True + if not results_found: + print("No results") + elif args.command == "index": + index( + catalog, + args.sources, + ttl=args.ttl, + update=bool(args.update), + ) + elif args.command == "completion": + print(completion(args.shell)) + elif args.command == "find-stale-storages": + catalog.find_stale_storages() + elif args.command == "query": + query( + catalog, + args.script, + parallel=args.parallel, + params=args.param, + ) + elif args.command == "apply-udf": + catalog.apply_udf( + args.udf, args.source, args.target, args.parallel, args.udf_params + ) + elif args.command == "clear-cache": + clear_cache(catalog) + elif args.command == "gc": + garbage_collect(catalog) + else: + print(f"invalid command: {args.command}", file=sys.stderr) + return 1 + return 0 + except BrokenPipeError as exc: # Python flushes standard streams on exit; redirect remaining output # to devnull to avoid another BrokenPipeError at shutdown # See: https://docs.python.org/3/library/signal.html#note-on-sigpipe + error = str(exc) devnull = os.open(os.devnull, os.O_WRONLY) os.dup2(devnull, sys.stdout.fileno()) return 141 # 128 + 13 (SIGPIPE) except (KeyboardInterrupt, Exception) as exc: + error = str(exc) if isinstance(exc, KeyboardInterrupt): msg = "Operation cancelled by the user" else: @@ -1066,3 +1058,5 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09 pdb.post_mortem() return 1 + finally: + telemetry.send_cli_call(args.command, error=error) diff --git a/src/datachain/lib/data_model.py b/src/datachain/lib/data_model.py index 46b062f2b..09a92cc60 100644 --- a/src/datachain/lib/data_model.py +++ b/src/datachain/lib/data_model.py @@ -5,7 +5,6 @@ from pydantic import BaseModel, create_model from datachain.lib.model_store import ModelStore -from datachain.telemetry import api_telemetry StandardType = Union[ type[int], @@ -33,7 +32,6 @@ def __pydantic_init_subclass__(cls): ModelStore.register(cls) @staticmethod - @api_telemetry def register(models: Union[DataType, Sequence[DataType]]): """For registering classes manually. It accepts a single class or a sequence of classes.""" diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py index baadcdc71..2b941b1ec 100644 --- a/src/datachain/lib/dc.py +++ b/src/datachain/lib/dc.py @@ -58,7 +58,7 @@ ) from datachain.query.schema import DEFAULT_DELIMITER, Column, DatasetRow from datachain.sql.functions import path as pathfunc -from datachain.telemetry import api_telemetry +from datachain.telemetry import send_telemetry_once from datachain.utils import inside_notebook if TYPE_CHECKING: @@ -247,6 +247,8 @@ def __init__(self, *args, settings: Optional[dict] = None, **kwargs): **kwargs, indexing_column_types=File._datachain_column_types, ) + send_telemetry_once("datachain_init", **kwargs) + if settings: self._settings = Settings(**settings) else: @@ -349,7 +351,6 @@ def add_schema(self, signals_schema: SignalSchema) -> "Self": # noqa: D102 return self @classmethod - @api_telemetry def from_storage( cls, uri, @@ -426,7 +427,6 @@ def from_storage( return ls(dc, list_path, recursive=recursive, object_name=object_name) @classmethod - @api_telemetry def from_dataset( cls, name: str, @@ -448,7 +448,6 @@ def from_dataset( return DataChain(name=name, version=version, session=session, settings=settings) @classmethod - @api_telemetry def from_json( cls, path, @@ -514,7 +513,6 @@ def jmespath_to_name(s: str): return chain.gen(**signal_dict) # type: ignore[misc, arg-type] @classmethod - @api_telemetry def from_jsonl( cls, path, @@ -575,7 +573,6 @@ def jmespath_to_name(s: str): return chain.gen(**signal_dict) # type: ignore[misc, arg-type] @classmethod - @api_telemetry def datasets( cls, session: Optional[Session] = None, @@ -690,7 +687,6 @@ def print_jsonl_schema( # type: ignore[override] output=str, ) - @api_telemetry def save( # type: ignore[override] self, name: Optional[str] = None, version: Optional[int] = None, **kwargs ) -> "Self": @@ -704,7 +700,6 @@ def save( # type: ignore[override] schema = self.signals_schema.clone_without_sys_signals().serialize() return super().save(name=name, version=version, feature_schema=schema, **kwargs) - @api_telemetry def apply(self, func, *args, **kwargs): """Apply any function to the chain. @@ -727,7 +722,6 @@ def parse_stem(chain): """ return func(self, *args, **kwargs) - @api_telemetry def map( self, func: Optional[Callable] = None, @@ -777,7 +771,6 @@ def map( return chain.add_schema(udf_obj.output).reset_settings(self._settings) - @api_telemetry def gen( self, func: Optional[Callable] = None, @@ -814,7 +807,6 @@ def gen( return chain.reset_schema(udf_obj.output).reset_settings(self._settings) - @api_telemetry def agg( self, func: Optional[Callable] = None, @@ -856,7 +848,6 @@ def agg( return chain.reset_schema(udf_obj.output).reset_settings(self._settings) - @api_telemetry def batch_map( self, func: Optional[Callable] = None, @@ -972,7 +963,6 @@ def select_except(self, *args: str) -> "Self": chain.signals_schema = new_schema return chain - @api_telemetry @detach def mutate(self, **kwargs) -> "Self": """Create new signals based on existing signals. @@ -1102,7 +1092,6 @@ def collect(self, col: str) -> Iterator[DataType]: ... # type: ignore[overload- @overload def collect(self, *cols: str) -> Iterator[tuple[DataType, ...]]: ... - @api_telemetry def collect(self, *cols: str) -> Iterator[Union[DataType, tuple[DataType, ...]]]: # type: ignore[overload-overlap,misc] """Yields rows of values, optionally limited to the specified columns. @@ -1185,7 +1174,6 @@ def remove_file_signals(self) -> "Self": # noqa: D102 schema = self.signals_schema.clone_without_file_signals() return self.select(*schema.values.keys()) - @api_telemetry @detach def merge( self, @@ -1356,7 +1344,6 @@ def subtract( # type: ignore[override] return super()._subtract(other, signals) # type: ignore[arg-type] @classmethod - @api_telemetry def from_values( cls, ds_name: str = "", @@ -1390,7 +1377,6 @@ def _func_fr() -> Iterator[tuple_type]: # type: ignore[valid-type] return chain.gen(_func_fr, output=output) @classmethod - @api_telemetry def from_pandas( # type: ignore[override] cls, df: "pd.DataFrame", @@ -1434,7 +1420,6 @@ def from_pandas( # type: ignore[override] **fr_map, ) - @api_telemetry def to_pandas(self, flatten=False) -> "pd.DataFrame": """Return a pandas DataFrame from the chain. @@ -1449,7 +1434,6 @@ def to_pandas(self, flatten=False) -> "pd.DataFrame": return pd.DataFrame.from_records(self.results(), columns=columns) - @api_telemetry def show( self, limit: int = 20, @@ -1504,7 +1488,6 @@ def show( print(f"\n[Limited by {len(df)} rows]") @classmethod - @api_telemetry def from_hf( cls, dataset: Union[str, "HFDatasetType"], @@ -1636,7 +1619,6 @@ def parse_tabular( ) @classmethod - @api_telemetry def from_csv( cls, path, @@ -1724,7 +1706,6 @@ def from_csv( ) @classmethod - @api_telemetry def from_parquet( cls, path, @@ -1793,7 +1774,6 @@ def to_parquet( ) @classmethod - @api_telemetry def from_records( cls, to_insert: Optional[Union[dict, list[dict]]], diff --git a/src/datachain/lib/udf.py b/src/datachain/lib/udf.py index e30daaedc..46a13b218 100644 --- a/src/datachain/lib/udf.py +++ b/src/datachain/lib/udf.py @@ -17,7 +17,6 @@ from datachain.query.schema import ColumnParameter from datachain.query.udf import UDFBase as _UDFBase from datachain.query.udf import UDFProperties -from datachain.telemetry import api_telemetry if TYPE_CHECKING: from collections.abc import Iterable, Iterator, Sequence @@ -43,7 +42,6 @@ def __init__( self.inner = inner super().__init__(properties) - @api_telemetry def run( self, udf_fields: "Sequence[str]", @@ -71,7 +69,6 @@ def run( if hasattr(self.inner, "teardown") and callable(self.inner.teardown): self.inner.teardown() - @api_telemetry def run_once( self, catalog: "Catalog", diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py index b15a40f5a..8977367bf 100644 --- a/src/datachain/telemetry.py +++ b/src/datachain/telemetry.py @@ -1,6 +1,4 @@ -import contextlib import logging -from functools import wraps from importlib.metadata import PackageNotFoundError, version from iterative_telemetry import IterativeTelemetryLogger @@ -43,59 +41,12 @@ def is_enabled(): # Initialize telemetry logger telemetry = IterativeTelemetryLogger("datachain", __version__, is_enabled) -_is_api_running = False -_pass_params = True +_telemetry_sent = False -@contextlib.contextmanager -def track_cli_telemetry(command: str): - global _pass_params # noqa: PLW0603 - pass_params = _pass_params - try: - telemetry.log_param("cli_command", command) - _pass_params = True - yield - finally: - _pass_params = pass_params - -def api_telemetry(f): - """ - Decorator to add telemetry logging to API functions. - """ - - @wraps(f) - def wrapper(*args, **kwargs): - # Use global variables to track state - global _is_api_running, _pass_params # noqa: PLW0603 - # Check if an API call is already running - is_nested = _is_api_running - # Check if parameters should be passed to telemetry - pass_params = _pass_params - # Reset global variables - _pass_params = False - _is_api_running = True - try: - # Start telemetry event scope - with telemetry.event_scope("api", f.__name__) as event: - try: - return f(*args, **kwargs) - except Exception as exc: - event.error = exc.__class__.__name__ - raise - finally: - if not is_nested: - # Send telemetry event if not nested - telemetry.send_event( - event.interface, event.action, event.error, **event.kwargs - ) - - finally: - if pass_params: - # Log parameters if pass_params flag is set - for key, value in event.kwargs.items(): - telemetry.log_param(key, value) - _is_api_running = is_nested # Restore previous API running state - _pass_params = pass_params # Restore previous pass_params state - - return wrapper +def send_telemetry_once(action: str, **kwargs): + global _telemetry_sent # noqa: PLW0603 + if not _telemetry_sent: + telemetry.send_event("api", action, **kwargs) + _telemetry_sent = True diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index bf8de4083..42db0ac72 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -11,11 +11,9 @@ def test_telemetry_api_call(mocker, tmp_dir): DataChain.from_storage(tmp_dir.as_uri()) assert patch_send.call_count == 1 - assert patch_send.call_args_list[0].args == ( - { - "interface": "api", - "action": "from_storage", - "error": None, - "extra": {"client": "file://"}, - }, - ) + args = patch_send.call_args_list[0].args[0] + extra = args.pop("extra") + + assert args == {"interface": "api", "action": "datachain_init", "error": None} + + assert "name" in extra From 606e164de8697ce109475b95f0b046e4ae4c5638 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Tue, 10 Sep 2024 21:24:59 +0545 Subject: [PATCH 05/11] Remove unused log param --- src/datachain/cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/datachain/cli.py b/src/datachain/cli.py index e5f5638a7..f8124cf3d 100644 --- a/src/datachain/cli.py +++ b/src/datachain/cli.py @@ -704,7 +704,6 @@ def ls( config = get_remote_config(read_config(DataChainDir.find().root), remote=remote) remote_type = config["type"] - telemetry.log_param("remote_type", remote_type) if remote_type == "local": ls_local(sources, long=long, **kwargs) else: From c5649162959973f454bc1fc8e3454d5e41ed4bae Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 11 Sep 2024 00:13:10 +0545 Subject: [PATCH 06/11] Reset telemetry sent --- tests/test_telemetry.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index 42db0ac72..f6a6e7815 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -1,13 +1,14 @@ +from datachain import telemetry from datachain.lib.dc import DataChain -from datachain.telemetry import is_enabled def test_is_enabled(): - assert not is_enabled() + assert not telemetry.is_enabled() def test_telemetry_api_call(mocker, tmp_dir): patch_send = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send") + telemetry._telemetry_sent = False DataChain.from_storage(tmp_dir.as_uri()) assert patch_send.call_count == 1 From a46f300b47b4e809944df75f146ad342f3f18829 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 18 Sep 2024 06:22:30 +0545 Subject: [PATCH 07/11] Address PR comments --- pyproject.toml | 2 +- src/datachain/lib/dc.py | 5 +++-- src/datachain/telemetry.py | 18 +----------------- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4daf6b77d..cf5fe605e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ dependencies = [ "msgpack>=1.0.4,<2", "psutil", "huggingface_hub", - "iterative-telemetry>=0.0.8" + "iterative-telemetry>=0.0.9" ] [project.optional-dependencies] diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py index 2b941b1ec..fb4dc982b 100644 --- a/src/datachain/lib/dc.py +++ b/src/datachain/lib/dc.py @@ -58,7 +58,7 @@ ) from datachain.query.schema import DEFAULT_DELIMITER, Column, DatasetRow from datachain.sql.functions import path as pathfunc -from datachain.telemetry import send_telemetry_once +from datachain.telemetry import telemetry from datachain.utils import inside_notebook if TYPE_CHECKING: @@ -247,7 +247,8 @@ def __init__(self, *args, settings: Optional[dict] = None, **kwargs): **kwargs, indexing_column_types=File._datachain_column_types, ) - send_telemetry_once("datachain_init", **kwargs) + + telemetry.send_event_once("class", "datachain_init", **kwargs) if settings: self._settings = Settings(**settings) diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py index 8977367bf..9a5018703 100644 --- a/src/datachain/telemetry.py +++ b/src/datachain/telemetry.py @@ -3,8 +3,7 @@ from iterative_telemetry import IterativeTelemetryLogger -from datachain.config import read_config -from datachain.utils import DataChainDir, env2bool +from datachain.utils import env2bool logger = logging.getLogger(__name__) @@ -23,12 +22,6 @@ def is_enabled(): logger.debug("Telemetry is disabled by environment variable.") return False - # Check if telemetry is disabled by configuration file - config = read_config(DataChainDir.find().root) - if config and config.get("core", {}).get("no_analytics", False): - logger.debug("Telemetry is disabled by configuration.") - return False - logger.debug("Telemetry is enabled.") return True @@ -41,12 +34,3 @@ def is_enabled(): # Initialize telemetry logger telemetry = IterativeTelemetryLogger("datachain", __version__, is_enabled) - -_telemetry_sent = False - - -def send_telemetry_once(action: str, **kwargs): - global _telemetry_sent # noqa: PLW0603 - if not _telemetry_sent: - telemetry.send_event("api", action, **kwargs) - _telemetry_sent = True From f8f3ea40eebf4151dd8a8d4a1e1fc8fd6bff02d4 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 18 Sep 2024 06:33:32 +0545 Subject: [PATCH 08/11] Fix tests --- tests/test_telemetry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index f6a6e7815..5d63731f9 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -1,5 +1,5 @@ -from datachain import telemetry from datachain.lib.dc import DataChain +from datachain.telemetry import telemetry def test_is_enabled(): @@ -8,7 +8,7 @@ def test_is_enabled(): def test_telemetry_api_call(mocker, tmp_dir): patch_send = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send") - telemetry._telemetry_sent = False + telemetry._event_sent = False DataChain.from_storage(tmp_dir.as_uri()) assert patch_send.call_count == 1 From 8387229d900cde85a58b08fc0c67e9293b4761e6 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 18 Sep 2024 06:46:26 +0545 Subject: [PATCH 09/11] Fix test --- tests/test_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index 5d63731f9..f446481a0 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -15,6 +15,6 @@ def test_telemetry_api_call(mocker, tmp_dir): args = patch_send.call_args_list[0].args[0] extra = args.pop("extra") - assert args == {"interface": "api", "action": "datachain_init", "error": None} + assert args == {"interface": "class", "action": "datachain_init", "error": None} assert "name" in extra From f90c5afcf6998a512e0124f1a5aef2dede99170e Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 18 Sep 2024 07:03:44 +0545 Subject: [PATCH 10/11] Update telemetry.py Co-authored-by: skshetry <18718008+skshetry@users.noreply.github.com> --- src/datachain/telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py index 9a5018703..0e0fe40bd 100644 --- a/src/datachain/telemetry.py +++ b/src/datachain/telemetry.py @@ -17,7 +17,7 @@ def is_enabled(): return False # Check if telemetry is disabled by environment variable - disabled = env2bool("DATACHAIN_NO_ANALYTICS") + disabled = bool(os.getenv("DATACHAIN_NO_ANALYTICS")) if disabled: logger.debug("Telemetry is disabled by environment variable.") return False From 846f90e46314682874b322c73f401cdeaf7e5622 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 18 Sep 2024 13:36:34 +0545 Subject: [PATCH 11/11] Add import --- src/datachain/telemetry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/datachain/telemetry.py b/src/datachain/telemetry.py index 0e0fe40bd..c61eb4c78 100644 --- a/src/datachain/telemetry.py +++ b/src/datachain/telemetry.py @@ -1,4 +1,5 @@ import logging +import os from importlib.metadata import PackageNotFoundError, version from iterative_telemetry import IterativeTelemetryLogger