Skip to content

Commit

Permalink
Introduce telemetry in datachain (#411)
Browse files Browse the repository at this point in the history
* Introduce telemetry in datachain

* Add test and implement telemetry to api call

* Env

* Try new alternative

* Remove unused log param

* Reset telemetry sent

* Address PR comments

* Fix tests

* Fix test

* Update telemetry.py

Co-authored-by: skshetry <18718008+skshetry@users.noreply.github.com>

* Add import

---------

Co-authored-by: skshetry <18718008+skshetry@users.noreply.github.com>
  • Loading branch information
amritghimire and skshetry authored Sep 18, 2024
1 parent 27ce790 commit 320360f
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ dependencies = [
"Pillow>=10.0.0,<11",
"msgpack>=1.0.4,<2",
"psutil",
"huggingface_hub"
"huggingface_hub",
"iterative-telemetry>=0.0.9"
]

[project.optional-dependencies]
Expand Down
8 changes: 7 additions & 1 deletion src/datachain/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datachain import utils
from datachain.cli_utils import BooleanOptionalAction, CommaSeparatedArgs, KeyValueArgs
from datachain.lib.dc import DataChain
from datachain.telemetry import telemetry
from datachain.utils import DataChainDir

if TYPE_CHECKING:
Expand Down Expand Up @@ -872,6 +873,7 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09
# This also sets this environment variable for any subprocesses
os.environ["DEBUG_SHOW_SQL_QUERIES"] = "True"

error = None
try:
catalog = get_catalog(client_config=client_config)
if args.command == "cp":
Expand Down Expand Up @@ -1003,14 +1005,16 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09
print(f"invalid command: {args.command}", file=sys.stderr)
return 1
return 0
except BrokenPipeError:
except BrokenPipeError as exc:
# Python flushes standard streams on exit; redirect remaining output
# to devnull to avoid another BrokenPipeError at shutdown
# See: https://docs.python.org/3/library/signal.html#note-on-sigpipe
error = str(exc)
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, sys.stdout.fileno())
return 141 # 128 + 13 (SIGPIPE)
except (KeyboardInterrupt, Exception) as exc:
error = str(exc)
if isinstance(exc, KeyboardInterrupt):
msg = "Operation cancelled by the user"
else:
Expand All @@ -1028,3 +1032,5 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09

pdb.post_mortem()
return 1
finally:
telemetry.send_cli_call(args.command, error=error)
4 changes: 4 additions & 0 deletions src/datachain/lib/dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from datachain.query.schema import DEFAULT_DELIMITER, Column, DatasetRow
from datachain.sql.functions import path as pathfunc
from datachain.telemetry import telemetry
from datachain.utils import inside_notebook

if TYPE_CHECKING:
Expand Down Expand Up @@ -246,6 +247,9 @@ def __init__(self, *args, settings: Optional[dict] = None, **kwargs):
**kwargs,
indexing_column_types=File._datachain_column_types,
)

telemetry.send_event_once("class", "datachain_init", **kwargs)

if settings:
self._settings = Settings(**settings)
else:
Expand Down
2 changes: 2 additions & 0 deletions src/datachain/lib/listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datachain.lib.file import File
from datachain.query.schema import Column
from datachain.sql.functions import path as pathfunc
from datachain.telemetry import telemetry
from datachain.utils import uses_glob

if TYPE_CHECKING:
Expand Down Expand Up @@ -80,6 +81,7 @@ def parse_listing_uri(uri: str, cache, client_config) -> tuple[str, str, str]:
client_config = client_config or {}
client = Client.get_client(uri, cache, **client_config)
storage_uri, path = Client.parse_url(uri)
telemetry.log_param("client", client.PREFIX)

# clean path without globs
lst_uri_path = (
Expand Down
14 changes: 2 additions & 12 deletions src/datachain/progress.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
"""Manages progress bars."""

import logging
import os
import re
import sys
from threading import RLock
from typing import Any, ClassVar

from fsspec.callbacks import TqdmCallback
from tqdm import tqdm

from datachain.utils import env2bool

logger = logging.getLogger(__name__)
tqdm.set_lock(RLock())


def env2bool(var, undefined=False):
    """
    Interpret an environment variable as a boolean flag.

    var: name of the environment variable to read
    undefined: return value if env var is unset

    Returns True only when the whole value (ignoring surrounding whitespace
    and letter case) is one of ``1``, ``y``, ``yes`` or ``true``; any other
    value that is set yields False.
    """
    value = os.getenv(var, None)
    if value is None:
        return undefined
    # Match the entire value: an unanchored search would treat any string that
    # merely contains "1" or "y" (e.g. "deny", "2021") as true.
    return bool(re.fullmatch(r"1|y|yes|true", value.strip(), flags=re.IGNORECASE))


class Tqdm(tqdm):
"""
maximum-compatibility tqdm-based progressbars
Expand Down
37 changes: 37 additions & 0 deletions src/datachain/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
import os
from importlib.metadata import PackageNotFoundError, version

from iterative_telemetry import IterativeTelemetryLogger

from datachain.utils import env2bool

logger = logging.getLogger(__name__)


def is_enabled():
    """
    Report whether telemetry should be sent.

    Returns False when ``DATACHAIN_TEST`` is a truthy flag (as judged by
    ``env2bool``) or when ``DATACHAIN_NO_ANALYTICS`` is set to any non-empty
    value; returns True otherwise.
    """
    # Test runs never send telemetry.
    if env2bool("DATACHAIN_TEST"):
        return False

    # Any non-empty value of the opt-out variable disables telemetry.
    if os.getenv("DATACHAIN_NO_ANALYTICS"):
        logger.debug("Telemetry is disabled by environment variable.")
        return False

    logger.debug("Telemetry is enabled.")
    return True


# Look up the installed datachain version from package metadata; it is passed
# to the telemetry logger below.
try:
    __version__ = version("datachain")
except PackageNotFoundError:
    # No package metadata is available, so use a placeholder version string.
    __version__ = "unknown"

# Shared telemetry logger for the package. `is_enabled` is handed over as a
# callable (not called here).
telemetry = IterativeTelemetryLogger(
    "datachain",
    __version__,
    is_enabled,
)
11 changes: 11 additions & 0 deletions src/datachain/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import os.path as osp
import random
import re
import stat
import sys
import time
Expand Down Expand Up @@ -410,3 +411,13 @@ def get_datachain_executable() -> list[str]:
def uses_glob(path: str) -> bool:
    """Tell whether the last component of a URI path contains glob syntax."""
    # Only the final path component is inspected; normalising first drops any
    # trailing separator so the basename is not empty.
    normalized = os.path.normpath(path)
    last_component = os.path.basename(normalized)
    return glob.has_magic(last_component)


def env2bool(var, undefined=False):
    """
    Interpret an environment variable as a boolean flag.

    var: name of the environment variable to read
    undefined: return value if env var is unset

    Returns True only when the whole value (ignoring surrounding whitespace
    and letter case) is one of ``1``, ``y``, ``yes`` or ``true``; any other
    value that is set yields False.
    """
    value = os.getenv(var, None)
    if value is None:
        return undefined
    # Match the entire value: an unanchored search would treat any string that
    # merely contains "1" or "y" (e.g. "deny", "2021") as true.
    return bool(re.fullmatch(r"1|y|yes|true", value.strip(), flags=re.IGNORECASE))
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
collect_ignore = ["setup.py"]


@pytest.fixture(autouse=True, scope="session")
def add_test_env():
    """Set ``DATACHAIN_TEST`` to ``"true"`` in the process environment.

    The fixture is session-scoped and autouse, so it applies to every test
    without being requested.
    """
    os.environ.update({"DATACHAIN_TEST": "true"})


@pytest.fixture(autouse=True)
def virtual_memory(mocker):
class VirtualMemory(NamedTuple):
Expand Down
20 changes: 20 additions & 0 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from datachain.lib.dc import DataChain
from datachain.telemetry import telemetry


def test_is_enabled():
    """The shared telemetry logger must report itself as disabled here."""
    enabled = telemetry.is_enabled()
    assert not enabled


def test_telemetry_api_call(mocker, tmp_dir):
    """Building a chain from storage sends one ``datachain_init`` event."""
    send_mock = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send")
    # NOTE(review): presumably this clears the logger's "already sent" guard so
    # the event fires again in this test - confirm against iterative_telemetry.
    telemetry._event_sent = False

    DataChain.from_storage(tmp_dir.as_uri())

    assert send_mock.call_count == 1

    # First positional argument of the only recorded call is the payload dict.
    payload = send_mock.call_args_list[0].args[0]
    extra = payload.pop("extra")

    expected = {"interface": "class", "action": "datachain_init", "error": None}
    assert payload == expected

    assert "name" in extra

0 comments on commit 320360f

Please sign in to comment.