diff --git a/chart/static-files/openapi.json b/chart/static-files/openapi.json index 11b6b1fedb..59a058d956 100644 --- a/chart/static-files/openapi.json +++ b/chart/static-files/openapi.json @@ -925,11 +925,11 @@ "properties": { "parquet_files": { "type": "array", - "items": { "$ref": "#/components/schemas/ParquetFileItem" } + "items": { "$ref": "#/components/schemas/SplitHubFile" } } } }, - "ParquetFileItem": { + "SplitHubFile": { "type": "object", "required": ["dataset", "config", "split", "url", "filename", "size"], "properties": { diff --git a/chart/templates/_envWorker.tpl b/chart/templates/_envWorker.tpl index 6f2d141154..77a03377f1 100644 --- a/chart/templates/_envWorker.tpl +++ b/chart/templates/_envWorker.tpl @@ -87,4 +87,25 @@ - name: CONFIG_NAMES_MAX_NUMBER value: {{ .Values.configNames.maxNumber | quote }} +# specific to 'split-duckdb-index' job runner +- name: DUCKDB_INDEX_COMMIT_MESSAGE + value: {{ .Values.duckDBIndex.commitMessage | quote }} +- name: DUCKDB_INDEX_COMMITTER_HF_TOKEN + {{- if .Values.secrets.appParquetConverterHfToken.fromSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.secrets.appParquetConverterHfToken.secretName | quote }} + key: HF_TOKEN + optional: false + {{- else }} + value: {{ .Values.secrets.appParquetConverterHfToken.value }} + {{- end }} +- name: DUCKDB_INDEX_TARGET_REVISION + value: {{ .Values.duckDBIndex.targetRevision | quote }} +- name: DUCKDB_INDEX_URL_TEMPLATE + value: {{ .Values.duckDBIndex.urlTemplate | quote }} +- name: DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES + value: {{ .Values.duckDBIndex.maxParquetSizeBytes | quote }} +- name: DUCKDB_INDEX_STORAGE_DIRECTORY + value: {{ .Values.duckDBIndex.storageDirectory | quote }} {{- end -}} diff --git a/chart/templates/_helpers.tpl b/chart/templates/_helpers.tpl index 12aeec4c7d..ac9370e664 100644 --- a/chart/templates/_helpers.tpl +++ b/chart/templates/_helpers.tpl @@ -169,6 +169,15 @@ The parquet-metadata/ subpath in the NFS {{- printf "%s/%s/%s/" .Chart.Name .Release.Name "parquet-metadata" }} {{- end }} +{{/* +The duckdb-index/ subpath in the NFS +- in a subdirectory named as the chart (datasets-server/), and below it, +- in a subdirectory named as the Release, so that Releases will not share the same dir +*/}} +{{- define "duckDBIndex.subpath" -}} +{{- printf "%s/%s/%s/" .Chart.Name .Release.Name "duckdb-index" }} +{{- end }} + {{/* The datasets library will use this directory as a cache - in a subdirectory named as the chart (datasets-server/), and below it, diff --git a/chart/templates/_initContainerDuckDBIndex.tpl b/chart/templates/_initContainerDuckDBIndex.tpl new file mode 100644 index 0000000000..ed7cb43bc3 --- /dev/null +++ b/chart/templates/_initContainerDuckDBIndex.tpl @@ -0,0 +1,21 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2023 The HuggingFace Authors. + +{{- define "initContainerDuckDBIndex" -}} +- name: prepare-duckdb-index + image: ubuntu:focal + imagePullPolicy: {{ .Values.images.pullPolicy }} + command: ["/bin/sh", "-c"] + args: + - chown {{ .Values.uid }}:{{ .Values.gid }} /mounted-path; + volumeMounts: + - mountPath: /mounted-path + mountPropagation: None + name: data + subPath: "{{ include "duckDBIndex.subpath" . 
}}" + readOnly: false + securityContext: + runAsNonRoot: false + runAsUser: 0 + runAsGroup: 0 +{{- end -}} diff --git a/chart/templates/_volumeMountDuckDBIndex.tpl b/chart/templates/_volumeMountDuckDBIndex.tpl new file mode 100644 index 0000000000..01c37b8919 --- /dev/null +++ b/chart/templates/_volumeMountDuckDBIndex.tpl @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2023 The HuggingFace Authors. + +{{- define "volumeMountDuckDBIndexRW" -}} +- mountPath: {{ .Values.duckDBIndex.storageDirectory | quote }} + mountPropagation: None + name: data + subPath: "{{ include "duckDBIndex.subpath" . }}" + readOnly: false +{{- end -}} diff --git a/chart/templates/worker/_container.tpl b/chart/templates/worker/_container.tpl index 9f83bad84d..f9b86817a9 100644 --- a/chart/templates/worker/_container.tpl +++ b/chart/templates/worker/_container.tpl @@ -24,6 +24,7 @@ {{ include "volumeMountAssetsRW" . | nindent 2 }} {{ include "volumeMountCache" . | nindent 2 }} {{ include "volumeMountParquetMetadataRW" . | nindent 2 }} + {{ include "volumeMountDuckDBIndexRW" . | nindent 2 }} securityContext: allowPrivilegeEscalation: false resources: {{ toYaml .workerValues.resources | nindent 4 }} diff --git a/chart/templates/worker/_deployment.yaml b/chart/templates/worker/_deployment.yaml index e06d319c65..03a70646ae 100644 --- a/chart/templates/worker/_deployment.yaml +++ b/chart/templates/worker/_deployment.yaml @@ -26,6 +26,7 @@ spec: {{ include "initContainerAssets" . | nindent 8 }} {{ include "initContainerCache" . | nindent 8 }} {{ include "initContainerParquetMetadata" . | nindent 8 }} + {{ include "initContainerDuckDBIndex" . | nindent 8 }} containers: {{ include "containerWorker" . | nindent 8 }} nodeSelector: {{ toYaml .workerValues.nodeSelector | nindent 8 }} tolerations: {{ toYaml .workerValues.tolerations | nindent 8 }} diff --git a/chart/values.yaml b/chart/values.yaml index 4a57fc7a37..98cc505b1e 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -214,6 +214,17 @@ parquetMetadata: # Directory on the shared storage (parquet metadata files used for random access in /rows) storageDirectory: "/parquet-metadata" +duckDBIndex: + # Directory on the shared storage (used temporarily to prepare the duckdb indexes before sending to the Hub) + storageDirectory: "/duckdb-index" + # the git commit message when the duckdb index file is uploaded to the Hub. Defaults to `Update duckdb index files`. + commitMessage: "Update duckdb index files" + # the git revision of the dataset where to store the duckdb index file. Defaults to `refs/convert/parquet`. + targetRevision: "refs/convert/parquet" + # the URL template to build the duckdb index file URL. Defaults to `/datasets/%s/resolve/%s/%s`. + urlTemplate: "/datasets/%s/resolve/%s/%s" + # the maximum size of the split parquets. 
+ maxParquetSizeBytes: "100_000_000" # Directory where the cache data will be stored cacheDirectory: "/datasets-server-cache" diff --git a/libs/libcommon/src/libcommon/config.py b/libs/libcommon/src/libcommon/config.py index e4d63e70ac..231ec1f55f 100644 --- a/libs/libcommon/src/libcommon/config.py +++ b/libs/libcommon/src/libcommon/config.py @@ -24,6 +24,7 @@ PROCESSING_STEP_DATASET_PARQUET_VERSION, PROCESSING_STEP_DATASET_SIZE_VERSION, PROCESSING_STEP_DATASET_SPLIT_NAMES_VERSION, + PROCESSING_STEP_SPLIT_DUCKDB_INDEX_VERSION, PROCESSING_STEP_SPLIT_FIRST_ROWS_FROM_PARQUET_VERSION, PROCESSING_STEP_SPLIT_FIRST_ROWS_FROM_STREAMING_VERSION, PROCESSING_STEP_SPLIT_IMAGE_URL_COLUMNS_VERSION, @@ -104,6 +105,39 @@ def from_env(cls) -> "ParquetMetadataConfig": ) +DUCKDB_INDEX_STORAGE_DIRECTORY = None +DUCKDB_INDEX_COMMIT_MESSAGE = "Update duckdb index file" +DUCKDB_INDEX_COMMITTER_HF_TOKEN = None +DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES = 100_000_000 +DUCKDB_INDEX_TARGET_REVISION = "refs/convert/parquet" +DUCKDB_INDEX_URL_TEMPLATE = "/datasets/%s/resolve/%s/%s" + + +@dataclass(frozen=True) +class DuckDbIndexConfig: + storage_directory: Optional[str] = DUCKDB_INDEX_STORAGE_DIRECTORY + commit_message: str = DUCKDB_INDEX_COMMIT_MESSAGE + committer_hf_token: Optional[str] = DUCKDB_INDEX_COMMITTER_HF_TOKEN + target_revision: str = DUCKDB_INDEX_TARGET_REVISION + url_template: str = DUCKDB_INDEX_URL_TEMPLATE + max_parquet_size_bytes: int = DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES + + @classmethod + def from_env(cls) -> "DuckDbIndexConfig": + env = Env(expand_vars=True) + with env.prefixed("DUCKDB_INDEX_"): + return cls( + storage_directory=env.str(name="STORAGE_DIRECTORY", default=DUCKDB_INDEX_STORAGE_DIRECTORY), + commit_message=env.str(name="COMMIT_MESSAGE", default=DUCKDB_INDEX_COMMIT_MESSAGE), + committer_hf_token=env.str(name="COMMITTER_HF_TOKEN", default=DUCKDB_INDEX_COMMITTER_HF_TOKEN), + target_revision=env.str(name="TARGET_REVISION", default=DUCKDB_INDEX_TARGET_REVISION), + url_template=env.str(name="URL_TEMPLATE", default=DUCKDB_INDEX_URL_TEMPLATE), + max_parquet_size_bytes=env.int( + name="MAX_PARQUET_SIZE_BYTES", default=DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES + ), + ) + + COMMON_HF_ENDPOINT = "https://huggingface.co" COMMON_HF_TOKEN = None @@ -319,6 +353,13 @@ class ProcessingGraphConfig: "triggered_by": ["dataset-config-names", "config-opt-in-out-urls-count"], "job_runner_version": PROCESSING_STEP_DATASET_OPT_IN_OUT_URLS_COUNT_VERSION, }, + "split-duckdb-index": { + "input_type": "split", + "triggered_by": [ + "config-split-names-from-info", + ], + "job_runner_version": PROCESSING_STEP_SPLIT_DUCKDB_INDEX_VERSION, + }, } ) diff --git a/libs/libcommon/src/libcommon/constants.py b/libs/libcommon/src/libcommon/constants.py index cd41126f4a..66a609b67e 100644 --- a/libs/libcommon/src/libcommon/constants.py +++ b/libs/libcommon/src/libcommon/constants.py @@ -6,6 +6,7 @@ CACHE_MONGOENGINE_ALIAS = "cache" CACHED_ASSETS_CACHE_APPNAME = "datasets_server_cached_assets" PARQUET_METADATA_CACHE_APPNAME = "datasets_server_parquet_metadata" +DUCKDB_INDEX_CACHE_APPNAME = "datasets_server_duckdb_index" METRICS_COLLECTION_CACHE_TOTAL_METRIC = "cacheTotalMetric" METRICS_COLLECTION_JOB_TOTAL_METRIC = "jobTotalMetric" METRICS_MONGOENGINE_ALIAS = "metrics" @@ -36,6 +37,7 @@ PROCESSING_STEP_SPLIT_OPT_IN_OUT_URLS_COUNT_VERSION = 2 PROCESSING_STEP_SPLIT_OPT_IN_OUT_URLS_SCAN_VERSION = 4 PROCESSING_STEP_SPLIT_IMAGE_URL_COLUMNS_VERSION = 1 +PROCESSING_STEP_SPLIT_DUCKDB_INDEX_VERSION = 1 
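The chart wiring above (`DUCKDB_INDEX_*` variables in `_envWorker.tpl`, defaults in `values.yaml`) and the new `DuckDbIndexConfig` resolve the same settings on the worker side. Below is a minimal sketch, not part of the diff, of how the env-prefixed parsing is expected to behave; it assumes `libcommon` is importable in the worker environment and uses the chart defaults shown above:

```python
# Illustrative only: DUCKDB_INDEX_* variables injected by _envWorker.tpl are read
# by DuckDbIndexConfig.from_env() through environs' prefixed() helper.
import os

from libcommon.config import DuckDbIndexConfig

os.environ["DUCKDB_INDEX_TARGET_REVISION"] = "refs/convert/parquet"
os.environ["DUCKDB_INDEX_URL_TEMPLATE"] = "/datasets/%s/resolve/%s/%s"
os.environ["DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES"] = "100_000_000"  # int() accepts underscore separators

config = DuckDbIndexConfig.from_env()
assert config.target_revision == "refs/convert/parquet"
assert config.max_parquet_size_bytes == 100_000_000
# falls back to the libcommon default when DUCKDB_INDEX_COMMIT_MESSAGE is unset
assert config.commit_message == "Update duckdb index file"
```

When `DUCKDB_INDEX_STORAGE_DIRECTORY` is left unset, `init_duckdb_index_cache_dir` (added below in `libcommon.storage`) falls back to a machine-default cache directory under the `datasets_server_duckdb_index` app name.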
PROCESSING_STEP_CONFIG_PARQUET_AND_INFO_ROW_GROUP_SIZE_FOR_AUDIO_DATASETS = 100 PROCESSING_STEP_CONFIG_PARQUET_AND_INFO_ROW_GROUP_SIZE_FOR_IMAGE_DATASETS = 100 diff --git a/libs/libcommon/src/libcommon/exceptions.py b/libs/libcommon/src/libcommon/exceptions.py index f9de8f3fae..46e66228de 100644 --- a/libs/libcommon/src/libcommon/exceptions.py +++ b/libs/libcommon/src/libcommon/exceptions.py @@ -73,6 +73,7 @@ def as_response(self) -> ErrorResponse: CacheableErrorCode = Literal[ + "CacheDirectoryNotInitializedError", "ConfigNamesError", "CreateCommitError", "DatasetInBlockListError", @@ -89,6 +90,7 @@ def as_response(self) -> ErrorResponse: "DatasetWithTooManyConfigsError", "DatasetWithTooManyParquetFilesError", "DisabledViewerError", + "DuckDBIndexFileNotFoundError", "EmptyDatasetError", "ExternalFilesSizeRequestConnectionError", "ExternalFilesSizeRequestError", @@ -102,6 +104,7 @@ def as_response(self) -> ErrorResponse: "JobManagerExceededMaximumDurationError", "LockedDatasetTimeoutError", "MissingSpawningTokenError", + "NoIndexableColumnsError", "NormalRowsError", "ParameterMissingError", "ParquetResponseEmptyError", @@ -112,6 +115,7 @@ def as_response(self) -> ErrorResponse: "SplitsNamesError", "SplitNamesFromStreamingError", "SplitNotFoundError", + "SplitWithTooBigParquetError", "StreamingRowsError", "TooBigContentError", "TooManyColumnsError", @@ -136,6 +140,13 @@ def __init__( ) +class CacheDirectoryNotInitializedError(CacheableError): + """Raised when the cache directory has not been initialized before job compute.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "CacheDirectoryNotInitializedError", cause, True) + + class ConfigNamesError(CacheableError): """Raised when the config names could not be fetched.""" @@ -232,6 +243,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "DatasetWithTooBigExternalFilesError", cause, True) +class DatasetWithTooManyConfigsError(CacheableError): + """Raised when the number of configs of a dataset exceeded the limit.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "DatasetWithTooManyConfigsError", cause, True) + + class DatasetWithTooManyExternalFilesError(CacheableError): """Raised when the number of external data files of a dataset is too big.""" @@ -246,11 +264,11 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "DatasetWithTooManyParquetFilesError", cause, True) -class LockedDatasetTimeoutError(CacheableError): - """Raised when a dataset is locked by another job.""" +class DuckDBIndexFileNotFoundError(CacheableError): + """Raised when no duckdb index file was found for split.""" def __init__(self, message: str, cause: Optional[BaseException] = None): - super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "LockedDatasetTimeoutError", cause, True) + super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "DuckDBIndexFileNotFoundError", cause, False) class DisabledViewerError(CacheableError): @@ -355,6 +373,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): ) +class LockedDatasetTimeoutError(CacheableError): + """Raised when a dataset is locked by another job.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, 
HTTPStatus.NOT_IMPLEMENTED, "LockedDatasetTimeoutError", cause, True) + + class MissingSpawningTokenError(CacheableError): """Raised when the spawning.ai token is not set.""" @@ -369,6 +394,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "NormalRowsError", cause, True) +class NoIndexableColumnsError(CacheableError): + """Raised when split does not have string columns to index.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "NoIndexableColumnsError", cause, True) + + class ParameterMissingError(CacheableError): """Raised when request is missing some parameter.""" @@ -450,6 +482,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): ) +class SplitWithTooBigParquetError(CacheableError): + """Raised when the split parquet size (sum of parquet sizes given) is too big.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "SplitWithTooBigParquetError", cause, False) + + class StreamingRowsError(CacheableError): """Raised when the rows could not be fetched in streaming mode.""" @@ -496,10 +535,3 @@ class UnsupportedExternalFilesError(CacheableError): def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "UnsupportedExternalFilesError", cause, True) - - -class DatasetWithTooManyConfigsError(CacheableError): - """Raised when the number of configs of a dataset exceeded the limit.""" - - def __init__(self, message: str, cause: Optional[BaseException] = None): - super().__init__(message, HTTPStatus.NOT_IMPLEMENTED, "DatasetWithTooManyConfigsError", cause, True) diff --git a/libs/libcommon/src/libcommon/parquet_utils.py b/libs/libcommon/src/libcommon/parquet_utils.py index 0f8ca107af..351cb65e64 100644 --- a/libs/libcommon/src/libcommon/parquet_utils.py +++ b/libs/libcommon/src/libcommon/parquet_utils.py @@ -20,6 +20,7 @@ from libcommon.processing_graph import ProcessingGraph from libcommon.prometheus import StepProfiler from libcommon.simple_cache import get_previous_step_or_raise +from libcommon.utils import SplitHubFile StrPath = Union[str, PathLike[str]] @@ -36,15 +37,6 @@ class FileSystemError(Exception): pass -class ParquetFileItem(TypedDict): - dataset: str - config: str - split: str - url: str - filename: str - size: int - - class ParquetFileMetadataItem(TypedDict): dataset: str config: str @@ -134,7 +126,7 @@ def query(self, offset: int, length: int) -> pa.Table: @staticmethod def from_parquet_file_items( - parquet_file_items: List[ParquetFileItem], + parquet_file_items: List[SplitHubFile], dataset: str, config: str, split: str, diff --git a/libs/libcommon/src/libcommon/storage.py b/libs/libcommon/src/libcommon/storage.py index bbef1442be..63d9c10853 100644 --- a/libs/libcommon/src/libcommon/storage.py +++ b/libs/libcommon/src/libcommon/storage.py @@ -12,6 +12,7 @@ from libcommon.constants import ( ASSETS_CACHE_APPNAME, CACHED_ASSETS_CACHE_APPNAME, + DUCKDB_INDEX_CACHE_APPNAME, PARQUET_METADATA_CACHE_APPNAME, ) @@ -81,6 +82,20 @@ def init_parquet_metadata_dir(directory: Optional[StrPath] = None) -> StrPath: return init_dir(directory, appname=PARQUET_METADATA_CACHE_APPNAME) +def init_duckdb_index_cache_dir(directory: Optional[StrPath] = None) -> StrPath: + """Initialize the duckdb index directory. 
+ + If directory is None, it will be set to the default duckdb index location on the machine. + + Args: + directory (Optional[Union[str, PathLike[str]]], optional): The directory to initialize. Defaults to None. + + Returns: + Union[str, PathLike[str]]: The directory. + """ + return init_dir(directory, appname=DUCKDB_INDEX_CACHE_APPNAME) + + def exists(path: StrPath) -> bool: """Check if a path exists. diff --git a/libs/libcommon/src/libcommon/utils.py b/libs/libcommon/src/libcommon/utils.py index 301a760956..b921ea787b 100644 --- a/libs/libcommon/src/libcommon/utils.py +++ b/libs/libcommon/src/libcommon/utils.py @@ -65,6 +65,15 @@ class JobResult(TypedDict): output: Optional[JobOutput] +class SplitHubFile(TypedDict): + dataset: str + config: str + split: str + url: str + filename: str + size: int + + # orjson is used to get rid of errors with datetime (see allenai/c4) def orjson_default(obj: Any) -> Any: if isinstance(obj, bytes): diff --git a/libs/libcommon/tests/test_processing_graph.py b/libs/libcommon/tests/test_processing_graph.py index 49bd37fbb1..c75d2104c2 100644 --- a/libs/libcommon/tests/test_processing_graph.py +++ b/libs/libcommon/tests/test_processing_graph.py @@ -93,13 +93,18 @@ def graph() -> ProcessingGraph: "config-opt-in-out-urls-count", "split-first-rows-from-streaming", "dataset-split-names", + "split-duckdb-index", ], ["config-info"], ["dataset-config-names", "config-parquet-and-info", "config-info"], ), ( "config-split-names-from-streaming", - ["split-first-rows-from-streaming", "dataset-split-names", "config-opt-in-out-urls-count"], + [ + "split-first-rows-from-streaming", + "dataset-split-names", + "config-opt-in-out-urls-count", + ], ["dataset-config-names"], ["dataset-config-names"], ), @@ -287,6 +292,17 @@ def graph() -> ProcessingGraph: "split-image-url-columns", ], ), + ( + "split-duckdb-index", + [], + ["config-split-names-from-info"], + [ + "config-split-names-from-info", + "config-parquet-and-info", + "config-info", + "dataset-config-names", + ], + ), ], ) def test_default_graph_steps( diff --git a/services/worker/poetry.lock b/services/worker/poetry.lock index 0ece7030b5..f2363ed9e7 100644 --- a/services/worker/poetry.lock +++ b/services/worker/poetry.lock @@ -994,6 +994,68 @@ files = [ dnssec = ["ecdsa (>=0.13)", "pycryptodome"] idna = ["idna (>=2.1)"] +[[package]] +name = "duckdb" +version = "0.8.1" +description = "DuckDB embedded database" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "duckdb-0.8.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:14781d21580ee72aba1f5dcae7734674c9b6c078dd60470a08b2b420d15b996d"}, + {file = "duckdb-0.8.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f13bf7ab0e56ddd2014ef762ae4ee5ea4df5a69545ce1191b8d7df8118ba3167"}, + {file = "duckdb-0.8.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e4032042d8363e55365bbca3faafc6dc336ed2aad088f10ae1a534ebc5bcc181"}, + {file = "duckdb-0.8.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:31a71bd8f0b0ca77c27fa89b99349ef22599ffefe1e7684ae2e1aa2904a08684"}, + {file = "duckdb-0.8.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:24568d6e48f3dbbf4a933109e323507a46b9399ed24c5d4388c4987ddc694fd0"}, + {file = "duckdb-0.8.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:297226c0dadaa07f7c5ae7cbdb9adba9567db7b16693dbd1b406b739ce0d7924"}, + {file = "duckdb-0.8.1-cp310-cp310-musllinux_1_1_i686.whl", hash = 
"sha256:5792cf777ece2c0591194006b4d3e531f720186102492872cb32ddb9363919cf"}, + {file = "duckdb-0.8.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:12803f9f41582b68921d6b21f95ba7a51e1d8f36832b7d8006186f58c3d1b344"}, + {file = "duckdb-0.8.1-cp310-cp310-win32.whl", hash = "sha256:d0953d5a2355ddc49095e7aef1392b7f59c5be5cec8cdc98b9d9dc1f01e7ce2b"}, + {file = "duckdb-0.8.1-cp310-cp310-win_amd64.whl", hash = "sha256:6e6583c98a7d6637e83bcadfbd86e1f183917ea539f23b6b41178f32f813a5eb"}, + {file = "duckdb-0.8.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:fad7ed0d4415f633d955ac24717fa13a500012b600751d4edb050b75fb940c25"}, + {file = "duckdb-0.8.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:81ae602f34d38d9c48dd60f94b89f28df3ef346830978441b83c5b4eae131d08"}, + {file = "duckdb-0.8.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7d75cfe563aaa058d3b4ccaaa371c6271e00e3070df5de72361fd161b2fe6780"}, + {file = "duckdb-0.8.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8dbb55e7a3336f2462e5e916fc128c47fe1c03b6208d6bd413ac11ed95132aa0"}, + {file = "duckdb-0.8.1-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a6df53efd63b6fdf04657385a791a4e3c4fb94bfd5db181c4843e2c46b04fef5"}, + {file = "duckdb-0.8.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1b188b80b70d1159b17c9baaf541c1799c1ce8b2af4add179a9eed8e2616be96"}, + {file = "duckdb-0.8.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:5ad481ee353f31250b45d64b4a104e53b21415577943aa8f84d0af266dc9af85"}, + {file = "duckdb-0.8.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d1d1b1729993611b1892509d21c21628917625cdbe824a61ce891baadf684b32"}, + {file = "duckdb-0.8.1-cp311-cp311-win32.whl", hash = "sha256:2d8f9cc301e8455a4f89aa1088b8a2d628f0c1f158d4cf9bc78971ed88d82eea"}, + {file = "duckdb-0.8.1-cp311-cp311-win_amd64.whl", hash = "sha256:07457a43605223f62d93d2a5a66b3f97731f79bbbe81fdd5b79954306122f612"}, + {file = "duckdb-0.8.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:d2c8062c3e978dbcd80d712ca3e307de8a06bd4f343aa457d7dd7294692a3842"}, + {file = "duckdb-0.8.1-cp36-cp36m-win32.whl", hash = "sha256:fad486c65ae944eae2de0d590a0a4fb91a9893df98411d66cab03359f9cba39b"}, + {file = "duckdb-0.8.1-cp36-cp36m-win_amd64.whl", hash = "sha256:86fa4506622c52d2df93089c8e7075f1c4d0ba56f4bf27faebde8725355edf32"}, + {file = "duckdb-0.8.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:60e07a62782f88420046e30cc0e3de842d0901c4fd5b8e4d28b73826ec0c3f5e"}, + {file = "duckdb-0.8.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f18563675977f8cbf03748efee0165b4c8ef64e0cbe48366f78e2914d82138bb"}, + {file = "duckdb-0.8.1-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:16e179443832bea8439ae4dff93cf1e42c545144ead7a4ef5f473e373eea925a"}, + {file = "duckdb-0.8.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a413d5267cb41a1afe69d30dd6d4842c588256a6fed7554c7e07dad251ede095"}, + {file = "duckdb-0.8.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:3784680df59eadd683b0a4c2375d451a64470ca54bd171c01e36951962b1d332"}, + {file = "duckdb-0.8.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:67a1725c2b01f9b53571ecf3f92959b652f60156c1c48fb35798302e39b3c1a2"}, + {file = "duckdb-0.8.1-cp37-cp37m-win32.whl", hash = "sha256:197d37e2588c5ad063e79819054eedb7550d43bf1a557d03ba8f8f67f71acc42"}, + {file = "duckdb-0.8.1-cp37-cp37m-win_amd64.whl", hash = 
"sha256:3843feb79edf100800f5037c32d5d5a5474fb94b32ace66c707b96605e7c16b2"}, + {file = "duckdb-0.8.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:624c889b0f2d656794757b3cc4fc58030d5e285f5ad2ef9fba1ea34a01dab7fb"}, + {file = "duckdb-0.8.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:fcbe3742d77eb5add2d617d487266d825e663270ef90253366137a47eaab9448"}, + {file = "duckdb-0.8.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:47516c9299d09e9dbba097b9fb339b389313c4941da5c54109df01df0f05e78c"}, + {file = "duckdb-0.8.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cf1ba718b7522d34399446ebd5d4b9fcac0b56b6ac07bfebf618fd190ec37c1d"}, + {file = "duckdb-0.8.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e36e35d38a9ae798fe8cf6a839e81494d5b634af89f4ec9483f4d0a313fc6bdb"}, + {file = "duckdb-0.8.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:23493313f88ce6e708a512daacad13e83e6d1ea0be204b175df1348f7fc78671"}, + {file = "duckdb-0.8.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:1fb9bf0b6f63616c8a4b9a6a32789045e98c108df100e6bac783dc1e36073737"}, + {file = "duckdb-0.8.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:12fc13ecd5eddd28b203b9e3999040d3a7374a8f4b833b04bd26b8c5685c2635"}, + {file = "duckdb-0.8.1-cp38-cp38-win32.whl", hash = "sha256:a12bf4b18306c9cb2c9ba50520317e6cf2de861f121d6f0678505fa83468c627"}, + {file = "duckdb-0.8.1-cp38-cp38-win_amd64.whl", hash = "sha256:e4e809358b9559c00caac4233e0e2014f3f55cd753a31c4bcbbd1b55ad0d35e4"}, + {file = "duckdb-0.8.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7acedfc00d97fbdb8c3d120418c41ef3cb86ef59367f3a9a30dff24470d38680"}, + {file = "duckdb-0.8.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:99bfe264059cdc1e318769103f656f98e819cd4e231cd76c1d1a0327f3e5cef8"}, + {file = "duckdb-0.8.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:538b225f361066231bc6cd66c04a5561de3eea56115a5dd773e99e5d47eb1b89"}, + {file = "duckdb-0.8.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae0be3f71a18cd8492d05d0fc1bc67d01d5a9457b04822d025b0fc8ee6efe32e"}, + {file = "duckdb-0.8.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:cd82ba63b58672e46c8ec60bc9946aa4dd7b77f21c1ba09633d8847ad9eb0d7b"}, + {file = "duckdb-0.8.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:780a34559aaec8354e83aa4b7b31b3555f1b2cf75728bf5ce11b89a950f5cdd9"}, + {file = "duckdb-0.8.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:01f0d4e9f7103523672bda8d3f77f440b3e0155dd3b2f24997bc0c77f8deb460"}, + {file = "duckdb-0.8.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:31f692decb98c2d57891da27180201d9e93bb470a3051fcf413e8da65bca37a5"}, + {file = "duckdb-0.8.1-cp39-cp39-win32.whl", hash = "sha256:e7fe93449cd309bbc67d1bf6f6392a6118e94a9a4479ab8a80518742e855370a"}, + {file = "duckdb-0.8.1-cp39-cp39-win_amd64.whl", hash = "sha256:81d670bc6807672f038332d9bf587037aabdd741b0810de191984325ed307abd"}, + {file = "duckdb-0.8.1.tar.gz", hash = "sha256:a54d37f4abc2afc4f92314aaa56ecf215a411f40af4bffe1e86bd25e62aceee9"}, +] + [[package]] name = "environs" version = "9.5.0" @@ -5591,4 +5653,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "3.9.15" -content-hash = "109af95b92d54671ee4da9ddd322e3b03b5f9882ec38c9ace064afcbe37b5a2b" +content-hash = "3aa60ce2866418d5594a71e79a63dbd8e2bd3991c079c53bc055a7c584b3f69e" diff --git a/services/worker/pyproject.toml b/services/worker/pyproject.toml index 
f5feb93856..a2e6034dfc 100644 --- a/services/worker/pyproject.toml +++ b/services/worker/pyproject.toml @@ -12,6 +12,7 @@ aiohttp = "^3.8.4" aiolimiter = "^1.0.0" bs4 = "^0.0.1" conllu = "^4.5.2" +duckdb = "^0.8.1" environs = "^9.5.0" gdown = "^4.6.3" huggingface-hub = { git = "https://github.com/huggingface/huggingface_hub", rev = "1055a56b2d2723b55ba4fdf1f3296e04cfd8d6db" } diff --git a/services/worker/src/worker/config.py b/services/worker/src/worker/config.py index bc3fd39713..0a57557ed0 100644 --- a/services/worker/src/worker/config.py +++ b/services/worker/src/worker/config.py @@ -9,6 +9,7 @@ AssetsConfig, CacheConfig, CommonConfig, + DuckDbIndexConfig, LogConfig, ParquetMetadataConfig, ProcessingGraphConfig, @@ -249,6 +250,7 @@ class AppConfig: worker: WorkerConfig = field(default_factory=WorkerConfig) urls_scan: OptInOutUrlsScanConfig = field(default_factory=OptInOutUrlsScanConfig) parquet_metadata: ParquetMetadataConfig = field(default_factory=ParquetMetadataConfig) + duckdb_index: DuckDbIndexConfig = field(default_factory=DuckDbIndexConfig) @classmethod def from_env(cls) -> "AppConfig": @@ -267,4 +269,5 @@ def from_env(cls) -> "AppConfig": worker=WorkerConfig.from_env(), urls_scan=OptInOutUrlsScanConfig.from_env(), parquet_metadata=ParquetMetadataConfig.from_env(), + duckdb_index=DuckDbIndexConfig.from_env(), ) diff --git a/services/worker/src/worker/dtos.py b/services/worker/src/worker/dtos.py index ef4a4cc0b3..5eb630d1b3 100644 --- a/services/worker/src/worker/dtos.py +++ b/services/worker/src/worker/dtos.py @@ -4,6 +4,8 @@ from dataclasses import dataclass, field from typing import Any, Dict, List, Mapping, Optional, TypedDict +from libcommon.utils import SplitHubFile + class JobRunnerInfo(TypedDict): job_type: str @@ -110,14 +112,8 @@ class ConfigInfoResponse(TypedDict): dataset_info: Dict[str, Any] -class ParquetFileItem(SplitItem): - url: str - filename: str - size: int - - class ConfigParquetAndInfoResponse(TypedDict): - parquet_files: List[ParquetFileItem] + parquet_files: List[SplitHubFile] dataset_info: Dict[str, Any] @@ -134,7 +130,7 @@ class ConfigParquetMetadataResponse(TypedDict): class ConfigParquetResponse(TypedDict): - parquet_files: List[ParquetFileItem] + parquet_files: List[SplitHubFile] class ConfigSize(TypedDict): @@ -183,7 +179,7 @@ class DatasetIsValidResponse(TypedDict): class DatasetParquetResponse(TypedDict): - parquet_files: List[ParquetFileItem] + parquet_files: List[SplitHubFile] pending: list[PreviousJob] failed: list[PreviousJob] diff --git a/services/worker/src/worker/job_runner_factory.py b/services/worker/src/worker/job_runner_factory.py index 2352944c8e..87c48d9019 100644 --- a/services/worker/src/worker/job_runner_factory.py +++ b/services/worker/src/worker/job_runner_factory.py @@ -34,6 +34,7 @@ from worker.job_runners.dataset.parquet import DatasetParquetJobRunner from worker.job_runners.dataset.size import DatasetSizeJobRunner from worker.job_runners.dataset.split_names import DatasetSplitNamesJobRunner +from worker.job_runners.split.duckdb_index import SplitDuckDbIndexJobRunner from worker.job_runners.split.first_rows_from_parquet import ( SplitFirstRowsFromParquetJobRunner, ) @@ -73,6 +74,7 @@ class JobRunnerFactory(BaseJobRunnerFactory): hf_datasets_cache: Path assets_directory: StrPath parquet_metadata_directory: StrPath + duckdb_index_cache_directory: StrPath def _create_job_runner(self, job_info: JobInfo) -> JobRunner: job_type = job_info["type"] @@ -215,6 +217,14 @@ def _create_job_runner(self, job_info: JobInfo) -> JobRunner: 
processing_step=processing_step, ) + if job_type == SplitDuckDbIndexJobRunner.get_job_type(): + return SplitDuckDbIndexJobRunner( + job_info=job_info, + app_config=self.app_config, + processing_step=processing_step, + duckdb_index_cache_directory=self.duckdb_index_cache_directory, + ) + supported_job_types = [ DatasetConfigNamesJobRunner.get_job_type(), ConfigSplitNamesFromStreamingJobRunner.get_job_type(), @@ -234,5 +244,6 @@ def _create_job_runner(self, job_info: JobInfo) -> JobRunner: SplitOptInOutUrlsCountJobRunner.get_job_type(), ConfigOptInOutUrlsCountJobRunner.get_job_type(), DatasetOptInOutUrlsCountJobRunner.get_job_type(), + SplitDuckDbIndexJobRunner.get_job_type(), ] raise ValueError(f"Unsupported job type: '{job_type}'. The supported job types are: {supported_job_types}") diff --git a/services/worker/src/worker/job_runners/config/parquet_and_info.py b/services/worker/src/worker/job_runners/config/parquet_and_info.py index 8df7ea7a67..8e82aa947b 100644 --- a/services/worker/src/worker/job_runners/config/parquet_and_info.py +++ b/services/worker/src/worker/job_runners/config/parquet_and_info.py @@ -8,7 +8,7 @@ from multiprocessing.pool import ThreadPool from pathlib import Path from typing import Any, List, Optional, Set, Tuple -from urllib.parse import quote, unquote +from urllib.parse import unquote import datasets import datasets.config @@ -71,13 +71,13 @@ from libcommon.processing_graph import ProcessingStep from libcommon.queue import lock from libcommon.simple_cache import get_previous_step_or_raise -from libcommon.utils import JobInfo +from libcommon.utils import JobInfo, SplitHubFile from tqdm.contrib.concurrent import thread_map from worker.config import AppConfig, ParquetAndInfoConfig -from worker.dtos import CompleteJobResult, ConfigParquetAndInfoResponse, ParquetFileItem +from worker.dtos import CompleteJobResult, ConfigParquetAndInfoResponse from worker.job_runners.config.config_job_runner import ConfigJobRunnerWithDatasetsCache -from worker.utils import retry +from worker.utils import LOCK_GIT_BRANCH_RETRY_SLEEPS, create_branch, hf_hub_url, retry DATASET_TYPE = "dataset" MAX_FILES_PER_DIRECTORY = 10_000 # hf hub limitation @@ -97,12 +97,6 @@ def path_in_repo(self) -> str: return f'{self.config}/{self.local_file.removeprefix(f"{self.local_dir}/")}' -# TODO: use huggingface_hub's hf_hub_url after -# https://github.com/huggingface/huggingface_hub/issues/1082 -def hf_hub_url(repo_id: str, filename: str, hf_endpoint: str, revision: str, url_template: str) -> str: - return (hf_endpoint + url_template) % (repo_id, quote(revision, safe=""), filename) - - p = re.compile(r"(?P[\w-]+?)-(?P\w+(\.\w+)*?)(-[0-9]{5}-of-[0-9]{5})?.parquet") @@ -125,7 +119,7 @@ def create_parquet_file_item( hf_endpoint: str, target_revision: str, url_template: str, -) -> ParquetFileItem: +) -> SplitHubFile: if repo_file.size is None: raise ValueError(f"Cannot get size of {repo_file.rfilename}") _, split = parse_repo_filename(repo_file.rfilename) @@ -1034,23 +1028,19 @@ def compute_config_parquet_and_info_response( parquet_operations = convert_to_parquet(builder) try: - sleeps = [1, 1, 1, 1, 1, 10, 10, 10, 10, 100] * 3 # ^ timeouts after ~7 minutes - with lock.git_branch(dataset=dataset, branch=target_revision, owner=job_id, sleeps=sleeps): + with lock.git_branch( + dataset=dataset, branch=target_revision, owner=job_id, sleeps=LOCK_GIT_BRANCH_RETRY_SLEEPS + ): # create the target revision if we managed to get the parquet files and it does not exist yet # (clone from initial commit to avoid cloning all 
repo's files) - refs = retry(on=[requests.exceptions.ConnectionError], sleeps=[1, 1, 1, 10, 10])(hf_api.list_repo_refs)( - repo_id=dataset, repo_type=DATASET_TYPE + create_branch( + dataset=dataset, + target_revision=target_revision, + hf_api=hf_api, + committer_hf_api=committer_hf_api, ) - if all(ref.ref != target_revision for ref in refs.converts): - initial_commit = hf_api.list_repo_commits(repo_id=dataset, repo_type=DATASET_TYPE)[-1].commit_id - committer_hf_api.create_branch( - repo_id=dataset, - branch=target_revision, - repo_type=DATASET_TYPE, - revision=initial_commit, - exist_ok=True, - ) + # commit the parquet files commit_parquet_conversion( hf_api=hf_api, diff --git a/services/worker/src/worker/job_runners/config/parquet_metadata.py b/services/worker/src/worker/job_runners/config/parquet_metadata.py index 29a34f9a3f..55de2fe3da 100644 --- a/services/worker/src/worker/job_runners/config/parquet_metadata.py +++ b/services/worker/src/worker/job_runners/config/parquet_metadata.py @@ -5,7 +5,6 @@ from functools import partial from typing import List, Optional -from datasets.utils.file_utils import get_authentication_headers_for_url from fsspec.implementations.http import HTTPFileSystem from libcommon.constants import PROCESSING_STEP_CONFIG_PARQUET_METADATA_VERSION from libcommon.exceptions import ( @@ -16,7 +15,7 @@ from libcommon.processing_graph import ProcessingStep from libcommon.simple_cache import get_previous_step_or_raise from libcommon.storage import StrPath -from libcommon.utils import JobInfo +from libcommon.utils import JobInfo, SplitHubFile from libcommon.viewer_utils.parquet_metadata import create_parquet_metadata_file from pyarrow.parquet import ParquetFile from tqdm.contrib.concurrent import thread_map @@ -25,15 +24,10 @@ from worker.dtos import ( CompleteJobResult, ConfigParquetMetadataResponse, - ParquetFileItem, ParquetFileMetadataItem, ) from worker.job_runners.config.config_job_runner import ConfigJobRunner - - -def get_parquet_file(url: str, fs: HTTPFileSystem, hf_token: Optional[str]) -> ParquetFile: - headers = get_authentication_headers_for_url(url, use_auth_token=hf_token) - return ParquetFile(fs.open(url, headers=headers)) +from worker.utils import get_parquet_file def compute_parquet_metadata_response( @@ -70,7 +64,7 @@ def compute_parquet_metadata_response( config_parquet_best_response = get_previous_step_or_raise(kinds=["config-parquet"], dataset=dataset, config=config) try: parquet_files_content = config_parquet_best_response.response["content"]["parquet_files"] - parquet_file_items: List[ParquetFileItem] = [ + parquet_file_items: List[SplitHubFile] = [ parquet_file_item for parquet_file_item in parquet_files_content if parquet_file_item["config"] == config ] if not parquet_file_items: diff --git a/services/worker/src/worker/job_runners/dataset/parquet.py b/services/worker/src/worker/job_runners/dataset/parquet.py index 9ba4eddc61..000388ff39 100644 --- a/services/worker/src/worker/job_runners/dataset/parquet.py +++ b/services/worker/src/worker/job_runners/dataset/parquet.py @@ -12,12 +12,12 @@ get_previous_step_or_raise, get_response, ) +from libcommon.utils import SplitHubFile from worker.dtos import ( ConfigParquetResponse, DatasetParquetResponse, JobResult, - ParquetFileItem, PreviousJob, ) from worker.job_runners.dataset.dataset_job_runner import DatasetJobRunner @@ -46,7 +46,7 @@ def compute_sizes_response(dataset: str) -> Tuple[DatasetParquetResponse, float] raise PreviousStepFormatError("Previous step did not return the expected content: 
'config_names'.") try: - parquet_files: list[ParquetFileItem] = [] + parquet_files: list[SplitHubFile] = [] total = 0 pending = [] failed = [] diff --git a/services/worker/src/worker/job_runners/split/duckdb_index.py b/services/worker/src/worker/job_runners/split/duckdb_index.py new file mode 100644 index 0000000000..b715de6095 --- /dev/null +++ b/services/worker/src/worker/job_runners/split/duckdb_index.py @@ -0,0 +1,265 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2023 The HuggingFace Authors. + +import logging +from pathlib import Path +from typing import List, Optional, Set + +import duckdb +from huggingface_hub._commit_api import ( + CommitOperation, + CommitOperationAdd, + CommitOperationDelete, +) +from huggingface_hub.hf_api import HfApi +from huggingface_hub.utils._errors import RepositoryNotFoundError +from libcommon.config import DuckDbIndexConfig +from libcommon.constants import PROCESSING_STEP_SPLIT_DUCKDB_INDEX_VERSION +from libcommon.exceptions import ( + CacheDirectoryNotInitializedError, + DatasetNotFoundError, + DuckDBIndexFileNotFoundError, + LockedDatasetTimeoutError, + NoIndexableColumnsError, + ParquetResponseEmptyError, + PreviousStepFormatError, + SplitNotFoundError, + SplitWithTooBigParquetError, +) +from libcommon.processing_graph import ProcessingStep +from libcommon.queue import lock +from libcommon.simple_cache import get_previous_step_or_raise +from libcommon.storage import StrPath +from libcommon.utils import JobInfo, SplitHubFile + +from worker.config import AppConfig +from worker.dtos import CompleteJobResult +from worker.job_runners.split.split_job_runner import SplitJobRunnerWithCache +from worker.utils import LOCK_GIT_BRANCH_RETRY_SLEEPS, create_branch, hf_hub_url + +DATASET_TYPE = "dataset" +STRING_FEATURE_DTYPE = "string" +VALUE_FEATURE_TYPE = "Value" +DUCKDB_DEFAULT_INDEX_FILENAME = "index.duckdb" +CREATE_SEQUENCE_COMMAND = "CREATE OR REPLACE SEQUENCE serial START 1;" +CREATE_INDEX_COMMAND = "PRAGMA create_fts_index('data', '__hf_index_id', '*', overwrite=1);" +CREATE_TABLE_COMMAND = "CREATE OR REPLACE TABLE data AS SELECT nextval('serial') AS __hf_index_id, {columns} FROM" +INSTALL_EXTENSION_COMMAND = "INSTALL '{extension}';" +LOAD_EXTENSION_COMMAND = "LOAD '{extension}';" + + +def compute_index_rows( + job_id: str, + dataset: str, + config: str, + split: str, + duckdb_index_file_directory: Path, + target_revision: str, + hf_endpoint: str, + commit_message: str, + url_template: str, + hf_token: Optional[str], + max_parquet_size_bytes: int, + committer_hf_token: Optional[str], +) -> SplitHubFile: + logging.info(f"get split-duckdb-index for dataset={dataset} config={config} split={split}") + + # validate split + split_names_best_response = get_previous_step_or_raise( + kinds=["config-split-names-from-streaming", "config-split-names-from-info"], dataset=dataset, config=config + ) + try: + splits_content = split_names_best_response.response["content"]["splits"] + except Exception as e: + raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e + + if split not in [split_item["split"] for split_item in splits_content]: + raise SplitNotFoundError(f"The split '{split}' does not exist for the config '{config}' of the dataset.") + + # get parquet urls and dataset_info + config_parquet_and_info_step = "config-parquet-and-info" + parquet_and_info_best_response = get_previous_step_or_raise( + kinds=[config_parquet_and_info_step], + dataset=dataset, + config=config, + ) + content_parquet_and_info = 
parquet_and_info_best_response.response["content"] + try: + split_parquet_files = [ + parquet_file + for parquet_file in content_parquet_and_info["parquet_files"] + if parquet_file["config"] == config and parquet_file["split"] == split + ] + + split_parquets_size = sum(parquet_file["size"] for parquet_file in split_parquet_files) + + if split_parquets_size > max_parquet_size_bytes: + raise SplitWithTooBigParquetError( + f"The indexing is limited to split parquets under {max_parquet_size_bytes} bytes. " + f"Current size of sum of split parquets is {split_parquets_size} bytes." + ) + + parquet_urls = [parquet_file["url"] for parquet_file in split_parquet_files] + + if not parquet_urls: + raise ParquetResponseEmptyError("No parquet files found.") + + # get the features + features = content_parquet_and_info["dataset_info"]["features"] + column_names = ",".join(list(features.keys())) + + # look for string columns + string_columns = [ + column + for column, feature in features.items() + if "dtype" in feature + and "_type" in feature + and feature["dtype"] == STRING_FEATURE_DTYPE + and feature["_type"] == VALUE_FEATURE_TYPE + ] + if not string_columns: + raise NoIndexableColumnsError("No string columns available to index.") + + except KeyError as e: + raise PreviousStepFormatError( + f"Previous step '{config_parquet_and_info_step}' did not return the expected content.", e + ) from e + + # configure duckdb extensions + duckdb.execute(INSTALL_EXTENSION_COMMAND.format(extension="httpfs")) + duckdb.execute(LOAD_EXTENSION_COMMAND.format(extension="httpfs")) + duckdb.execute(INSTALL_EXTENSION_COMMAND.format(extension="fts")) + duckdb.execute(LOAD_EXTENSION_COMMAND.format(extension="fts")) + + # index all columns + db_path = duckdb_index_file_directory.resolve() / DUCKDB_DEFAULT_INDEX_FILENAME + + con = duckdb.connect(str(db_path.resolve())) + logging.debug(CREATE_SEQUENCE_COMMAND) + con.sql(CREATE_SEQUENCE_COMMAND) + + create_command_sql = f"{CREATE_TABLE_COMMAND.format(columns=column_names)} read_parquet({parquet_urls});" + logging.debug(create_command_sql) + con.sql(create_command_sql) + + # TODO: by default, 'porter' stemmer is being used, use a specific one by dataset language in the future + # see https://duckdb.org/docs/extensions/full_text_search.html for more details about 'stemmer' parameter + logging.debug(CREATE_INDEX_COMMAND) + con.sql(CREATE_INDEX_COMMAND) + con.close() + + hf_api = HfApi(endpoint=hf_endpoint, token=hf_token) + committer_hf_api = HfApi(endpoint=hf_endpoint, token=committer_hf_token) + index_file_location = f"{config}/{split}/{DUCKDB_DEFAULT_INDEX_FILENAME}" + + try: + with lock.git_branch( + dataset=dataset, branch=target_revision, owner=job_id, sleeps=LOCK_GIT_BRANCH_RETRY_SLEEPS + ): + create_branch( + dataset=dataset, + target_revision=target_revision, + hf_api=hf_api, + committer_hf_api=committer_hf_api, + ) + + target_dataset_info = hf_api.dataset_info(repo_id=dataset, revision=target_revision, files_metadata=False) + all_repo_files: Set[str] = {f.rfilename for f in target_dataset_info.siblings} + delete_operations: List[CommitOperation] = [] + if index_file_location in all_repo_files: + delete_operations.append(CommitOperationDelete(path_in_repo=index_file_location)) + + # send the files to the target revision + add_operations: List[CommitOperation] = [ + CommitOperationAdd(path_in_repo=index_file_location, path_or_fileobj=db_path.resolve()) + ] + + committer_hf_api.create_commit( + repo_id=dataset, + repo_type=DATASET_TYPE, + revision=target_revision, + 
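The indexing itself boils down to a handful of DuckDB statements: install and load the `fts` extension, number the rows with a sequence, materialize the split into a `data` table, and build a full-text index over it. Here is a self-contained sketch of the same pattern on in-line values rather than remote parquet files (so it does not need `httpfs`); the final query follows the BM25 search pattern from the DuckDB FTS documentation and is the kind of lookup a consumer of the committed `index.duckdb` could run:

```python
# Illustrative sketch of the FTS pattern used by compute_index_rows; table and
# column names mirror the job runner's constants.
import duckdb

con = duckdb.connect("index.duckdb")  # same role as DUCKDB_DEFAULT_INDEX_FILENAME
con.execute("INSTALL 'fts';")
con.execute("LOAD 'fts';")

con.sql("CREATE OR REPLACE SEQUENCE serial START 1;")
con.sql(
    "CREATE OR REPLACE TABLE data AS "
    "SELECT nextval('serial') AS __hf_index_id, text "
    "FROM (VALUES ('There goes another one.'), "
    "             ('We count thirty Rebel ships, Lord Vader.')) t(text);"
)

# Index every column; the default 'porter' stemmer applies, as noted in the TODO above.
con.sql("PRAGMA create_fts_index('data', '__hf_index_id', '*', overwrite=1);")

# BM25 ranking query (pattern from the DuckDB FTS docs); a search service reading the
# committed index.duckdb would issue something similar.
print(
    con.sql(
        "SELECT __hf_index_id, text, "
        "fts_main_data.match_bm25(__hf_index_id, 'vader') AS score "
        "FROM data WHERE score IS NOT NULL ORDER BY score DESC;"
    ).fetchall()
)
con.close()
```

A consumer would typically first fetch the file from the `refs/convert/parquet` revision (for instance with `huggingface_hub.hf_hub_download`) before opening it with `duckdb.connect`.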
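Once the index file exists locally, publication mirrors the parquet conversion: take the `lock.git_branch` lock, ensure `refs/convert/parquet` exists via the shared `create_branch` helper, delete any stale `<config>/<split>/index.duckdb`, and commit the new file before reporting it as a `SplitHubFile`. A small worked example of the URL that ends up in the response (the repo name is hypothetical), together with the lock's retry budget:

```python
# Worked example of the URL stored in the SplitHubFile response, using the
# hf_hub_url helper added to worker.utils and the default url_template.
from urllib.parse import quote

hf_endpoint = "https://huggingface.co"
url_template = "/datasets/%s/resolve/%s/%s"
repo_id, revision = "user/my_dataset", "refs/convert/parquet"  # hypothetical repo
filename = "default/train/index.duckdb"  # f"{config}/{split}/{DUCKDB_DEFAULT_INDEX_FILENAME}"

url = (hf_endpoint + url_template) % (repo_id, quote(revision, safe=""), filename)
# -> https://huggingface.co/datasets/user/my_dataset/resolve/refs%2Fconvert%2Fparquet/default/train/index.duckdb

# The branch lock retries follow LOCK_GIT_BRANCH_RETRY_SLEEPS = [1, 1, 1, 1, 1, 10, 10, 10, 10, 100] * 3,
# i.e. 3 * 145 s = 435 s of sleeps (~7 minutes), matching the comment this refactor
# removed from parquet_and_info.py.
```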
operations=delete_operations + add_operations, + commit_message=commit_message, + parent_commit=target_dataset_info.sha, + ) + + # call the API again to get the index file + target_dataset_info = hf_api.dataset_info(repo_id=dataset, revision=target_revision, files_metadata=True) + except TimeoutError as err: + raise LockedDatasetTimeoutError("the dataset is currently locked, please try again later.") from err + except RepositoryNotFoundError as err: + raise DatasetNotFoundError("The dataset does not exist on the Hub.") from err + + repo_files = [ + repo_file for repo_file in target_dataset_info.siblings if repo_file.rfilename == index_file_location + ] + + if not repo_files or len(repo_files) != 1: + logging.warning(f"Found {len(repo_files)} index files, should be only 1") + raise DuckDBIndexFileNotFoundError("No index file was found") + + repo_file = repo_files[0] + if repo_file.size is None: + raise ValueError(f"Cannot get size of {repo_file.rfilename}") + + return SplitHubFile( + dataset=dataset, + config=config, + split=split, + url=hf_hub_url( + repo_id=dataset, + filename=repo_file.rfilename, + hf_endpoint=hf_endpoint, + revision=target_revision, + url_template=url_template, + ), + filename=Path(repo_file.rfilename).name, + size=repo_file.size, + ) + + +class SplitDuckDbIndexJobRunner(SplitJobRunnerWithCache): + duckdb_index_config: DuckDbIndexConfig + + def __init__( + self, + job_info: JobInfo, + app_config: AppConfig, + processing_step: ProcessingStep, + duckdb_index_cache_directory: StrPath, + ) -> None: + super().__init__( + job_info=job_info, + app_config=app_config, + processing_step=processing_step, + cache_directory=Path(duckdb_index_cache_directory), + ) + self.duckdb_index_config = app_config.duckdb_index + + @staticmethod + def get_job_type() -> str: + return "split-duckdb-index" + + @staticmethod + def get_job_runner_version() -> int: + return PROCESSING_STEP_SPLIT_DUCKDB_INDEX_VERSION + + def compute(self) -> CompleteJobResult: + if self.cache_subdirectory is None: + raise CacheDirectoryNotInitializedError("Cache directory has not been initialized.") + return CompleteJobResult( + compute_index_rows( + job_id=self.job_info["job_id"], + dataset=self.dataset, + config=self.config, + split=self.split, + duckdb_index_file_directory=self.cache_subdirectory, + hf_token=self.app_config.common.hf_token, + url_template=self.duckdb_index_config.url_template, + commit_message=self.duckdb_index_config.commit_message, + committer_hf_token=self.duckdb_index_config.committer_hf_token, + hf_endpoint=self.app_config.common.hf_endpoint, + target_revision=self.duckdb_index_config.target_revision, + max_parquet_size_bytes=self.duckdb_index_config.max_parquet_size_bytes, + ) + ) diff --git a/services/worker/src/worker/main.py b/services/worker/src/worker/main.py index da297ccc67..4d207280e5 100644 --- a/services/worker/src/worker/main.py +++ b/services/worker/src/worker/main.py @@ -6,7 +6,11 @@ from libcommon.log import init_logging from libcommon.processing_graph import ProcessingGraph from libcommon.resources import CacheMongoResource, QueueMongoResource -from libcommon.storage import init_assets_dir, init_parquet_metadata_dir +from libcommon.storage import ( + init_assets_dir, + init_duckdb_index_cache_dir, + init_parquet_metadata_dir, +) from worker.config import AppConfig from worker.executor import WorkerExecutor @@ -27,6 +31,7 @@ # ^ set first to have logs as soon as possible assets_directory = init_assets_dir(directory=app_config.assets.storage_directory) parquet_metadata_directory = 
init_parquet_metadata_dir(directory=app_config.parquet_metadata.storage_directory) + duckdb_index_cache_directory = init_duckdb_index_cache_dir(directory=app_config.duckdb_index.storage_directory) processing_graph = ProcessingGraph(app_config.processing_graph.specification) @@ -54,6 +59,7 @@ hf_datasets_cache=libraries_resource.hf_datasets_cache, assets_directory=assets_directory, parquet_metadata_directory=parquet_metadata_directory, + duckdb_index_cache_directory=duckdb_index_cache_directory, ) worker_executor = WorkerExecutor( app_config=app_config, diff --git a/services/worker/src/worker/start_worker_loop.py b/services/worker/src/worker/start_worker_loop.py index 5aa4246d76..d5e69a829c 100644 --- a/services/worker/src/worker/start_worker_loop.py +++ b/services/worker/src/worker/start_worker_loop.py @@ -6,7 +6,11 @@ from libcommon.log import init_logging from libcommon.processing_graph import ProcessingGraph from libcommon.resources import CacheMongoResource, QueueMongoResource -from libcommon.storage import init_assets_dir, init_parquet_metadata_dir +from libcommon.storage import ( + init_assets_dir, + init_duckdb_index_cache_dir, + init_parquet_metadata_dir, +) from worker.config import AppConfig from worker.job_runner_factory import JobRunnerFactory @@ -26,6 +30,7 @@ # ^ set first to have logs as soon as possible assets_directory = init_assets_dir(directory=app_config.assets.storage_directory) parquet_metadata_directory = init_parquet_metadata_dir(directory=app_config.parquet_metadata.storage_directory) + duckdb_index_cache_directory = init_duckdb_index_cache_dir(directory=app_config.duckdb_index.storage_directory) processing_graph = ProcessingGraph(app_config.processing_graph.specification) @@ -53,6 +58,7 @@ hf_datasets_cache=libraries_resource.hf_datasets_cache, assets_directory=assets_directory, parquet_metadata_directory=parquet_metadata_directory, + duckdb_index_cache_directory=duckdb_index_cache_directory, ) loop = Loop( library_cache_paths=libraries_resource.storage_paths, diff --git a/services/worker/src/worker/utils.py b/services/worker/src/worker/utils.py index fab6b0316e..c3b985d8b9 100644 --- a/services/worker/src/worker/utils.py +++ b/services/worker/src/worker/utils.py @@ -18,8 +18,10 @@ Union, cast, ) +from urllib.parse import quote import PIL +import requests from datasets import ( Dataset, DatasetInfo, @@ -28,8 +30,17 @@ IterableDataset, load_dataset, ) -from libcommon.exceptions import NormalRowsError, StreamingRowsError +from datasets.utils.file_utils import get_authentication_headers_for_url +from fsspec.implementations.http import HTTPFileSystem +from huggingface_hub.hf_api import HfApi +from huggingface_hub.utils._errors import RepositoryNotFoundError +from libcommon.exceptions import ( + DatasetNotFoundError, + NormalRowsError, + StreamingRowsError, +) from libcommon.utils import orjson_dumps +from pyarrow.parquet import ParquetFile from worker.dtos import FeatureItem, Row, RowItem, RowsContent @@ -310,3 +321,34 @@ def get_rows_or_raise( "Cannot load the dataset split (in normal download mode) to extract the first rows.", cause=err, ) from err + + +# TODO: use huggingface_hub's hf_hub_url after +# https://github.com/huggingface/huggingface_hub/issues/1082 +def hf_hub_url(repo_id: str, filename: str, hf_endpoint: str, revision: str, url_template: str) -> str: + return (hf_endpoint + url_template) % (repo_id, quote(revision, safe=""), filename) + + +def get_parquet_file(url: str, fs: HTTPFileSystem, hf_token: Optional[str]) -> ParquetFile: + headers = 
get_authentication_headers_for_url(url, use_auth_token=hf_token) + return ParquetFile(fs.open(url, headers=headers)) + + +DATASET_TYPE = "dataset" + +LIST_REPO_REFS_RETRY_SLEEPS = [1, 1, 1, 10, 10] +LOCK_GIT_BRANCH_RETRY_SLEEPS = [1, 1, 1, 1, 1, 10, 10, 10, 10, 100] * 3 + + +def create_branch(dataset: str, target_revision: str, hf_api: HfApi, committer_hf_api: HfApi) -> None: + try: + refs = retry(on=[requests.exceptions.ConnectionError], sleeps=LIST_REPO_REFS_RETRY_SLEEPS)( + hf_api.list_repo_refs + )(repo_id=dataset, repo_type=DATASET_TYPE) + if all(ref.ref != target_revision for ref in refs.converts): + initial_commit = hf_api.list_repo_commits(repo_id=dataset, repo_type=DATASET_TYPE)[-1].commit_id + committer_hf_api.create_branch( + repo_id=dataset, branch=target_revision, repo_type=DATASET_TYPE, revision=initial_commit, exist_ok=True + ) + except RepositoryNotFoundError as err: + raise DatasetNotFoundError("The dataset does not exist on the Hub (was deleted during job).") from err diff --git a/services/worker/tests/conftest.py b/services/worker/tests/conftest.py index 851a99c66e..fab2648725 100644 --- a/services/worker/tests/conftest.py +++ b/services/worker/tests/conftest.py @@ -8,7 +8,12 @@ from libcommon.queue import _clean_queue_database from libcommon.resources import CacheMongoResource, QueueMongoResource from libcommon.simple_cache import _clean_cache_database -from libcommon.storage import StrPath, init_assets_dir, init_parquet_metadata_dir +from libcommon.storage import ( + StrPath, + init_assets_dir, + init_duckdb_index_cache_dir, + init_parquet_metadata_dir, +) from pytest import MonkeyPatch, fixture from worker.config import AppConfig @@ -65,6 +70,7 @@ def set_env_vars( mp.setenv("PARQUET_AND_INFO_MAX_DATASET_SIZE", "10_000") mp.setenv("PARQUET_AND_INFO_MAX_EXTERNAL_DATA_FILES", "10") mp.setenv("PARQUET_AND_INFO_COMMITTER_HF_TOKEN", CI_PARQUET_CONVERTER_APP_TOKEN) + mp.setenv("DUCKDB_INDEX_COMMITTER_HF_TOKEN", CI_PARQUET_CONVERTER_APP_TOKEN) mp.setenv("DATASETS_BASED_HF_DATASETS_CACHE", str(datasets_cache_directory)) mp.setenv("HF_MODULES_CACHE", str(modules_cache_directory)) mp.setenv("WORKER_CONTENT_MAX_BYTES", "10_000_000") @@ -119,6 +125,11 @@ def parquet_metadata_directory(app_config: AppConfig) -> StrPath: return init_parquet_metadata_dir(app_config.parquet_metadata.storage_directory) +@fixture +def duckdb_index_cache_directory(app_config: AppConfig) -> StrPath: + return init_duckdb_index_cache_dir(app_config.duckdb_index.storage_directory) + + @fixture def test_processing_graph() -> ProcessingGraph: return ProcessingGraph( diff --git a/services/worker/tests/fixtures/datasets.py b/services/worker/tests/fixtures/datasets.py index 18edaad1f1..6e987e20e5 100644 --- a/services/worker/tests/fixtures/datasets.py +++ b/services/worker/tests/fixtures/datasets.py @@ -143,4 +143,21 @@ def datasets() -> Mapping[str, Dataset]: dtype=pd.StringDtype(storage="python"), ) ), + "duckdb_index": Dataset.from_pandas( + pd.DataFrame( + { + "text": [ + ( + "Grand Moff Tarkin and Lord Vader are interrupted in their discussion by the buzz of the" + " comlink" + ), + "There goes another one.", + "Vader turns round and round in circles as his ship spins into space.", + "We count thirty Rebel ships, Lord Vader.", + "The wingman spots the pirateship coming at him and warns the Dark Lord", + ] + }, + dtype=pd.StringDtype(storage="python"), + ) + ), } diff --git a/services/worker/tests/fixtures/hub.py b/services/worker/tests/fixtures/hub.py index f34ca587ff..11e9d9a054 100644 --- 
a/services/worker/tests/fixtures/hub.py +++ b/services/worker/tests/fixtures/hub.py @@ -281,6 +281,13 @@ def hub_public_spawning_opt_in_out(datasets: Mapping[str, Dataset]) -> Iterator[ delete_hub_dataset_repo(repo_id=repo_id) +@pytest.fixture(scope="session") +def hub_public_duckdb_index(datasets: Mapping[str, Dataset]) -> Iterator[str]: + repo_id = create_hub_dataset_repo(prefix="duckdb_index", dataset=datasets["duckdb_index"]) + yield repo_id + delete_hub_dataset_repo(repo_id=repo_id) + + class HubDatasetTest(TypedDict): name: str config_names_response: Any @@ -764,3 +771,16 @@ def hub_reponses_spawning_opt_in_out(hub_public_spawning_opt_in_out: str) -> Hub ), "parquet_and_info_response": None, } + + +@pytest.fixture +def hub_responses_duckdb_index(hub_public_duckdb_index: str) -> HubDatasetTest: + return { + "name": hub_public_duckdb_index, + "config_names_response": create_config_names_response(hub_public_duckdb_index), + "splits_response": create_splits_response(hub_public_duckdb_index), + "first_rows_response": create_first_rows_response(hub_public_duckdb_index, TEXT_cols, TEXT_rows), + "parquet_and_info_response": create_parquet_and_info_response( + dataset=hub_public_duckdb_index, data_type="csv" + ), + } diff --git a/services/worker/tests/job_runners/config/test_parquet.py b/services/worker/tests/job_runners/config/test_parquet.py index 031fee96da..e314000ac1 100644 --- a/services/worker/tests/job_runners/config/test_parquet.py +++ b/services/worker/tests/job_runners/config/test_parquet.py @@ -9,14 +9,10 @@ from libcommon.processing_graph import ProcessingGraph from libcommon.resources import CacheMongoResource, QueueMongoResource from libcommon.simple_cache import CachedArtifactError, upsert_response -from libcommon.utils import Priority +from libcommon.utils import Priority, SplitHubFile from worker.config import AppConfig -from worker.dtos import ( - ConfigParquetAndInfoResponse, - ConfigParquetResponse, - ParquetFileItem, -) +from worker.dtos import ConfigParquetAndInfoResponse, ConfigParquetResponse from worker.job_runners.config.parquet import ConfigParquetJobRunner @@ -78,10 +74,10 @@ def _get_job_runner( HTTPStatus.OK, ConfigParquetAndInfoResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url1", filename="filename1", size=0 ), - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url2", filename="filename2", size=0 ), ], @@ -90,10 +86,10 @@ def _get_job_runner( None, ConfigParquetResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url1", filename="filename1", size=0 ), - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url2", filename="filename2", size=0 ), ] diff --git a/services/worker/tests/job_runners/config/test_parquet_metadata.py b/services/worker/tests/job_runners/config/test_parquet_metadata.py index 613d616a76..96f1cfbc7e 100644 --- a/services/worker/tests/job_runners/config/test_parquet_metadata.py +++ b/services/worker/tests/job_runners/config/test_parquet_metadata.py @@ -19,13 +19,12 @@ from libcommon.resources import CacheMongoResource, QueueMongoResource from libcommon.simple_cache import CachedArtifactError, upsert_response from libcommon.storage import StrPath -from libcommon.utils import Priority +from libcommon.utils import Priority, SplitHubFile from worker.config import AppConfig from worker.dtos import ( ConfigParquetMetadataResponse, ConfigParquetResponse, 
- ParquetFileItem, ParquetFileMetadataItem, ) from worker.job_runners.config.parquet_metadata import ConfigParquetMetadataJobRunner @@ -97,10 +96,10 @@ def _get_job_runner( HTTPStatus.OK, ConfigParquetResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url1", filename="filename1", size=0 ), - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url2", filename="filename2", size=0 ), ], diff --git a/services/worker/tests/job_runners/dataset/test_parquet.py b/services/worker/tests/job_runners/dataset/test_parquet.py index 61cfbfbc77..8f63b188b0 100644 --- a/services/worker/tests/job_runners/dataset/test_parquet.py +++ b/services/worker/tests/job_runners/dataset/test_parquet.py @@ -9,10 +9,10 @@ from libcommon.processing_graph import ProcessingGraph from libcommon.resources import CacheMongoResource, QueueMongoResource from libcommon.simple_cache import CachedArtifactError, upsert_response -from libcommon.utils import Priority +from libcommon.utils import Priority, SplitHubFile from worker.config import AppConfig -from worker.dtos import ConfigParquetResponse, DatasetParquetResponse, ParquetFileItem +from worker.dtos import ConfigParquetResponse, DatasetParquetResponse from worker.job_runners.dataset.parquet import DatasetParquetJobRunner from ..utils import UpstreamResponse @@ -89,7 +89,7 @@ def _get_job_runner( http_status=HTTPStatus.OK, content=ConfigParquetResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", @@ -107,7 +107,7 @@ def _get_job_runner( http_status=HTTPStatus.OK, content=ConfigParquetResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_2", split="train", @@ -122,10 +122,10 @@ def _get_job_runner( None, DatasetParquetResponse( parquet_files=[ - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_1", split="train", url="url1", filename="filename1", size=0 ), - ParquetFileItem( + SplitHubFile( dataset="ok", config="config_2", split="train", url="url2", filename="filename2", size=0 ), ], diff --git a/services/worker/tests/job_runners/split/test_duckdb_index.py b/services/worker/tests/job_runners/split/test_duckdb_index.py new file mode 100644 index 0000000000..a8fe40cfc3 --- /dev/null +++ b/services/worker/tests/job_runners/split/test_duckdb_index.py @@ -0,0 +1,242 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2023 The HuggingFace Authors. 
+ +import os +from dataclasses import replace +from http import HTTPStatus +from typing import Callable, Optional + +import duckdb +import pytest +import requests +from libcommon.processing_graph import ProcessingGraph +from libcommon.resources import CacheMongoResource, QueueMongoResource +from libcommon.simple_cache import upsert_response +from libcommon.storage import StrPath +from libcommon.utils import Priority + +from worker.config import AppConfig +from worker.job_runners.config.parquet_and_info import ConfigParquetAndInfoJobRunner +from worker.job_runners.split.duckdb_index import SplitDuckDbIndexJobRunner +from worker.resources import LibrariesResource + +from ...fixtures.hub import HubDatasetTest + +GetJobRunner = Callable[[str, str, str, AppConfig], SplitDuckDbIndexJobRunner] + +GetParquetJobRunner = Callable[[str, str, AppConfig], ConfigParquetAndInfoJobRunner] + + +@pytest.fixture +def get_job_runner( + duckdb_index_cache_directory: StrPath, + cache_mongo_resource: CacheMongoResource, + queue_mongo_resource: QueueMongoResource, +) -> GetJobRunner: + def _get_job_runner( + dataset: str, + config: str, + split: str, + app_config: AppConfig, + ) -> SplitDuckDbIndexJobRunner: + processing_step_name = SplitDuckDbIndexJobRunner.get_job_type() + processing_graph = ProcessingGraph( + { + "dataset-step": {"input_type": "dataset"}, + "config-parquet": { + "input_type": "config", + "triggered_by": "dataset-step", + "provides_config_parquet": True, + }, + "config-split-names-from-streaming": { + "input_type": "config", + "triggered_by": "dataset-step", + }, + processing_step_name: { + "input_type": "dataset", + "job_runner_version": SplitDuckDbIndexJobRunner.get_job_runner_version(), + "triggered_by": ["config-parquet", "config-split-names-from-streaming"], + }, + } + ) + return SplitDuckDbIndexJobRunner( + job_info={ + "type": SplitDuckDbIndexJobRunner.get_job_type(), + "params": { + "dataset": dataset, + "revision": "revision", + "config": config, + "split": split, + }, + "job_id": "job_id", + "priority": Priority.NORMAL, + }, + app_config=app_config, + processing_step=processing_graph.get_processing_step(processing_step_name), + duckdb_index_cache_directory=duckdb_index_cache_directory, + ) + + return _get_job_runner + + +@pytest.fixture +def get_parquet_job_runner( + libraries_resource: LibrariesResource, + cache_mongo_resource: CacheMongoResource, + queue_mongo_resource: QueueMongoResource, +) -> GetParquetJobRunner: + def _get_job_runner( + dataset: str, + config: str, + app_config: AppConfig, + ) -> ConfigParquetAndInfoJobRunner: + processing_step_name = ConfigParquetAndInfoJobRunner.get_job_type() + processing_graph = ProcessingGraph( + { + "dataset-level": {"input_type": "dataset"}, + processing_step_name: { + "input_type": "config", + "job_runner_version": ConfigParquetAndInfoJobRunner.get_job_runner_version(), + "triggered_by": "dataset-level", + }, + } + ) + return ConfigParquetAndInfoJobRunner( + job_info={ + "type": ConfigParquetAndInfoJobRunner.get_job_type(), + "params": { + "dataset": dataset, + "revision": "revision", + "config": config, + "split": None, + }, + "job_id": "job_id", + "priority": Priority.NORMAL, + }, + app_config=app_config, + processing_step=processing_graph.get_processing_step(processing_step_name), + hf_datasets_cache=libraries_resource.hf_datasets_cache, + ) + + return _get_job_runner + + +@pytest.mark.parametrize( + "hub_dataset_name,max_parquet_size_bytes,expected_error_code", + [ + ("duckdb_index", None, None), + ("duckdb_index", 1_000, 
"SplitWithTooBigParquetError"), # parquet size is 2812 + ("public", None, "NoIndexableColumnsError"), # dataset does not have string columns to index + ], +) +def test_compute( + get_parquet_job_runner: GetParquetJobRunner, + get_job_runner: GetJobRunner, + app_config: AppConfig, + hub_responses_public: HubDatasetTest, + hub_responses_duckdb_index: HubDatasetTest, + hub_dataset_name: str, + max_parquet_size_bytes: Optional[int], + expected_error_code: str, +) -> None: + hub_datasets = {"public": hub_responses_public, "duckdb_index": hub_responses_duckdb_index} + dataset = hub_datasets[hub_dataset_name]["name"] + config_names = hub_datasets[hub_dataset_name]["config_names_response"] + config = hub_datasets[hub_dataset_name]["config_names_response"]["config_names"][0]["config"] + splits_response = hub_datasets[hub_dataset_name]["splits_response"] + split = "train" + + upsert_response( + "dataset-config-names", + dataset=dataset, + http_status=HTTPStatus.OK, + content=config_names, + ) + + upsert_response( + "config-split-names-from-streaming", + dataset=dataset, + config=config, + http_status=HTTPStatus.OK, + content=splits_response, + ) + + app_config = ( + app_config + if max_parquet_size_bytes is None + else replace( + app_config, duckdb_index=replace(app_config.duckdb_index, max_parquet_size_bytes=max_parquet_size_bytes) + ) + ) + + parquet_job_runner = get_parquet_job_runner(dataset, config, app_config) + parquet_response = parquet_job_runner.compute() + config_parquet = parquet_response.content + + # simulate more than one parquet file to index + extra_parquet_file = config_parquet["parquet_files"][0] + config_parquet["parquet_files"].append(extra_parquet_file) + + upsert_response( + "config-parquet-and-info", + dataset=dataset, + config=config, + http_status=HTTPStatus.OK, + content=config_parquet, + ) + + assert parquet_response + job_runner = get_job_runner(dataset, config, split, app_config) + job_runner.pre_compute() + + if expected_error_code: + with pytest.raises(Exception) as e: + job_runner.compute() + assert e.typename == expected_error_code + else: + job_runner.pre_compute() + response = job_runner.compute() + assert response + content = response.content + url = content["url"] + file_name = content["filename"] + assert url is not None + assert file_name is not None + job_runner.post_compute() + + # download locally duckdb index file + duckdb_file = requests.get(url) + with open(file_name, "wb") as f: + f.write(duckdb_file.content) + + duckdb.execute("INSTALL 'fts';") + duckdb.execute("LOAD 'fts';") + con = duckdb.connect(file_name) + + # validate number of inserted records + record_count = con.sql("SELECT COUNT(*) FROM data;").fetchall() + assert record_count is not None + assert isinstance(record_count, list) + assert record_count[0] == (10,) # dataset has 5 rows but since parquet file was duplicate it is 10 + + # perform a search to validate fts feature + query = "Lord Vader" + result = con.execute( + "SELECT text FROM data WHERE fts_main_data.match_bm25(__hf_index_id, ?) 
IS NOT NULL;", + [query], + ) + rows = result.df() + assert rows is not None + assert (rows["text"].eq("Vader turns round and round in circles as his ship spins into space.")).any() + assert (rows["text"].eq("The wingman spots the pirateship coming at him and warns the Dark Lord")).any() + assert (rows["text"].eq("We count thirty Rebel ships, Lord Vader.")).any() + assert ( + rows["text"].eq( + "Grand Moff Tarkin and Lord Vader are interrupted in their discussion by the buzz of the comlink" + ) + ).any() + assert not (rows["text"].eq("There goes another one.")).any() + + con.close() + os.remove(file_name) + job_runner.post_compute() diff --git a/services/worker/tests/test_executor.py b/services/worker/tests/test_executor.py index 87f44cd5f5..d83cbf2688 100644 --- a/services/worker/tests/test_executor.py +++ b/services/worker/tests/test_executor.py @@ -199,6 +199,7 @@ def job_runner_factory( libraries_resource: LibrariesResource, assets_directory: StrPath, parquet_metadata_directory: StrPath, + duckdb_index_cache_directory: StrPath, ) -> JobRunnerFactory: processing_graph = ProcessingGraph(app_config.processing_graph.specification) return JobRunnerFactory( @@ -207,6 +208,7 @@ def job_runner_factory( hf_datasets_cache=libraries_resource.hf_datasets_cache, assets_directory=assets_directory, parquet_metadata_directory=parquet_metadata_directory, + duckdb_index_cache_directory=duckdb_index_cache_directory, ) diff --git a/services/worker/tests/test_job_runner_factory.py b/services/worker/tests/test_job_runner_factory.py index 3ed3b0e7e6..e10bc8c0f6 100644 --- a/services/worker/tests/test_job_runner_factory.py +++ b/services/worker/tests/test_job_runner_factory.py @@ -39,6 +39,7 @@ def test_create_job_runner( libraries_resource: LibrariesResource, assets_directory: StrPath, parquet_metadata_directory: StrPath, + duckdb_index_cache_directory: StrPath, job_type: str, expected_job_runner: Optional[str], ) -> None: @@ -48,6 +49,7 @@ def test_create_job_runner( hf_datasets_cache=libraries_resource.hf_datasets_cache, assets_directory=assets_directory, parquet_metadata_directory=parquet_metadata_directory, + duckdb_index_cache_directory=duckdb_index_cache_directory, ) job_info: JobInfo = { "type": job_type, diff --git a/tools/docker-compose-datasets-server.yml b/tools/docker-compose-datasets-server.yml index d6b2bd92c6..37b1c87d1f 100644 --- a/tools/docker-compose-datasets-server.yml +++ b/tools/docker-compose-datasets-server.yml @@ -91,6 +91,7 @@ services: volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - parquet-metadata:${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata}:rw + - duckdb-index:${DUCKDB_INDEX_STORAGE_DIRECTORY-/duckdb-index}:rw extends: file: docker-compose-base.yml service: datasets-worker @@ -112,6 +113,12 @@ services: PARQUET_AND_INFO_TARGET_REVISION: ${PARQUET_AND_INFO_TARGET_REVISION-refs/convert/parquet} PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata} + DUCKDB_INDEX_STORAGE_DIRECTORY: ${DUCKDB_INDEX_STORAGE_DIRECTORY-/duckdb-index} + DUCKDB_INDEX_COMMIT_MESSAGE: ${DUCKDB_INDEX_COMMIT_MESSAGE-Update duckdb index file} + DUCKDB_INDEX_COMMITTER_HF_TOKEN: ${DUCKDB_INDEX_COMMITTER_HF_TOKEN-} + DUCKDB_INDEX_TARGET_REVISION: ${DUCKDB_INDEX_TARGET_REVISION-refs/convert/parquet} + DUCKDB_INDEX_URL_TEMPLATE: ${DUCKDB_INDEX_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} + DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES: 
${DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES-100_000_000} WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} # ^ note: the datasets cache is automatically added, so no need to add it here OPT_IN_OUT_URLS_SCAN_COLUMNS_MAX_NUMBER: ${OPT_IN_OUT_URLS_SCAN_COLUMNS_MAX_NUMBER-10} @@ -145,3 +152,4 @@ volumes: parquet-modules-cache: parquet-numba-cache: parquet-metadata: + duckdb-index: diff --git a/tools/docker-compose-dev-datasets-server.yml b/tools/docker-compose-dev-datasets-server.yml index c50d781ff9..233e90f253 100644 --- a/tools/docker-compose-dev-datasets-server.yml +++ b/tools/docker-compose-dev-datasets-server.yml @@ -95,6 +95,7 @@ services: volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - parquet-metadata:${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata}:rw + - duckdb-index:${DUCKDB_INDEX_STORAGE_DIRECTORY-/duckdb-index}:rw extends: file: docker-compose-dev-base.yml service: datasets-worker @@ -116,6 +117,12 @@ services: PARQUET_AND_INFO_TARGET_REVISION: ${PARQUET_AND_INFO_TARGET_REVISION-refs/convert/parquet} PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata} + DUCKDB_INDEX_STORAGE_DIRECTORY: ${DUCKDB_INDEX_STORAGE_DIRECTORY-/duckdb-index} + DUCKDB_INDEX_COMMIT_MESSAGE: ${DUCKDB_INDEX_COMMIT_MESSAGE-Update duckdb index files} + DUCKDB_INDEX_COMMITTER_HF_TOKEN: ${DUCKDB_INDEX_COMMITTER_HF_TOKEN-} + DUCKDB_INDEX_TARGET_REVISION: ${DUCKDB_INDEX_TARGET_REVISION-refs/convert/parquet} + DUCKDB_INDEX_URL_TEMPLATE: ${DUCKDB_INDEX_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} + DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES: ${DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES-100_000_000} WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} # ^ note: the datasets cache is automatically added, so no need to add it here OPT_IN_OUT_URLS_SCAN_COLUMNS_MAX_NUMBER: ${OPT_IN_OUT_URLS_SCAN_COLUMNS_MAX_NUMBER-10} @@ -147,3 +154,4 @@ volumes: parquet-modules-cache: parquet-numba-cache: parquet-metadata: + duckdb-index:
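
The new split/test_duckdb_index.py test only validates a prebuilt index: it downloads the .duckdb file, installs/loads the fts extension and queries fts_main_data.match_bm25. The indexing step itself is not shown in this patch; the sketch below is one plausible way such an index could be built with DuckDB's fts extension, not the job runner's actual code. The table name data, the __hf_index_id key and the text column are taken from the test's queries; the parquet file names and output path are placeholders.

import duckdb

# connect to (or create) the index file that would later be uploaded to the Hub
con = duckdb.connect("index.duckdb")
con.execute("INSTALL 'fts';")
con.execute("LOAD 'fts';")

# load the split's parquet shards into a single table, adding a monotonic row id
# (the shard file names are placeholders)
con.execute(
    "CREATE OR REPLACE TABLE data AS "
    "SELECT row_number() OVER () AS __hf_index_id, * "
    "FROM read_parquet(['shard-0000.parquet', 'shard-0001.parquet']);"
)

# build a BM25 full-text index over the string column(s); this creates the
# fts_main_data schema whose match_bm25 macro the test queries
con.execute("PRAGMA create_fts_index('data', '__hf_index_id', 'text');")

# same query shape as in the test: matching rows get a non-NULL BM25 score
rows = con.execute(
    "SELECT text FROM data WHERE fts_main_data.match_bm25(__hf_index_id, ?) IS NOT NULL;",
    ["Lord Vader"],
).df()
con.close()

Because create_fts_index persists its tables and the match_bm25 macro inside the .duckdb file, a fresh connection that loads the fts extension can run the same query, which is exactly what the test does after downloading the file.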
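
On the configuration side, the DUCKDB_INDEX_* variables added to both docker-compose files presumably surface in the worker as an app_config.duckdb_index section, since the test overrides app_config.duckdb_index.max_parquet_size_bytes via dataclasses.replace. Below is a sketch of what such a section could look like, assuming the environs-based pattern; only the environment variable names and their compose defaults come from this patch, while the class and field names are assumptions.

from dataclasses import dataclass
from typing import Optional

from environs import Env

# defaults mirroring the compose files in this patch
DUCKDB_INDEX_COMMIT_MESSAGE = "Update duckdb index files"
DUCKDB_INDEX_COMMITTER_HF_TOKEN = None
DUCKDB_INDEX_TARGET_REVISION = "refs/convert/parquet"
DUCKDB_INDEX_URL_TEMPLATE = "/datasets/%s/resolve/%s/%s"
DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES = 100_000_000


@dataclass(frozen=True)
class DuckDbIndexConfig:
    commit_message: str = DUCKDB_INDEX_COMMIT_MESSAGE
    committer_hf_token: Optional[str] = DUCKDB_INDEX_COMMITTER_HF_TOKEN
    target_revision: str = DUCKDB_INDEX_TARGET_REVISION
    url_template: str = DUCKDB_INDEX_URL_TEMPLATE
    max_parquet_size_bytes: int = DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES
    storage_directory: Optional[str] = None

    @classmethod
    def from_env(cls) -> "DuckDbIndexConfig":
        # read the DUCKDB_INDEX_* environment variables set by the compose files
        env = Env(expand_vars=True)
        with env.prefixed("DUCKDB_INDEX_"):
            return cls(
                commit_message=env.str(name="COMMIT_MESSAGE", default=DUCKDB_INDEX_COMMIT_MESSAGE),
                committer_hf_token=env.str(name="COMMITTER_HF_TOKEN", default=DUCKDB_INDEX_COMMITTER_HF_TOKEN),
                target_revision=env.str(name="TARGET_REVISION", default=DUCKDB_INDEX_TARGET_REVISION),
                url_template=env.str(name="URL_TEMPLATE", default=DUCKDB_INDEX_URL_TEMPLATE),
                max_parquet_size_bytes=env.int(
                    name="MAX_PARQUET_SIZE_BYTES", default=DUCKDB_INDEX_MAX_PARQUET_SIZE_BYTES
                ),
                storage_directory=env.str(name="STORAGE_DIRECTORY", default=None),
            )

A frozen dataclass is consistent with the test's use of replace(app_config.duckdb_index, max_parquet_size_bytes=...), which requires creating a modified copy rather than mutating the config in place.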
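
Finally, the parametrized case that sets max_parquet_size_bytes=1_000 and expects SplitWithTooBigParquetError (the inline comment notes the fixture's parquet size is 2812 bytes) implies a size gate over the parquet_files entries of the config-parquet-and-info response, each of which carries a size field. A rough sketch of such a check follows; the function name and message are placeholders, and only the error name comes from the test.

from typing import Any, Mapping


class SplitWithTooBigParquetError(Exception):
    """Raised when a split's parquet files exceed the configured size budget."""


def raise_if_too_big(config_parquet_and_info: Mapping[str, Any], split: str, max_parquet_size_bytes: int) -> None:
    # sum the sizes of the parquet files belonging to the split to be indexed
    total = sum(
        parquet_file["size"]
        for parquet_file in config_parquet_and_info["parquet_files"]
        if parquet_file["split"] == split
    )
    if total > max_parquet_size_bytes:
        raise SplitWithTooBigParquetError(
            f"the split parquet files are {total} bytes, above the limit of {max_parquet_size_bytes} bytes"
        )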