
[KED-1458] Versioning extremely slow on DBFS #275

Closed
deepyaman opened this issue Mar 5, 2020 · 5 comments
Labels
Issue: Bug Report 🐞 Bug that needs to be fixed

Comments

@deepyaman
Member

Description

Our pipeline running on Azure Databricks has gotten progressively slower. Somebody noticed that running on a fresh set of paths (without so many versions) was significantly faster. Further investigation showed that the data itself wasn't the cause; rather, it's finding the existing versions that's prohibitively slow. Specifically, the underlying functions used by iglob (like os.scandir and the is_dir() calls on its entries) are much slower than their DBFS-native counterparts (e.g. dbutils.fs.ls).
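
To make the gap concrete, here is a rough way to time the two approaches in a Databricks notebook. This is an illustrative sketch only: /dbfs/mnt/some/versioned/path is a placeholder for any DBFS directory with a few hundred entries, and dbutils is the handle Databricks injects into the notebook namespace.

import os
import time

path = "/dbfs/mnt/some/versioned/path"  # placeholder; any DBFS dir with many version folders

# POSIX-style listing via the FUSE mount, as glob/iglob would do it
start = time.time()
local_entries = [(e.name, e.is_dir()) for e in os.scandir(path)]
print("os.scandir + is_dir():", time.time() - start, "s")

# DBFS-native listing; dbutils.fs.ls takes the path without the /dbfs prefix
start = time.time()
dbfs_entries = [(f.name, f.isDir()) for f in dbutils.fs.ls(path[len("/dbfs"):])]
print("dbutils.fs.ls + isDir():", time.time() - start, "s")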

Context

Pipelines that should take less than 2 hours are taking 3-5 times that.

Steps to Reproduce

On a Databricks cluster (a rough reproduction sketch follows the steps):

  1. Save a versioned SparkDataSet.
  2. Load it back.
  3. Save the same dataset 500 more times.
  4. Load it back.
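
A rough sketch of these steps, assuming Kedro 0.15.x (where SparkDataSet lives under kedro.contrib.io.pyspark, as in the code below) and a hypothetical /dbfs/mnt path:

from pyspark.sql import SparkSession

from kedro.contrib.io.pyspark import SparkDataSet
from kedro.io import Version

spark_df = SparkSession.builder.getOrCreate().createDataFrame(
    [("Alex", 31), ("Bob", 12)], ["name", "age"]
)

data_set = SparkDataSet(
    filepath="/dbfs/mnt/tmp/versioning_test",  # hypothetical mount path
    version=Version(None, None),  # auto-generate save versions, load the latest
)

data_set.save(spark_df)     # Step 1
_ = data_set.load()         # Step 2 -- quick

for _ in range(500):        # Step 3 -- builds up 500 more versions
    data_set.save(spark_df)

_ = data_set.load()         # Step 4 -- this is where the time explodes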

Expected Result

Time taken for Step 4 should remain quite similar to that for Step 2.

Actual Result

Time explodes. 🌋

Possible Implementation

See DBFSDirEntry, _get_dbutils, _dbfs_scandir, and patches below:

# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""``AbstractDataSet`` implementation to access Spark data frames using
``pyspark``
"""

from contextlib import contextmanager
from copy import deepcopy
from fnmatch import fnmatch
from pathlib import Path, PurePosixPath
from typing import Any, Dict, List, Tuple
from unittest.mock import patch
from warnings import warn

import IPython
from hdfs import HdfsError, InsecureClient
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.utils import AnalysisException
from s3fs import S3FileSystem

from kedro.contrib.io import DefaultArgumentsMixIn  # isort:skip
from kedro.io import AbstractVersionedDataSet, Version  # isort:skip


def _parse_glob_pattern(pattern: str) -> str:
    special = ("*", "?", "[")
    clean = []
    for part in pattern.split("/"):
        if any(char in part for char in special):
            break
        clean.append(part)
    return "/".join(clean)


def _split_filepath(filepath: str) -> Tuple[str, str]:
    split_ = filepath.split("://", 1)
    if len(split_) == 2:
        return split_[0] + "://", split_[1]
    return "", split_[0]


def _strip_dbfs_prefix(path: str) -> str:
    return path[len("/dbfs") :] if path.startswith("/dbfs") else path


class DBFSDirEntry:
    """Mock ``DirEntry`` object yielded by DBUtils-based ``scandir``."""

    def __init__(self, file_info):
        self.name = file_info.name
        self.path = file_info.path
        self._is_dir = file_info.isDir()

    def is_dir(self):
        """Return True if the entry is a directory."""
        return self._is_dir


def _get_dbutils():
    """Fetch the ``dbutils`` handle from the Databricks notebook namespace."""
    return IPython.get_ipython().user_ns["dbutils"]


@contextmanager
def _dbfs_scandir(path):
    """DBUtils-backed drop-in for ``os.scandir``.

    Written as a context manager because ``glob._iterdir`` uses
    ``with os.scandir(dirname) as it``.
    """
    yield map(DBFSDirEntry, _get_dbutils().fs.ls(_strip_dbfs_prefix(path)))


class KedroHdfsInsecureClient(InsecureClient):
    """Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
    and ``hdfs_glob`` methods required by ``SparkDataSet``"""

    def hdfs_exists(self, hdfs_path: str) -> bool:
        """Determines whether given ``hdfs_path`` exists in HDFS.

        Args:
            hdfs_path: Path to check.

        Returns:
            True if ``hdfs_path`` exists in HDFS, False otherwise.
        """
        return bool(self.status(hdfs_path, strict=False))

    def hdfs_glob(self, pattern: str) -> List[str]:
        """Perform a glob search in HDFS using the provided pattern.

        Args:
            pattern: Glob pattern to search for.

        Returns:
            List of HDFS paths that satisfy the glob pattern.
        """
        prefix = _parse_glob_pattern(pattern) or "/"
        matched = set()
        try:
            for dpath, _, fnames in self.walk(prefix):
                if fnmatch(dpath, pattern):
                    matched.add(dpath)
                matched |= set(
                    "{}/{}".format(dpath, fname)
                    for fname in fnames
                    if fnmatch("{}/{}".format(dpath, fname), pattern)
                )
        except HdfsError:  # pragma: no cover
            # HdfsError is raised by `self.walk()` if prefix does not exist in HDFS.
            # Ignore and return an empty list.
            pass
        return sorted(matched)


class SparkDataSet(DefaultArgumentsMixIn, AbstractVersionedDataSet):
    """``SparkDataSet`` loads and saves Spark data frames.

    Example:
    ::

        >>> from pyspark.sql import SparkSession
        >>> from pyspark.sql.types import (StructField, StringType,
        >>>                                IntegerType, StructType)
        >>>
        >>> from kedro.contrib.io.pyspark import SparkDataSet
        >>>
        >>> schema = StructType([StructField("name", StringType(), True),
        >>>                      StructField("age", IntegerType(), True)])
        >>>
        >>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
        >>>
        >>> spark_df = SparkSession.builder.getOrCreate()\
        >>>                        .createDataFrame(data, schema)
        >>>
        >>> data_set = SparkDataSet(filepath="test_data")
        >>> data_set.save(spark_df)
        >>> reloaded = data_set.load()
        >>>
        >>> reloaded.take(4)
    """

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._fs_prefix + str(self._filepath),
            file_format=self._file_format,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def __init__(  # pylint: disable=too-many-arguments
        self,
        filepath: str,
        file_format: str = "parquet",
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``SparkDataSet``.

        Args:
            filepath: Path to a Spark data frame. When using Databricks
                and working with data written to mount points, specify
                ``filepath``s for (versioned) ``SparkDataSet``s starting
                with ``/dbfs/mnt``.
            file_format: File format used during load and save
                operations. These are the formats supported by the running
                SparkContext and include parquet and csv. For a list of
                supported formats please refer to the Apache Spark
                documentation at
                https://spark.apache.org/docs/latest/sql-programming-guide.html
            load_args: Load args passed to Spark DataFrameReader load method.
                It is dependent on the selected file format. You can find
                a list of read options for each supported format
                in Spark DataFrame read documentation:
                https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
            save_args: Save args passed to Spark DataFrame write options.
                Similar to load_args this is dependent on the selected file
                format. You can pass ``mode`` and ``partitionBy`` to specify
                your overwrite mode and partitioning respectively. You can find
                a list of options for each format in Spark DataFrame
                write documentation:
                https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials to access the S3 bucket, such as
                ``aws_access_key_id``, ``aws_secret_access_key``, if ``filepath``
                prefix is ``s3a://`` or ``s3n://``. Optional keyword arguments passed to
                ``hdfs.client.InsecureClient`` if ``filepath`` prefix is ``hdfs://``.
                Ignored otherwise.
        """
        credentials = deepcopy(credentials) or {}
        fs_prefix, filepath = _split_filepath(filepath)

        if fs_prefix in ("s3a://", "s3n://"):
            if fs_prefix == "s3n://":
                warn(
                    "`s3n` filesystem has now been deprecated by Spark, "
                    "please consider switching to `s3a`",
                    DeprecationWarning,
                )
            _s3 = S3FileSystem(client_kwargs=credentials)
            exists_function = _s3.exists
            glob_function = _s3.glob
            path = PurePosixPath(filepath)

        elif fs_prefix == "hdfs://" and version:
            warn(
                "HDFS filesystem support for versioned {} is in beta and uses "
                "`hdfs.client.InsecureClient`, please use with caution".format(
                    self.__class__.__name__
                )
            )

            # default namenode address
            credentials.setdefault("url", "http://localhost:9870")
            credentials.setdefault("user", "hadoop")

            _hdfs_client = KedroHdfsInsecureClient(**credentials)
            exists_function = _hdfs_client.hdfs_exists
            glob_function = _hdfs_client.hdfs_glob
            path = PurePosixPath(filepath)

        else:
            exists_function = glob_function = None  # type: ignore
            path = Path(filepath)  # type: ignore

        super().__init__(
            load_args=load_args,
            save_args=save_args,
            filepath=path,
            version=version,
            exists_function=exists_function,
            glob_function=glob_function,
        )

        self._file_format = file_format
        self._fs_prefix = fs_prefix

    @staticmethod
    def _get_spark():
        return SparkSession.builder.getOrCreate()

    @patch("os.scandir", _dbfs_scandir)
    def _load(self) -> DataFrame:
        with patch("os.path.lexists", return_value=True):
            load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_load_path()))
        self._logger.info('`load_path = "%s"`', load_path)

        return self._get_spark().read.load(
            load_path, self._file_format, **self._load_args
        )

    @patch("os.scandir", _dbfs_scandir)
    def _save(self, data: DataFrame) -> None:
        with patch("os.path.lexists", return_value=True):
            save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))
        self._logger.info('`save_path = "%s"`', save_path)
        data.write.save(save_path, self._file_format, **self._save_args)

    @patch("os.scandir", _dbfs_scandir)
    def _exists(self) -> bool:
        with patch("os.path.lexists", return_value=True):
            load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_load_path()))
        self._logger.info('`load_path = "%s"`', load_path)

        try:
            self._get_spark().read.load(load_path, self._file_format)
        except AnalysisException as exception:
            if exception.desc.startswith("Path does not exist:"):
                return False
            raise
        return True

I believe it's fine to have DBFS-specific code, as we already do. However, a few enhancements are necessary:

  • Detect DBFS (e.g. look for the DATABRICKS_RUNTIME_VERSION environment variable) and apply the patches conditionally (see the sketch after this list).
  • Apply this to any dataset, not just SparkDataSet, when the filesystem is DBFS (versioning should work the same way). => Actually implement this code as part of the versioning mix-in?
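
A minimal sketch of the first point, assuming the helpers defined above (_dbfs_scandir, _strip_dbfs_prefix); the helper name _maybe_patch_dbfs_glob is made up for illustration, and the presence of DATABRICKS_RUNTIME_VERSION is used as the proxy for "running on Databricks/DBFS":

import os
from contextlib import ExitStack, contextmanager
from unittest.mock import patch


def _running_on_databricks() -> bool:
    # Databricks clusters set this environment variable for every runtime.
    return "DATABRICKS_RUNTIME_VERSION" in os.environ


@contextmanager
def _maybe_patch_dbfs_glob():
    """Patch ``os.scandir``/``os.path.lexists`` only when running on Databricks."""
    with ExitStack() as stack:
        if _running_on_databricks():
            stack.enter_context(patch("os.scandir", _dbfs_scandir))
            stack.enter_context(patch("os.path.lexists", return_value=True))
        yield


# Usage inside _load/_save/_exists, instead of the unconditional decorators:
#     with _maybe_patch_dbfs_glob():
#         load_path = _strip_dbfs_prefix(
#             self._fs_prefix + str(self._get_load_path())
#         )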

Possible Alternatives

  • Define a DBFS glob function from scratch (DBUtils doesn't provide one, AFAIK); a rough sketch follows this list.
  • Actually manage (archive?) versions instead of keeping hundreds around. ¯\_(ツ)_/¯
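
For the first alternative, a rough sketch of what a DBFS-native glob could look like, built on dbutils.fs.ls and mirroring the hdfs_glob approach above. This is a hypothetical helper that reuses _parse_glob_pattern from the snippet above; it also assumes the pattern and the paths returned by dbutils.fs.ls use the same dbfs:/ form, which would need more careful prefix handling in practice.

from fnmatch import fnmatch
from typing import List


def dbfs_glob(pattern: str, dbutils) -> List[str]:
    """Return DBFS paths matching ``pattern``, walking from its non-glob prefix."""
    prefix = _parse_glob_pattern(pattern) or "/"
    matched = set()

    def _walk(path):
        try:
            entries = dbutils.fs.ls(path)
        except Exception:  # dbutils raises if the path does not exist; treat as empty
            return
        for entry in entries:
            # Directory paths from dbutils.fs.ls carry a trailing slash
            entry_path = entry.path.rstrip("/")
            if fnmatch(entry_path, pattern):
                matched.add(entry_path)
            if entry.isDir():
                _walk(entry_path)

    _walk(prefix)
    return sorted(matched)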

Your Environment

Include as many relevant details about the environment in which you experienced the bug:

  • Kedro version used (pip show kedro or kedro -V): 0.15.5
  • Python version used (python -V): Python 3.7.3
  • Operating system and version: Databricks Runtime Version 5.5 Conda Beta (includes Apache Spark 2.4.3, Scala 2.11)
@deepyaman added the Issue: Bug Report 🐞 label on Mar 5, 2020
@deepyaman
Member Author

Line-by-line profiling of iglob

_iterdir only:

Total time: 30.5995 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iterdir at line 114

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   114                                           def _iterdir(dirname, dironly):
   115         1          0.0      0.0      0.0      if not dirname:
   116                                                   if isinstance(dirname, bytes):
   117                                                       dirname = bytes(os.curdir, 'ASCII')
   118                                                   else:
   119                                                       dirname = os.curdir
   120         1          0.0      0.0      0.0      try:
   121         1     738357.0 738357.0      2.4          with os.scandir(dirname) as it:
   122       321     200485.0    624.6      0.7              for entry in it:
   123       320        269.0      0.8      0.0                  try:
   124       320   29659507.0  92686.0     96.9                      if not dironly or entry.is_dir():
   125       320        877.0      2.7      0.0                          yield entry.name
   126                                                           except OSError:
   127                                                               pass
   128                                               except OSError:
   129                                                   return

Full output:

Total time: 91.9984 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iglob at line 39

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    39                                           def _iglob(pathname, recursive, dironly):
    40         2         14.0      7.0      0.0      dirname, basename = os.path.split(pathname)
    41         2          8.0      4.0      0.0      if not has_magic(pathname):
    42                                                   assert not dironly
    43                                                   if basename:
    44                                                       if os.path.lexists(pathname):
    45                                                           yield pathname
    46                                                   else:
    47                                                       # Patterns ending with a slash should match only directories
    48                                                       if os.path.isdir(dirname):
    49                                                           yield pathname
    50                                                   return
    51         2          2.0      1.0      0.0      if not dirname:
    52                                                   if recursive and _isrecursive(basename):
    53                                                       yield from _glob2(dirname, basename, dironly)
    54                                                   else:
    55                                                       yield from _glob1(dirname, basename, dironly)
    56                                                   return
    57                                               # `os.path.split()` returns the argument itself as a dirname if it is a
    58                                               # drive or UNC path.  Prevent an infinite recursion if a drive or UNC path
    59                                               # contains magic characters (i.e. r'\\?\C:').
    60         2          4.0      2.0      0.0      if dirname != pathname and has_magic(dirname):
    61         1          1.0      1.0      0.0          dirs = _iglob(dirname, recursive, True)
    62                                               else:
    63         1          1.0      1.0      0.0          dirs = [dirname]
    64         2          3.0      1.5      0.0      if has_magic(basename):
    65         1          0.0      0.0      0.0          if recursive and _isrecursive(basename):
    66                                                       glob_in_dir = _glob2
    67                                                   else:
    68         1          0.0      0.0      0.0              glob_in_dir = _glob1
    69                                               else:
    70         1          0.0      0.0      0.0          glob_in_dir = _glob0
    71       323        325.0      1.0      0.0      for dirname in dirs:
    72       960   91990364.0  95823.3    100.0          for name in glob_in_dir(dirname, basename, dironly):
    73       639       7706.0     12.1      0.0              yield os.path.join(dirname, name)

Total time: 30.6017 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob1 at line 79

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    79                                           def _glob1(dirname, pattern, dironly):
    80         1   30601157.0 30601157.0    100.0      names = list(_iterdir(dirname, dironly))
    81         1          6.0      6.0      0.0      if not _ishidden(pattern):
    82         1         18.0     18.0      0.0          names = (x for x in names if not _ishidden(x))
    83         1        516.0    516.0      0.0      return fnmatch.filter(names, pattern)

Total time: 61.3859 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob0 at line 85

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    85                                           def _glob0(dirname, basename, dironly):
    86       320        322.0      1.0      0.0      if not basename:
    87                                                   # `os.path.split()` returns an empty basename for paths ending with a
    88                                                   # directory separator.  'q*x/' should match only directories.
    89                                                   if os.path.isdir(dirname):
    90                                                       return [basename]
    91                                               else:
    92       320   61384780.0 191827.4    100.0          if os.path.lexists(os.path.join(dirname, basename)):
    93       319        778.0      2.4      0.0              return [basename]
    94         1          2.0      2.0      0.0      return []

Total time: 0 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob2 at line 107

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   107                                           def _glob2(dirname, pattern, dironly):
   108                                               assert _isrecursive(pattern)
   109                                               yield pattern[:0]
   110                                               yield from _rlistdir(dirname, dironly)

Total time: 30.5995 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iterdir at line 114

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   114                                           def _iterdir(dirname, dironly):
   115         1          0.0      0.0      0.0      if not dirname:
   116                                                   if isinstance(dirname, bytes):
   117                                                       dirname = bytes(os.curdir, 'ASCII')
   118                                                   else:
   119                                                       dirname = os.curdir
   120         1          0.0      0.0      0.0      try:
   121         1     738357.0 738357.0      2.4          with os.scandir(dirname) as it:
   122       321     200485.0    624.6      0.7              for entry in it:
   123       320        269.0      0.8      0.0                  try:
   124       320   29659507.0  92686.0     96.9                      if not dironly or entry.is_dir():
   125       320        877.0      2.7      0.0                          yield entry.name
   126                                                           except OSError:
   127                                                               pass
   128                                               except OSError:
   129                                                   return

Total time: 92.0008 s

@deepyaman
Member Author

Improving os.scandir performance

_iterdir only:

Total time: 0.468829 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iterdir at line 114

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   114                                           def _iterdir(dirname, dironly):
   115         1          1.0      1.0      0.0      if not dirname:
   116                                                   if isinstance(dirname, bytes):
   117                                                       dirname = bytes(os.curdir, 'ASCII')
   118                                                   else:
   119                                                       dirname = os.curdir
   120         1          0.0      0.0      0.0      try:
   121         1     467526.0 467526.0     99.7          with os.scandir(dirname) as it:
   122       321        790.0      2.5      0.2              for entry in it:
   123       320        132.0      0.4      0.0                  try:
   124       320        236.0      0.7      0.1                      if not dironly or entry.is_dir():
   125       320        144.0      0.5      0.0                          yield entry.name
   126                                                           except OSError:
   127                                                               pass
   128                                               except OSError:
   129                                                   return

Full output:

Timer unit: 1e-06 s

Total time: 55.0211 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iglob at line 39

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    39                                           def _iglob(pathname, recursive, dironly):
    40         2         16.0      8.0      0.0      dirname, basename = os.path.split(pathname)
    41         2         11.0      5.5      0.0      if not has_magic(pathname):
    42                                                   assert not dironly
    43                                                   if basename:
    44                                                       if os.path.lexists(pathname):
    45                                                           yield pathname
    46                                                   else:
    47                                                       # Patterns ending with a slash should match only directories
    48                                                       if os.path.isdir(dirname):
    49                                                           yield pathname
    50                                                   return
    51         2          1.0      0.5      0.0      if not dirname:
    52                                                   if recursive and _isrecursive(basename):
    53                                                       yield from _glob2(dirname, basename, dironly)
    54                                                   else:
    55                                                       yield from _glob1(dirname, basename, dironly)
    56                                                   return
    57                                               # `os.path.split()` returns the argument itself as a dirname if it is a
    58                                               # drive or UNC path.  Prevent an infinite recursion if a drive or UNC path
    59                                               # contains magic characters (i.e. r'\\?\C:').
    60         2          4.0      2.0      0.0      if dirname != pathname and has_magic(dirname):
    61         1          1.0      1.0      0.0          dirs = _iglob(dirname, recursive, True)
    62                                               else:
    63         1          1.0      1.0      0.0          dirs = [dirname]
    64         2          3.0      1.5      0.0      if has_magic(basename):
    65         1          1.0      1.0      0.0          if recursive and _isrecursive(basename):
    66                                                       glob_in_dir = _glob2
    67                                                   else:
    68         1          1.0      1.0      0.0              glob_in_dir = _glob1
    69                                               else:
    70         1          0.0      0.0      0.0          glob_in_dir = _glob0
    71       323        399.0      1.2      0.0      for dirname in dirs:
    72       960   55012053.0  57304.2    100.0          for name in glob_in_dir(dirname, basename, dironly):
    73       639       8617.0     13.5      0.0              yield os.path.join(dirname, name)

Total time: 0.470102 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob1 at line 79

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    79                                           def _glob1(dirname, pattern, dironly):
    80         1     469590.0 469590.0     99.9      names = list(_iterdir(dirname, dironly))
    81         1          3.0      3.0      0.0      if not _ishidden(pattern):
    82         1          1.0      1.0      0.0          names = (x for x in names if not _ishidden(x))
    83         1        508.0    508.0      0.1      return fnmatch.filter(names, pattern)

Total time: 54.5389 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob0 at line 85

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    85                                           def _glob0(dirname, basename, dironly):
    86       320        367.0      1.1      0.0      if not basename:
    87                                                   # `os.path.split()` returns an empty basename for paths ending with a
    88                                                   # directory separator.  'q*x/' should match only directories.
    89                                                   if os.path.isdir(dirname):
    90                                                       return [basename]
    91                                               else:
    92       320   54537456.0 170429.5    100.0          if os.path.lexists(os.path.join(dirname, basename)):
    93       319       1056.0      3.3      0.0              return [basename]
    94         1          2.0      2.0      0.0      return []

Total time: 0 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _glob2 at line 107

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   107                                           def _glob2(dirname, pattern, dironly):
   108                                               assert _isrecursive(pattern)
   109                                               yield pattern[:0]
   110                                               yield from _rlistdir(dirname, dironly)

Total time: 0.468829 s
File: /local_disk0/pythonVirtualEnvDirs/virtualEnv-063104b0-b822-45b8-9af8-42b9823051f9/lib/python3.7/glob.py
Function: _iterdir at line 114

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   114                                           def _iterdir(dirname, dironly):
   115         1          1.0      1.0      0.0      if not dirname:
   116                                                   if isinstance(dirname, bytes):
   117                                                       dirname = bytes(os.curdir, 'ASCII')
   118                                                   else:
   119                                                       dirname = os.curdir
   120         1          0.0      0.0      0.0      try:
   121         1     467526.0 467526.0     99.7          with os.scandir(dirname) as it:
   122       321        790.0      2.5      0.2              for entry in it:
   123       320        132.0      0.4      0.0                  try:
   124       320        236.0      0.7      0.1                      if not dironly or entry.is_dir():
   125       320        144.0      0.5      0.0                          yield entry.name
   126                                                           except OSError:
   127                                                               pass
   128                                               except OSError:
   129                                                   return

Total time: 55.0237 s

@deepyaman
Member Author

Between #275 (comment) and #275 (comment), the time spent on line 124 (the entry.is_dir() check) has disappeared! The remaining bottleneck is line 92, the os.path.lexists check. However, since we've patched the code that creates dirs, we don't even need this check anymore (dbutils.fs.ls only returns entries that exist), so we can just patch os.path.lexists to always return True. 😁

@mzjp2
Contributor

mzjp2 commented Mar 5, 2020

This is a great writeup, thanks for such a detailed issue 👌

@yetudada changed the title from "Versioning extremely slow on DBFS" to "[KED-1458] Versioning extremely slow on DBFS" on Mar 9, 2020
@andrii-ivaniuk
Contributor

andrii-ivaniuk commented Mar 31, 2020

Fixed in merge commit: 6bf1066
