Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce telemetry in datachain #411

Merged
merged 11 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ dependencies = [
"Pillow>=10.0.0,<11",
"msgpack>=1.0.4,<2",
"psutil",
"huggingface_hub"
"huggingface_hub",
"iterative-telemetry>=0.0.9"
]

[project.optional-dependencies]
Expand Down
8 changes: 7 additions & 1 deletion src/datachain/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datachain import utils
from datachain.cli_utils import BooleanOptionalAction, CommaSeparatedArgs, KeyValueArgs
from datachain.lib.dc import DataChain
from datachain.telemetry import telemetry
from datachain.utils import DataChainDir

if TYPE_CHECKING:
Expand Down Expand Up @@ -893,6 +894,7 @@
# This also sets this environment variable for any subprocesses
os.environ["DEBUG_SHOW_SQL_QUERIES"] = "True"

error = None
try:
catalog = get_catalog(client_config=client_config)
if args.command == "cp":
Expand Down Expand Up @@ -1028,14 +1030,16 @@
print(f"invalid command: {args.command}", file=sys.stderr)
return 1
return 0
except BrokenPipeError:
except BrokenPipeError as exc:

Check warning on line 1033 in src/datachain/cli.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/cli.py#L1033

Added line #L1033 was not covered by tests
# Python flushes standard streams on exit; redirect remaining output
# to devnull to avoid another BrokenPipeError at shutdown
# See: https://docs.python.org/3/library/signal.html#note-on-sigpipe
error = str(exc)
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, sys.stdout.fileno())
return 141 # 128 + 13 (SIGPIPE)
except (KeyboardInterrupt, Exception) as exc:
error = str(exc)
if isinstance(exc, KeyboardInterrupt):
msg = "Operation cancelled by the user"
else:
Expand All @@ -1053,3 +1057,5 @@

pdb.post_mortem()
return 1
finally:
telemetry.send_cli_call(args.command, error=error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's drop it for now. CLI is not exposed/priority.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it is not exposed, I believe it would still be a good idea to collect information on whether anyone is using it.

Copy link
Member

@skshetry skshetry Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is using it?

4 changes: 4 additions & 0 deletions src/datachain/lib/dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from datachain.query.schema import DEFAULT_DELIMITER, Column, DatasetRow
from datachain.sql.functions import path as pathfunc
from datachain.telemetry import telemetry
from datachain.utils import inside_notebook

if TYPE_CHECKING:
Expand Down Expand Up @@ -246,6 +247,9 @@ def __init__(self, *args, settings: Optional[dict] = None, **kwargs):
**kwargs,
indexing_column_types=File._datachain_column_types,
)

telemetry.send_event_once("class", "datachain_init", **kwargs)

if settings:
self._settings = Settings(**settings)
else:
Expand Down
2 changes: 2 additions & 0 deletions src/datachain/lib/listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datachain.lib.file import File
from datachain.query.schema import Column
from datachain.sql.functions import path as pathfunc
from datachain.telemetry import telemetry
from datachain.utils import uses_glob

if TYPE_CHECKING:
Expand Down Expand Up @@ -79,6 +80,7 @@ def parse_listing_uri(uri: str, cache, client_config) -> tuple[str, str, str]:
"""
client = Client.get_client(uri, cache, **client_config)
storage_uri, path = Client.parse_url(uri)
telemetry.log_param("client", client.PREFIX)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have a telemetry call inside a utility function?


# clean path without globs
lst_uri_path = (
Expand Down
14 changes: 2 additions & 12 deletions src/datachain/progress.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
"""Manages progress bars."""

import logging
import os
import re
import sys
from threading import RLock
from typing import Any, ClassVar

from fsspec.callbacks import TqdmCallback
from tqdm import tqdm

from datachain.utils import env2bool

logger = logging.getLogger(__name__)
tqdm.set_lock(RLock())


def env2bool(var, undefined=False):
    """
    Interpret the environment variable named ``var`` as a boolean.

    var: name of the environment variable to read
    undefined: return value if env var is unset

    Returns True only when the whole value (ignoring case and surrounding
    whitespace) is one of ``1``, ``y``, ``yes`` or ``true``; any other value
    that is set, including the empty string, yields False.
    """
    value = os.getenv(var)
    if value is None:
        return undefined
    # fullmatch, not search: a substring match would treat unrelated values
    # that merely contain "y" or "1" (e.g. "nobody", "python") as true.
    return bool(re.fullmatch(r"1|y|yes|true", value.strip(), flags=re.IGNORECASE))


class Tqdm(tqdm):
"""
maximum-compatibility tqdm-based progressbars
Expand Down
36 changes: 36 additions & 0 deletions src/datachain/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging
from importlib.metadata import PackageNotFoundError, version

from iterative_telemetry import IterativeTelemetryLogger

from datachain.utils import env2bool

logger = logging.getLogger(__name__)


def is_enabled():
    """
    Report whether telemetry should be sent, based on environment variables.

    Returns False when ``DATACHAIN_TEST`` is truthy (checked first, without
    logging) or when ``DATACHAIN_NO_ANALYTICS`` is truthy (logged at debug
    level); otherwise logs that telemetry is enabled and returns True.
    """
    # Test runs never report telemetry.
    if env2bool("DATACHAIN_TEST"):
        return False

    # Explicit opt-out through the environment.
    if env2bool("DATACHAIN_NO_ANALYTICS"):
        logger.debug("Telemetry is disabled by environment variable.")
        return False

    logger.debug("Telemetry is enabled.")
    return True

Check warning on line 26 in src/datachain/telemetry.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/telemetry.py#L25-L26

Added lines #L25 - L26 were not covered by tests


# Version reported with telemetry: start from a placeholder and replace it
# with the installed datachain version when package metadata is available.
__version__ = "unknown"
try:
    __version__ = version("datachain")
except PackageNotFoundError:
    # Package metadata is missing; keep the placeholder.
    pass

Check warning on line 33 in src/datachain/telemetry.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/telemetry.py#L32-L33

Added lines #L32 - L33 were not covered by tests

# Initialize the module-level telemetry logger that other datachain modules
# import. `is_enabled` is passed as a callable (it is not called here);
# presumably the logger invokes it to decide whether to send events —
# TODO confirm against the iterative-telemetry API.
telemetry = IterativeTelemetryLogger("datachain", __version__, is_enabled)
11 changes: 11 additions & 0 deletions src/datachain/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import os.path as osp
import random
import re
import stat
import sys
import time
Expand Down Expand Up @@ -450,3 +451,13 @@ def get_datachain_executable() -> list[str]:
def uses_glob(path: str) -> bool:
"""Checks if some URI path has glob syntax in it"""
return glob.has_magic(os.path.basename(os.path.normpath(path)))


def env2bool(var, undefined=False):
    """
    Interpret the environment variable named ``var`` as a boolean.

    var: name of the environment variable to read
    undefined: return value if env var is unset

    Returns True only when the whole value (ignoring case and surrounding
    whitespace) is one of ``1``, ``y``, ``yes`` or ``true``; any other value
    that is set, including the empty string, yields False.
    """
    value = os.getenv(var)
    if value is None:
        return undefined
    # fullmatch, not search: a substring match would treat unrelated values
    # that merely contain "y" or "1" (e.g. "nobody", "python") as true.
    return bool(re.fullmatch(r"1|y|yes|true", value.strip(), flags=re.IGNORECASE))
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
collect_ignore = ["setup.py"]


@pytest.fixture(scope="session", autouse=True)
def add_test_env():
    """Set ``DATACHAIN_TEST`` to "true" in the environment for the whole test session."""
    os.environ.update(DATACHAIN_TEST="true")


@pytest.fixture(autouse=True)
def virtual_memory(mocker):
class VirtualMemory(NamedTuple):
Expand Down
20 changes: 20 additions & 0 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from datachain.lib.dc import DataChain
from datachain.telemetry import telemetry


def test_is_enabled():
    """The telemetry logger must report itself as disabled in the test run."""
    enabled = telemetry.is_enabled()
    assert not enabled


def test_telemetry_api_call(mocker, tmp_dir):
    """Check that ``DataChain.from_storage`` sends exactly one telemetry event.

    ``IterativeTelemetryLogger.send`` is patched so nothing leaves the process,
    and the logger's ``_event_sent`` flag is reset before the call.
    """
    send_mock = mocker.patch("iterative_telemetry.IterativeTelemetryLogger.send")
    telemetry._event_sent = False

    DataChain.from_storage(tmp_dir.as_uri())

    assert send_mock.call_count == 1

    # First positional argument of the only recorded call is the event payload.
    first_call = send_mock.call_args_list[0]
    payload = first_call.args[0]
    extra = payload.pop("extra")

    expected = {"interface": "class", "action": "datachain_init", "error": None}
    assert payload == expected

    assert "name" in extra