From dcef976779c996cee9d0e205f0f0cfd2142761ed Mon Sep 17 00:00:00 2001 From: Liu Jiajun <939282975@qq.com> Date: Thu, 31 Oct 2024 19:03:40 +0800 Subject: [PATCH] feat (cli): initialize the CLI of GraphAr and support baseETL functions (#616) * init cli * fix include * add ci * add vertex info * change config * finish * license * fix license * remove conda recipe * enbale ci * fix typo * fix dependency * update ci * add data * fix ci * fix cmake * add arrow in cmake * add dependency * fix review * fix ci * fix ci * fix ci * fix ci * Update cli.yml * Update cli.yml * fix config type * fix ci with new testing * use enum * use enum * pin pydantic version --- .github/workflows/cli.yml | 117 ++++++ cli/.clang-format | 33 ++ cli/.gitignore | 144 ++++++++ cli/CMakeLists.txt | 60 +++ cli/README.md | 87 +++++ cli/pyproject.toml | 57 +++ cli/src/graphar_cli/__init__.py | 22 ++ cli/src/graphar_cli/config.py | 265 ++++++++++++++ cli/src/graphar_cli/graphar_cli.py | 172 +++++++++ cli/src/graphar_cli/importer.py | 122 +++++++ cli/src/graphar_cli/logging.py | 40 ++ cli/src/importer.h | 505 ++++++++++++++++++++++++++ cli/src/main.cc | 173 +++++++++ cli/src/util.h | 488 +++++++++++++++++++++++++ cli/test/merge.py | 103 ++++++ cli/test/test_basic.py | 24 ++ cpp/src/graphar/arrow/chunk_writer.cc | 6 +- cpp/src/graphar/arrow/chunk_writer.h | 8 +- cpp/src/graphar/graph_info.h | 2 + licenserc.toml | 2 + testing | 2 +- 21 files changed, 2424 insertions(+), 8 deletions(-) create mode 100644 .github/workflows/cli.yml create mode 100644 cli/.clang-format create mode 100644 cli/.gitignore create mode 100644 cli/CMakeLists.txt create mode 100644 cli/README.md create mode 100644 cli/pyproject.toml create mode 100644 cli/src/graphar_cli/__init__.py create mode 100644 cli/src/graphar_cli/config.py create mode 100644 cli/src/graphar_cli/graphar_cli.py create mode 100644 cli/src/graphar_cli/importer.py create mode 100644 cli/src/graphar_cli/logging.py create mode 100644 cli/src/importer.h create mode 100644 cli/src/main.cc create mode 100644 cli/src/util.h create mode 100644 cli/test/merge.py create mode 100644 cli/test/test_basic.py diff --git a/.github/workflows/cli.yml b/.github/workflows/cli.yml new file mode 100644 index 000000000..afc04eeb6 --- /dev/null +++ b/.github/workflows/cli.yml @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: GraphAr CLI CI + +on: + # Trigger the workflow on push or pull request, + # but only for the main branch + push: + branches: + - main + paths: + - 'cpp/**' + - 'cli/**' + - '.github/workflows/ci.yml' + - '.github/workflows/cli.yml' + pull_request: + branches: + - main + paths: + - 'cpp/**' + - 'cli/**' + - '.github/workflows/ci.yml' + - '.github/workflows/cli.yml' +concurrency: + group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +jobs: + ubuntu: + name: Ubuntu 22.04 CLI + runs-on: ubuntu-latest + if: ${{ !contains(github.event.pull_request.title, 'WIP') && !github.event.pull_request.draft }} + steps: + - uses: actions/checkout@v3 + with: + submodules: true + + - name: Install dependencies + run: | + + # install the latest arrow deb to test arrow + wget -c https://apache.jfrog.io/artifactory/arrow/"$(lsb_release --id --short | tr 'A-Z' 'a-z')"/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb \ + -P /tmp/ + sudo apt-get install -y /tmp/apache-arrow-apt-source-latest-"$(lsb_release --codename --short)".deb + sudo apt-get update -y + sudo apt install -y libarrow-dev \ + libarrow-dataset-dev \ + libarrow-acero-dev \ + libparquet-dev + sudo apt-get install -y ccache libcurl4-openssl-dev + + - name: Install GraphAr CLI and Run Tests + working-directory: "cli" + run: | + pip install ./ -v + graphar --help + graphar check -p ../testing/neo4j/MovieGraph.graph.yml + graphar show -p ../testing/neo4j/MovieGraph.graph.yml -v Person + graphar show -p ../testing/neo4j/MovieGraph.graph.yml -es Person -e ACTED_IN -ed Movie + graphar import -c ../testing/neo4j/data/import.mini.yml +# TODO: Add unit tests + + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + + macos: + name: ${{ matrix.architecture }} macOS ${{ matrix.macos-version }} CLI + runs-on: macos-${{ matrix.macos-version }} + # TODO: Remove this when the macos issue is fixed + if: false + strategy: + fail-fast: false + matrix: + include: + - architecture: AMD64 + macos-version: "12" + - architecture: ARM64 + macos-version: "14" + steps: + - uses: actions/checkout@v3 + with: + submodules: true + + - name: Install dependencies + run: | + brew bundle --file=cpp/Brewfile + + + - name: Build GraphAr And Run Tests + working-directory: "cli" + run: | + pip install ./ + graphar --help + graphar check -p ../testing/neo4j/MovieGraph.graph.yml + graphar show -p ../testing/neo4j/MovieGraph.graph.yml -v Person + graphar show -p ../testing/neo4j/MovieGraph.graph.yml -es Person -e ACTED_IN -ed Movie + graphar import -c ../testing/neo4j/data/import.mini.yml + +# TODO: Add unit tests diff --git a/cli/.clang-format b/cli/.clang-format new file mode 100644 index 000000000..233429c9f --- /dev/null +++ b/cli/.clang-format @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +BasedOnStyle: Google +DerivePointerAlignment: false +PointerAlignment: Left +Cpp11BracedListStyle: true +IndentCaseLabels: false +AllowShortBlocksOnASingleLine: true +AllowShortLoopsOnASingleLine: false +AllowShortIfStatementsOnASingleLine: false +Standard: 'Cpp11' +SpaceAfterCStyleCast: true +AlignAfterOpenBracket: Align +SortIncludes: true +IncludeBlocks: Preserve +ForEachMacros: + - BOOST_FOREACH + diff --git a/cli/.gitignore b/cli/.gitignore new file mode 100644 index 000000000..ecba8a4bf --- /dev/null +++ b/cli/.gitignore @@ -0,0 +1,144 @@ +# Using https://github.com/github/gitignore/blob/master/Python.gitignore + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ +docs/_generate/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +_skbuild/ +.pyodide-xbuildenv/ diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt new file mode 100644 index 000000000..3639bbf4e --- /dev/null +++ b/cli/CMakeLists.txt @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Require CMake 3.15+ (matching scikit-build-core). Use new versions of all
# policies up to CMake 3.27.
cmake_minimum_required(VERSION 3.15)

# Scikit-build-core sets these values for you, or you can just hard-code the
# name and version.
project(
  ${SKBUILD_PROJECT_NAME}
  VERSION ${SKBUILD_PROJECT_VERSION}
  LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/../cpp ${CMAKE_BINARY_DIR}/graphar)

# Find the module development requirements (requires FindPython from 3.17 or
# scikit-build-core's built-in backport)
find_package(Python REQUIRED COMPONENTS Interpreter Development.Module)
find_package(pybind11 CONFIG REQUIRED)
find_package(Arrow REQUIRED)
find_package(ArrowDataset REQUIRED)
find_package(ArrowAcero REQUIRED)
find_package(Parquet REQUIRED)

# Add a library using FindPython's tooling (pybind11 also provides a helper
# like this)
python_add_library(_core MODULE src/main.cc WITH_SOABI)

target_link_libraries(_core PRIVATE pybind11::headers graphar Arrow::arrow_shared
                      Parquet::parquet_shared
                      ArrowDataset::arrow_dataset_shared
                      ArrowAcero::arrow_acero_shared
                      )
target_include_directories(_core PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_include_directories(_core PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/src)
target_include_directories(_core PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/thirdparty)

# This is passing in the version as a define just as an example
target_compile_definitions(_core PRIVATE VERSION_INFO=${PROJECT_VERSION})

# The install directory is the output (wheel) directory
set_target_properties(_core PROPERTIES INSTALL_RPATH "$ORIGIN")
install(TARGETS graphar DESTINATION graphar_cli)
install(TARGETS _core DESTINATION graphar_cli)
diff --git a/cli/README.md b/cli/README.md
new file mode 100644
index 000000000..0f2ac54f2
--- /dev/null
+++ b/cli/README.md
@@ -0,0 +1,87 @@
# GraphAr CLI

The GraphAr CLI uses [pybind11][] and [scikit-build-core][] to bind the GraphAr C++ library into Python and to build the package. The command line tool itself is developed with [typer][].

[pybind11]: https://pybind11.readthedocs.io
[scikit-build-core]: https://scikit-build-core.readthedocs.io
[typer]: https://typer.tiangolo.com/

## Requirements

- Linux (works well on Ubuntu 22.04)
- CMake >= 3.15
- Arrow >= 12.0
- Python >= 3.7
- pip (latest version recommended)

The recommended testing environment is the `ghcr.io/apache/graphar-dev` Docker image.

Using Python from a conda environment or a virtualenv is also a good choice.
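For example, you can work inside that image by mounting your GraphAr checkout into a disposable container (an illustrative invocation; the mount path, working directory, and image tag are assumptions to adapt to your setup):

```bash
# Illustrative: start a throwaway dev container with the repository mounted
docker run -it --rm -v "$(pwd)":/graphar -w /graphar ghcr.io/apache/graphar-dev bash
```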
## Installation

- Clone this repository
- `pip install ./cli`, or `pip install -v ./cli` for verbose output

## Usage

```bash
graphar --help

# check the metadata: verify that the vertex/edge information and the property information of the graph are valid
graphar check -p ../testing/neo4j/MovieGraph.graph.yml

# show the vertex info
graphar show -p ../testing/neo4j/MovieGraph.graph.yml -v Person

# show the edge info
graphar show -p ../testing/neo4j/MovieGraph.graph.yml -es Person -e ACTED_IN -ed Movie

# import graph data using a config file
graphar import -c ../testing/neo4j/data/import.mini.yml
```

## Import config file

The config file is written in YAML. We provide two reference templates for it: full and mini.

The full version of the configuration file contains all configurable fields; any extra fields are automatically ignored.

The mini version is a simplified form of the full configuration with the same functionality: it keeps only the essential parts of the configuration.

If every optional field of the full configuration can keep its default value, you can shrink the file down to the mini version, but it cannot be reduced any further than that.

The full YAML config file carries brief comments on the fields, which can be used as a reference.

**Example**

To import the movie graph data from the `testing` directory, you first need to prepare the data files. Supported file types include `csv`, `json` (as well as `jsonline`, which should still use the `.json` extension), `parquet`, and `orc`. Please make sure the correct file extensions are set in advance, or specify the `file_type` field in the source section of the configuration; when set, `file_type` takes precedence over the file extension.

Next, write a configuration file following the provided sample. Any empty fields in the `graphar` configuration will be filled with default values. In the `import_schema`, empty fields will use the global configuration values from `graphar`; fields that are set in `import_schema` override the values from `graphar`.

A few important notes:

1. The sources list specifies the configuration for the data source files. For `csv` files, you can set the `delimiter`. JSON files must be in `jsonline` format.

2. The columns dictionary maps column names in the data source to vertex or edge properties: keys are column names in the data source, values are property names.

3. Currently, edge properties cannot have the same names as the edge endpoints' properties; doing so will raise an exception.

4. A minimal config example is sketched right after this list, and the table that follows it lists the default values of common fields (the full configuration includes more).
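The following is a minimal illustrative config in the spirit of the mini template, assuming a Person/Movie dataset. The field names follow `cli/src/graphar_cli/config.py`; the data paths and source column names are placeholders, not files shipped in the repository:

```yaml
graphar:
  path: /tmp/movie_graph       # output root; omitted fields fall back to the defaults below
  name: MovieGraph
import_schema:
  vertices:
    - type: Person
      property_groups:
        - properties:
            - name: name
              data_type: string
              is_primary: true
            - name: born
              data_type: int64
      sources:
        - path: ./person.csv   # placeholder data file
          columns:             # source column -> property name
            name: name
            born: born
    - type: Movie
      property_groups:
        - properties:
            - name: title
              data_type: string
              is_primary: true
      sources:
        - path: ./movie.csv
          columns:
            title: title
  edges:
    - edge_type: ACTED_IN
      src_type: Person
      src_prop: name           # endpoint property used to resolve vertex indices
      dst_type: Movie
      dst_prop: title
      sources:
        - path: ./acted_in.csv
          columns:
            person: name       # source column holding the Person key
            movie: title       # source column holding the Movie key
```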
| Field | Default value |
| ----------- | ----------- |
| `graphar.vertex_chunk_size` | `100` |
| `graphar.edge_chunk_size` | `1024` |
| `graphar.file_type` | `parquet` |
| `graphar.adj_list_type` | `ordered_by_source` |
| `graphar.validate_level` | `weak` |
| `graphar.version` | `gar/v1` |
| `property.nullable` | `true` |

Enjoy using the GraphAr CLI!
\ No newline at end of file
diff --git a/cli/pyproject.toml b/cli/pyproject.toml
new file mode 100644
index 000000000..8c0003a0b
--- /dev/null
+++ b/cli/pyproject.toml
@@ -0,0 +1,57 @@
[build-system]
requires = ["scikit-build-core>=0.3.3", "pybind11"]
build-backend = "scikit_build_core.build"


[project]
name = "graphar_cli"
version = "0.0.1"
description = "GraphAr command line tool"
readme = "README.md"
authors = [{ name = "GraphAr community", email = "dev@graphar.apache.org" }]
requires-python = ">=3.7"
dependencies = ["typer ~= 0.1", "pydantic ~= 2.0, < 2.7", "pyyaml ~= 6.0"]

[project.optional-dependencies]
test = ["pandas ~= 2.0", "typing_extensions ~= 4.0"]

[project.scripts]
graphar = "graphar_cli.graphar_cli:main"


[tool.scikit-build]
build-dir = "build"

[tool.ruff]
src = ["src"]
line-length = 100


[tool.ruff.lint]
extend-select = [
  "B",    # flake8-bugbear
  "I",    # isort
  "ARG",  # flake8-unused-arguments
  "C4",   # flake8-comprehensions
  "EM",   # flake8-errmsg
  "ICN",  # flake8-import-conventions
  "G",    # flake8-logging-format
  "PGH",  # pygrep-hooks
  "PIE",  # flake8-pie
  "PL",   # pylint
  "PT",   # flake8-pytest-style
  "PTH",  # flake8-use-pathlib
  "RET",  # flake8-return
  "RUF",  # Ruff-specific
  "SIM",  # flake8-simplify
  "T20",  # flake8-print
  "UP",   # pyupgrade
  "YTT",  # flake8-2020
  "EXE",  # flake8-executable
  "NPY",  # NumPy specific rules
  "PD",   # pandas-vet
]
ignore = [
  "PLR09",    # Too many X
  "PLR2004",  # Magic comparison
]
diff --git a/cli/src/graphar_cli/__init__.py b/cli/src/graphar_cli/__init__.py
new file mode 100644
index 000000000..e8091abdb
--- /dev/null
+++ b/cli/src/graphar_cli/__init__.py
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from ._core import __doc__, __version__

__all__ = ["__doc__", "__version__"]
diff --git a/cli/src/graphar_cli/config.py b/cli/src/graphar_cli/config.py
new file mode 100644
index 000000000..7322ddb41
--- /dev/null
+++ b/cli/src/graphar_cli/config.py
@@ -0,0 +1,265 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from enum import Enum +from logging import getLogger +from pathlib import Path +from typing import Dict, List, Literal, Optional # TODO: move to the TYPE_CHECKING block + +from pydantic import BaseModel, ConfigDict, field_validator, model_validator +from typing_extensions import Self + +logger = getLogger("graphar_cli") + +# TODO: move them to constants.py + +DEFAULT_FILE_TYPE = "parquet" +DEFAULT_ADJ_LIST_TYPE = "ordered_by_source" +DEFAULT_REGULAR_SEPARATOR = "_" +DEFAULT_VALIDATE_LEVEL = "weak" +DEFAULT_VERSION = "gar/v1" + + +class FileType(str, Enum): + parquet = "parquet" + csv = "csv" + orc = "orc" + json = "json" + + +class GraphArConfig(BaseModel): + path: str + name: str + vertex_chunk_size: Optional[int] = 100 + edge_chunk_size: Optional[int] = 1024 + file_type: FileType = DEFAULT_FILE_TYPE + adj_list_type: Literal[ + "ordered_by_source", "ordered_by_dest", "unordered_by_source", "unordered_by_dest" + ] = DEFAULT_ADJ_LIST_TYPE + validate_level: Literal["no", "weak", "strong"] = DEFAULT_VALIDATE_LEVEL + version: Optional[str] = DEFAULT_VERSION + + @field_validator("path") + def check_path(cls, v): + path = Path(v).resolve().absolute() + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + elif any(path.iterdir()): + msg = f"Warning: Path {v} already exists and contains files." + logger.warning(msg) + return v + + +class Property(BaseModel): + name: str + data_type: Literal["bool", "int32", "int64", "float", "double", "string", "date", "timestamp"] + is_primary: bool = False + nullable: Optional[bool] = None + + @model_validator(mode="after") + def check_nullable(self) -> Self: + if self.is_primary and self.nullable: + msg = f"Primary key `{self.name}` must not be nullable." + raise ValueError(msg) + if self.is_primary: + self.nullable = False + elif self.nullable is None: + self.nullable = True + return self + + +class PropertyGroup(BaseModel): + properties: List[Property] + file_type: Optional[FileType] = None + + @field_validator("properties") + def check_properties_length(cls, v): + if len(v) == 0: + msg = "properties must not be empty." + raise ValueError(msg) + return v + + +class Source(BaseModel): + file_type: Optional[FileType] = None + path: str + delimiter: str = "," + columns: Dict[str, str] + + @field_validator("path") + def check_path(cls, v): + path = Path(v).resolve().absolute() + if not path.is_file(): + msg = f"'{path}' is not a file." + raise ValueError(msg) + return v + + @field_validator("delimiter") + def check_delimiter(cls, v): + if len(v) != 1: + msg = "delimiter must be a single character." 
raise ValueError(msg)
        return v

    @model_validator(mode="after")
    def check_file_type(self) -> Self:
        if not self.file_type:
            # lstrip instead of str.removeprefix to stay compatible with Python < 3.9
            file_type = Path(self.path).suffix.lstrip(".")
            if file_type == "":
                msg = f"File {self.path} has no file type suffix"
                raise ValueError(msg)
            if file_type not in FileType.__members__:
                msg = f"Invalid file type '{file_type}'"
                raise ValueError(msg)
            self.file_type = file_type
        return self


class Vertex(BaseModel):
    type: str
    labels: List[str] = []
    chunk_size: Optional[int] = None
    validate_level: Optional[Literal["no", "weak", "strong"]] = None
    prefix: Optional[str] = None
    property_groups: List[PropertyGroup]
    sources: List[Source]

    @field_validator("property_groups")
    def check_property_groups_length(cls, v):
        if len(v) == 0:
            msg = "property_groups must not be empty."
            raise ValueError(msg)
        return v

    @field_validator("sources")
    def check_sources_length(cls, v):
        if len(v) == 0:
            msg = "sources must not be empty."
            raise ValueError(msg)
        return v

    @model_validator(mode="after")
    def check_vertex_prefix(self) -> Self:
        if not self.prefix:
            self.prefix = f"vertex/{self.type}/"
        return self


class AdjList(BaseModel):
    ordered: bool
    aligned_by: Literal["src", "dst"]
    file_type: Optional[FileType] = None


class Edge(BaseModel):
    edge_type: str
    src_type: str
    src_prop: str
    dst_type: str
    dst_prop: str
    chunk_size: Optional[int] = None
    validate_level: Optional[Literal["no", "weak", "strong"]] = None
    adj_lists: List[AdjList] = []
    property_groups: List[PropertyGroup] = []
    sources: List[Source]
    prefix: Optional[str] = None

    @field_validator("sources")
    def check_sources_length(cls, v):
        if len(v) == 0:
            msg = "sources must not be empty."
            raise ValueError(msg)
        return v

    @model_validator(mode="after")
    def check_prefix(self) -> Self:
        if not self.prefix:
            self.prefix = (
                f"edge/{self.src_type}"
                f"{DEFAULT_REGULAR_SEPARATOR}{self.edge_type}"
                f"{DEFAULT_REGULAR_SEPARATOR}{self.dst_type}/"
            )
        return self


class ImportSchema(BaseModel):
    vertices: List[Vertex]
    edges: List[Edge]

    @field_validator("vertices")
    def check_vertices_length(cls, v):
        if len(v) == 0:
            msg = "vertices must not be empty."
+ raise ValueError(msg) + return v + + +class ImportConfig(BaseModel): + model_config = ConfigDict(use_enum_values=True) + + graphar: GraphArConfig + import_schema: ImportSchema + + @model_validator(mode="after") + def check_none_types(self) -> Self: + for vertex in self.import_schema.vertices: + if vertex.chunk_size is None: + vertex.chunk_size = self.graphar.vertex_chunk_size + if vertex.validate_level is None: + vertex.validate_level = self.graphar.validate_level + for property_group in vertex.property_groups: + if property_group.file_type is None: + property_group.file_type = self.graphar.file_type + for edge in self.import_schema.edges: + if edge.chunk_size is None: + edge.chunk_size = self.graphar.edge_chunk_size + if edge.validate_level is None: + edge.validate_level = self.graphar.validate_level + if len(edge.adj_lists) == 0: + if self.graphar.adj_list_type == "ordered_by_source": + edge.adj_lists.append( + AdjList(ordered=True, aligned_by="src", file_type=self.graphar.file_type) + ) + elif self.graphar.adj_list_type == "ordered_by_dest": + edge.adj_lists.append( + AdjList(ordered=True, aligned_by="dst", file_type=self.graphar.file_type) + ) + elif self.graphar.adj_list_type == "unordered_by_source": + edge.adj_lists.append( + AdjList(ordered=False, aligned_by="src", file_type=self.graphar.file_type) + ) + elif self.graphar.adj_list_type == "unordered_by_dest": + edge.adj_lists.append( + AdjList(ordered=False, aligned_by="dst", file_type=self.graphar.file_type) + ) + else: + msg = f"Invalid adj_list_type '{self.graphar.adj_list_type}'" + raise ValueError(msg) + for adj_list in edge.adj_lists: + if adj_list.file_type is None: + adj_list.file_type = self.graphar.file_type + for property_group in edge.property_groups: + if property_group.file_type is None: + property_group.file_type = self.graphar.file_type + + return self diff --git a/cli/src/graphar_cli/graphar_cli.py b/cli/src/graphar_cli/graphar_cli.py new file mode 100644 index 000000000..1c5be2e6a --- /dev/null +++ b/cli/src/graphar_cli/graphar_cli.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from logging import getLogger
from pathlib import Path
from typing import List

import typer
import yaml

from ._core import (  # type: ignore # noqa: PGH003
    check_edge,
    check_graph,
    check_vertex,
    do_import,
    get_edge_count,
    get_edge_types,
    get_vertex_count,
    get_vertex_types,
    show_edge,
    show_graph,
    show_vertex,
)
from .config import ImportConfig
from .importer import validate
from .logging import setup_logging

app = typer.Typer(
    help="GraphAr CLI",
    no_args_is_help=True,
    add_completion=False,
    context_settings={"help_option_names": ["-h", "--help"]},
)

setup_logging()
logger = getLogger(__name__)


@app.command(
    context_settings={"help_option_names": ["-h", "--help"]},
    help="Show the metadata",
    no_args_is_help=True,
)
def show(
    path: str = typer.Option(..., "--path", "-p", help="Path to the GraphAr config file"),
    vertex: str = typer.Option(None, "--vertex", "-v", help="Vertex type to show"),
    edge_src: str = typer.Option(None, "--edge-src", "-es", help="Source of the edge type to show"),
    edge: str = typer.Option(None, "--edge", "-e", help="Edge type to show"),
    edge_dst: str = typer.Option(
        None, "--edge-dst", "-ed", help="Destination of the edge type to show"
    ),
) -> None:
    if not Path(path).exists():
        logger.error("File not found: %s", path)
        raise typer.Exit(1)
    # resolve() already anchors relative paths at the current working directory
    path = str(Path(path).resolve())
    if vertex:
        vertex_types = get_vertex_types(path)
        if vertex not in vertex_types:
            logger.error("Vertex %s not found in the graph", vertex)
            raise typer.Exit(1)
        logger.info("Vertex count: %s", get_vertex_count(path, vertex))
        logger.info(show_vertex(path, vertex))
        raise typer.Exit()
    if edge or edge_src or edge_dst:
        if not (edge and edge_src and edge_dst):
            logger.error("Edge source, edge, and edge destination must all be set")
            raise typer.Exit(1)
        edge_types: List[List[str]] = get_edge_types(path)
        found = False
        for edge_type in edge_types:
            if edge_type[0] == edge_src and edge_type[1] == edge and edge_type[2] == edge_dst:
                found = True
                break
        if not found:
            logger.error(
                "Edge type with source %s, edge %s, and destination %s not found in the graph",
                edge_src,
                edge,
                edge_dst,
            )
            raise typer.Exit(1)
        logger.info("Edge count: %s", get_edge_count(path, edge_src, edge, edge_dst))
        logger.info(show_edge(path, edge_src, edge, edge_dst))
        raise typer.Exit()
    logger.info(show_graph(path))


@app.command(
    context_settings={"help_option_names": ["-h", "--help"]},
    help="Check the metadata",
    no_args_is_help=True,
)
def check(
    path: str = typer.Option(..., "--path", "-p", help="Path to the GraphAr config file"),
):
    if not Path(path).exists():
        logger.error("File not found: %s", path)
        raise typer.Exit(1)
    path = str(Path(path).resolve())
    vertex_types = get_vertex_types(path)
    for vertex_type in vertex_types:
        if not check_vertex(path, vertex_type):
            logger.error("Vertex type %s is not valid", vertex_type)
            raise typer.Exit(1)
    edge_types = get_edge_types(path)
    for edge_type in edge_types:
        if edge_type[0] not in vertex_types:
            logger.error("Source vertex type %s not found in the graph", edge_type[0])
            raise typer.Exit(1)
        if edge_type[2] not in vertex_types:
            logger.error("Destination vertex type %s not found in the graph", edge_type[2])
            raise typer.Exit(1)
        if not check_edge(path, edge_type[0],
edge_type[1], edge_type[2]):
            logger.error(
                "Edge type %s_%s_%s is not valid", edge_type[0], edge_type[1], edge_type[2]
            )
            raise typer.Exit(1)
    if not check_graph(path):
        logger.error("Graph is not valid")
        raise typer.Exit(1)
    logger.info("Graph is valid")


@app.command(
    "import",
    context_settings={"help_option_names": ["-h", "--help"]},
    help="Import data",
    no_args_is_help=True,
)
def import_data(
    config_file: str = typer.Option(..., "--config", "-c", help="Path of the GraphAr config file"),
):
    if not Path(config_file).is_file():
        logger.error("File not found: %s", config_file)
        raise typer.Exit(1)

    try:
        with Path(config_file).open(encoding="utf-8") as file:
            config = yaml.safe_load(file)
        import_config = ImportConfig(**config)
        validate(import_config)
    except Exception as e:
        logger.error("Invalid config: %s", e)
        raise typer.Exit(1) from None
    try:
        logger.info("Starting import")
        res = do_import(import_config.model_dump())
        logger.info(res)
    except Exception as e:
        logger.error("Import failed: %s", e)
        raise typer.Exit(1) from None


def main() -> None:
    app()
diff --git a/cli/src/graphar_cli/importer.py b/cli/src/graphar_cli/importer.py
new file mode 100644
index 000000000..ec568849f
--- /dev/null
+++ b/cli/src/graphar_cli/importer.py
@@ -0,0 +1,122 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from logging import getLogger

from .config import ImportConfig

logger = getLogger("graphar_cli")


def validate(import_config: ImportConfig):
    vertex_types = set()
    for vertex in import_config.import_schema.vertices:
        if vertex.type in vertex_types:
            msg = f"Duplicate vertex type {vertex.type}"
            raise ValueError(msg)
        vertex_types.add(vertex.type)

        prop_names = set()
        primary_keys = []
        for prop_group in vertex.property_groups:
            for prop in prop_group.properties:
                if prop.name in prop_names:
                    msg = f"Duplicate property '{prop.name}' in vertex '{vertex.type}'"
                    raise ValueError(msg)
                prop_names.add(prop.name)
                if prop.is_primary:
                    if len(primary_keys):
                        msg = (
                            f"Multiple primary keys '{primary_keys[0]}' and '{prop.name}' "
                            f"found in vertex '{vertex.type}'"
                        )
                        raise ValueError(msg)
                    primary_keys.append(prop.name)
                    if prop.nullable:
                        msg = f"Primary key '{prop.name}' in '{vertex.type}' cannot be nullable"
                        raise ValueError(msg)
        source_values = [value for source in vertex.sources for value in source.columns.values()]
        for prop_name in prop_names:
            if prop_name not in source_values:
                msg = (
                    f"Property '{prop_name}' in vertex '{vertex.type}' not found in source columns"
                )
                raise ValueError(msg)
        if len(source_values) != len(prop_names):
            msg = f"Source columns do not match the properties in vertex '{vertex.type}'"
            raise ValueError(msg)
        logger.debug("Validated vertex %s", vertex.type)

    edge_types = set()
    for edge in import_config.import_schema.edges:
        if edge.edge_type in edge_types:
            msg = f"Duplicate edge type {edge.edge_type}"
            raise ValueError(msg)
        edge_types.add(edge.edge_type)

        if edge.src_type not in vertex_types:
            msg = f"Source vertex type {edge.src_type} not found"
            raise ValueError(msg)
        if edge.dst_type not in vertex_types:
            msg = f"Destination vertex type {edge.dst_type} not found"
            raise ValueError(msg)
        src_vertex = next(
            vertex
            for vertex in import_config.import_schema.vertices
            if vertex.type == edge.src_type
        )
        if edge.src_prop not in [
            prop.name for prop_group in src_vertex.property_groups for prop in prop_group.properties
        ]:
            msg = (
                f"Source property '{edge.src_prop}' "
                f"not found in source vertex '{edge.src_type}' "
                f"in edge '{edge.edge_type}'"
            )
            raise ValueError(msg)
        dst_vertex = next(
            vertex
            for vertex in import_config.import_schema.vertices
            if vertex.type == edge.dst_type
        )
        if edge.dst_prop not in [
            prop.name for prop_group in dst_vertex.property_groups for prop in prop_group.properties
        ]:
            msg = (
                f"Destination property '{edge.dst_prop}' "
                f"not found in destination vertex '{edge.dst_type}' "
                f"in edge '{edge.edge_type}'"
            )
            raise ValueError(msg)
        prop_names = set()
        for prop_group in edge.property_groups:
            for prop in prop_group.properties:
                if prop.name in prop_names:
                    msg = f"Duplicate property '{prop.name}' in edge '{edge.edge_type}'"
                    raise ValueError(msg)
                prop_names.add(prop.name)

        source_values = [value for source in edge.sources for value in source.columns.values()]
        for prop_name in prop_names:
            if prop_name not in source_values:
                msg = (
                    f"Property '{prop_name}' in edge "
                    f"'{edge.src_type}_{edge.edge_type}_{edge.dst_type}' "
                    f"not found in source columns"
                )
                raise ValueError(msg)
        # TODO: Validate source columns
        logger.debug("Validated edge %s %s %s", edge.src_type, edge.edge_type, edge.dst_type)
diff --git a/cli/src/graphar_cli/logging.py b/cli/src/graphar_cli/logging.py
new file mode 100644
index 000000000..a2a595860
---
/dev/null +++ b/cli/src/graphar_cli/logging.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +from typing import Union # TODO: move to the TYPE_CHECKING block + +from rich.console import Console +from rich.logging import RichHandler + + +def setup_logging(terminal_width: Union[int, None] = None) -> None: + logger = logging.getLogger("graphar_cli") + console = Console(width=terminal_width) if terminal_width else None + rich_handler = RichHandler( + show_time=False, + rich_tracebacks=True, + tracebacks_show_locals=True, + markup=True, + show_path=False, + console=console, + ) + rich_handler.setFormatter(logging.Formatter("%(message)s")) + logger.addHandler(rich_handler) + + logger.setLevel(logging.INFO) + logger.propagate = False diff --git a/cli/src/importer.h b/cli/src/importer.h new file mode 100644 index 000000000..11e4a6bd0 --- /dev/null +++ b/cli/src/importer.h @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include + +#include "arrow/api.h" +#include "graphar/api/arrow_writer.h" +#include "graphar/api/high_level_writer.h" +#include "graphar/convert_to_arrow_type.h" +#include "graphar/graph_info.h" +#include "graphar/high-level/graph_reader.h" +#include "pybind11/pybind11.h" +#include "pybind11/stl.h" + +#include "util.h" + +namespace py = pybind11; +namespace fs = std::filesystem; + +struct GraphArConfig { + std::string path; + std::string name; + std::string version; +}; + +struct Property { + std::string name; + std::string data_type; + bool is_primary; + bool nullable; +}; + +struct PropertyGroup { + std::string file_type; + std::vector properties; +}; + +struct Source { + std::string file_type; + std::string path; + char delimiter; + std::unordered_map columns; +}; + +struct Vertex { + std::string type; + std::vector labels; + int chunk_size; + std::string validate_level; + std::string prefix; + std::vector property_groups; + std::vector sources; +}; + +struct AdjList { + bool ordered; + std::string aligned_by; + std::string file_type; +}; + +struct Edge { + std::string edge_type; + std::string src_type; + std::string src_prop; + std::string dst_type; + std::string dst_prop; + int chunk_size; + std::string validate_level; + std::string prefix; + std::vector adj_lists; + std::vector property_groups; + std::vector sources; +}; + +struct ImportSchema { + std::vector vertices; + std::vector edges; +}; + +struct ImportConfig { + GraphArConfig graphar_config; + ImportSchema import_schema; +}; + +ImportConfig ConvertPyDictToConfig(const py::dict& config_dict) { + ImportConfig import_config; + + auto graphar_dict = config_dict["graphar"].cast(); + import_config.graphar_config.path = graphar_dict["path"].cast(); + import_config.graphar_config.name = graphar_dict["name"].cast(); + import_config.graphar_config.version = + graphar_dict["version"].cast(); + + auto schema_dict = config_dict["import_schema"].cast(); + + auto vertices_list = schema_dict["vertices"].cast>(); + for (const auto& vertex_dict : vertices_list) { + Vertex vertex; + vertex.type = vertex_dict["type"].cast(); + vertex.chunk_size = vertex_dict["chunk_size"].cast(); + vertex.prefix = vertex_dict["prefix"].cast(); + vertex.validate_level = vertex_dict["validate_level"].cast(); + vertex.labels = vertex_dict["labels"].cast>(); + + auto pg_list = vertex_dict["property_groups"].cast>(); + for (const auto& pg_dict : pg_list) { + PropertyGroup pg; + pg.file_type = pg_dict["file_type"].cast(); + + auto prop_list = pg_dict["properties"].cast>(); + for (const auto& prop_dict : prop_list) { + Property prop; + prop.name = prop_dict["name"].cast(); + prop.data_type = prop_dict["data_type"].cast(); + prop.is_primary = prop_dict["is_primary"].cast(); + prop.nullable = prop_dict["nullable"].cast(); + pg.properties.emplace_back(prop); + } + vertex.property_groups.emplace_back(pg); + } + + auto source_list = vertex_dict["sources"].cast>(); + for (const auto& source_dict : source_list) { + Source src; + src.file_type = source_dict["file_type"].cast(); + src.path = source_dict["path"].cast(); + src.delimiter = source_dict["delimiter"].cast(); + src.columns = source_dict["columns"] + .cast>(); + + vertex.sources.emplace_back(src); + } + + import_config.import_schema.vertices.emplace_back(vertex); + } + + auto edges_list = schema_dict["edges"].cast>(); + for (const auto& edge_dict : edges_list) { + Edge edge; + edge.edge_type = edge_dict["edge_type"].cast(); + edge.src_type = edge_dict["src_type"].cast(); + edge.src_prop = 
edge_dict["src_prop"].cast(); + edge.dst_type = edge_dict["dst_type"].cast(); + edge.dst_prop = edge_dict["dst_prop"].cast(); + edge.chunk_size = edge_dict["chunk_size"].cast(); + edge.validate_level = edge_dict["validate_level"].cast(); + edge.prefix = edge_dict["prefix"].cast(); + + auto adj_list_dicts = edge_dict["adj_lists"].cast>(); + for (const auto& adj_list_dict : adj_list_dicts) { + AdjList adj_list; + adj_list.ordered = adj_list_dict["ordered"].cast(); + adj_list.aligned_by = adj_list_dict["aligned_by"].cast(); + adj_list.file_type = adj_list_dict["file_type"].cast(); + edge.adj_lists.emplace_back(adj_list); + } + + auto edge_pg_list = + edge_dict["property_groups"].cast>(); + for (const auto& edge_pg_dict : edge_pg_list) { + PropertyGroup edge_pg; + edge_pg.file_type = edge_pg_dict["file_type"].cast(); + auto edge_prop_list = + edge_pg_dict["properties"].cast>(); + + for (const auto& prop_dict : edge_prop_list) { + Property edge_prop; + edge_prop.name = prop_dict["name"].cast(); + edge_prop.data_type = prop_dict["data_type"].cast(); + edge_prop.is_primary = prop_dict["is_primary"].cast(); + edge_prop.nullable = prop_dict["nullable"].cast(); + edge_pg.properties.emplace_back(edge_prop); + } + + edge.property_groups.emplace_back(edge_pg); + } + + auto edge_source_list = edge_dict["sources"].cast>(); + for (const auto& edge_source_dict : edge_source_list) { + Source edge_src; + edge_src.file_type = edge_source_dict["file_type"].cast(); + edge_src.path = edge_source_dict["path"].cast(); + edge_src.delimiter = edge_source_dict["delimiter"].cast(); + edge_src.columns = + edge_source_dict["columns"] + .cast>(); + + edge.sources.emplace_back(edge_src); + } + + import_config.import_schema.edges.emplace_back(edge); + } + + return import_config; +} + +std::string DoImport(const py::dict& config_dict) { + auto import_config = ConvertPyDictToConfig(config_dict); + + auto version = + graphar::InfoVersion::Parse(import_config.graphar_config.version).value(); + fs::path save_path = import_config.graphar_config.path; + + std::unordered_map vertex_chunk_sizes; + std::unordered_map vertex_counts; + + std::map, + std::unordered_map, graphar::IdType, + arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>> + vertex_prop_index_map; + + std::unordered_map> + vertex_props_in_edges; + std::map, graphar::Property> + vertex_prop_property_map; + for (const auto& edge : import_config.import_schema.edges) { + vertex_props_in_edges[edge.src_type].emplace_back(edge.src_prop); + vertex_props_in_edges[edge.dst_type].emplace_back(edge.dst_prop); + } + for (const auto& vertex : import_config.import_schema.vertices) { + vertex_chunk_sizes[vertex.type] = vertex.chunk_size; + + auto pgs = std::vector>(); + std::string primary_key; + for (const auto& pg : vertex.property_groups) { + std::vector props; + for (const auto& prop : pg.properties) { + if (prop.is_primary) { + if (!primary_key.empty()) { + throw std::runtime_error("Multiple primary keys found in vertex " + + vertex.type); + } + primary_key = prop.name; + } + graphar::Property property( + prop.name, graphar::DataType::TypeNameToDataType(prop.data_type), + prop.is_primary, prop.nullable); + props.emplace_back(property); + vertex_prop_property_map[std::make_pair(vertex.type, prop.name)] = + property; + } + // TODO: add prefix parameter in config + auto property_group = graphar::CreatePropertyGroup( + props, graphar::StringToFileType(pg.file_type)); + pgs.emplace_back(property_group); + } + + auto vertex_info = + graphar::CreateVertexInfo(vertex.type, 
vertex.chunk_size, pgs,
                                  vertex.labels, vertex.prefix, version);
    auto file_name = vertex.type + ".vertex.yml";
    vertex_info->Save(save_path / file_name);
    auto save_path_str = save_path.string();
    save_path_str += "/";
    auto vertex_prop_writer =
        graphar::VertexPropertyWriter::Make(
            vertex_info, save_path_str,
            StringToValidateLevel(vertex.validate_level))
            .value();

    std::vector<std::shared_ptr<arrow::Table>> vertex_tables;
    for (const auto& source : vertex.sources) {
      std::vector<std::string> column_names;
      for (const auto& [key, value] : source.columns) {
        column_names.emplace_back(key);
      }
      auto table = GetDataFromFile(source.path, column_names, source.delimiter,
                                   source.file_type);

      std::unordered_map<std::string, Property> column_prop_map;
      std::unordered_map<std::string, std::string> reversed_columns_config;
      for (const auto& [key, value] : source.columns) {
        reversed_columns_config[value] = key;
      }
      for (const auto& pg : vertex.property_groups) {
        for (const auto& prop : pg.properties) {
          column_prop_map[reversed_columns_config[prop.name]] = prop;
        }
      }
      std::unordered_map<std::string,
                         std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          columns_to_change;
      for (const auto& [column, prop] : column_prop_map) {
        auto arrow_data_type = graphar::DataType::DataTypeToArrowDataType(
            graphar::DataType::TypeNameToDataType(prop.data_type));
        auto arrow_column = table->GetColumnByName(column);
        // TODO: whether need to check duplicate values for primary key?
        if (!prop.nullable) {
          for (const auto& chunk : arrow_column->chunks()) {
            if (chunk->null_count() > 0) {
              throw std::runtime_error("Non-nullable column '" + column +
                                       "' has null values");
            }
          }
        }
        // TODO: check this
        if (column != prop.name ||
            arrow_column->type()->id() != arrow_data_type->id()) {
          columns_to_change[column] =
              std::make_pair(prop.name, arrow_data_type);
        }
      }
      table = ChangeNameAndDataType(table, columns_to_change);
      vertex_tables.emplace_back(table);
    }
    std::shared_ptr<arrow::Table> merged_vertex_table =
        MergeTables(vertex_tables);
    // TODO: check all fields in props
    // TODO: add start_index in config
    graphar::IdType start_chunk_index = 0;

    auto vertex_table_with_index =
        vertex_prop_writer
            ->AddIndexColumn(merged_vertex_table, start_chunk_index,
                             vertex_info->GetChunkSize())
            .value();
    for (const auto& property_group : pgs) {
      vertex_prop_writer->WriteTable(vertex_table_with_index, property_group,
                                     start_chunk_index);
    }
    if (vertex_props_in_edges.find(vertex.type) !=
        vertex_props_in_edges.end()) {
      for (const auto& vertex_prop : vertex_props_in_edges[vertex.type]) {
        vertex_prop_index_map[std::make_pair(vertex.type, vertex_prop)] =
            TableToUnorderedMap(vertex_table_with_index, vertex_prop,
                                graphar::GeneralParams::kVertexIndexCol);
      }
    }
    auto vertex_count = merged_vertex_table->num_rows();
    vertex_counts[vertex.type] = vertex_count;
    vertex_prop_writer->WriteVerticesNum(vertex_count);
  }

  for (const auto& edge : import_config.import_schema.edges) {
    auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();

    for (const auto& pg : edge.property_groups) {
      std::vector<graphar::Property> props;
      for (const auto& prop : pg.properties) {
        props.emplace_back(graphar::Property(
            prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
            prop.is_primary, prop.nullable));
      }
      // TODO: add prefix parameter in config
      auto property_group = graphar::CreatePropertyGroup(
          props, graphar::StringToFileType(pg.file_type));
      pgs.emplace_back(property_group);
    }
    graphar::AdjacentListVector adj_lists;
    for (const auto& adj_list : edge.adj_lists) {
      // TODO: add prefix parameter in config
adj_lists.emplace_back(graphar::CreateAdjacentList(
          graphar::OrderedAlignedToAdjListType(adj_list.ordered,
                                               adj_list.aligned_by),
          graphar::StringToFileType(adj_list.file_type)));
    }

    // TODO: add directed parameter in config

    bool directed = true;
    // TODO: whether prefix has default value?

    auto edge_info = graphar::CreateEdgeInfo(
        edge.src_type, edge.edge_type, edge.dst_type, edge.chunk_size,
        vertex_chunk_sizes[edge.src_type], vertex_chunk_sizes[edge.dst_type],
        directed, adj_lists, pgs, edge.prefix, version);
    auto file_name =
        ConcatEdgeTriple(edge.src_type, edge.edge_type, edge.dst_type) +
        ".edge.yml";
    edge_info->Save(save_path / file_name);
    auto save_path_str = save_path.string();
    save_path_str += "/";
    for (const auto& adj_list : adj_lists) {
      int64_t vertex_count;
      if (adj_list->GetType() == graphar::AdjListType::ordered_by_source ||
          adj_list->GetType() == graphar::AdjListType::unordered_by_source) {
        vertex_count = vertex_counts[edge.src_type];
      } else {
        vertex_count = vertex_counts[edge.dst_type];
      }
      std::vector<std::shared_ptr<arrow::Table>> edge_tables;

      for (const auto& source : edge.sources) {
        std::vector<std::string> column_names;
        for (const auto& [key, value] : source.columns) {
          column_names.emplace_back(key);
        }
        auto table = GetDataFromFile(source.path, column_names,
                                     source.delimiter, source.file_type);
        std::unordered_map<std::string, graphar::Property> column_prop_map;
        std::unordered_map<std::string, std::string> reversed_columns;
        for (const auto& [key, value] : source.columns) {
          reversed_columns[value] = key;
        }

        for (const auto& pg : edge.property_groups) {
          for (const auto& prop : pg.properties) {
            column_prop_map[reversed_columns[prop.name]] = graphar::Property(
                prop.name,
                graphar::DataType::TypeNameToDataType(prop.data_type),
                prop.is_primary, prop.nullable);
          }
        }
        column_prop_map[reversed_columns.at(edge.src_prop)] =
            vertex_prop_property_map.at(
                std::make_pair(edge.src_type, edge.src_prop));
        column_prop_map[reversed_columns.at(edge.dst_prop)] =
            vertex_prop_property_map.at(
                std::make_pair(edge.dst_type, edge.dst_prop));
        std::unordered_map<
            std::string,
            std::pair<std::string, std::shared_ptr<arrow::DataType>>>
            columns_to_change;
        for (const auto& [column, prop] : column_prop_map) {
          auto arrow_data_type =
              graphar::DataType::DataTypeToArrowDataType(prop.type);
          auto arrow_column = table->GetColumnByName(column);
          // TODO: is needed?
if (!prop.is_nullable) {
            for (const auto& chunk : arrow_column->chunks()) {
              if (chunk->null_count() > 0) {
                throw std::runtime_error("Non-nullable column '" + column +
                                         "' has null values");
              }
            }
          }
          if (column != prop.name ||
              arrow_column->type()->id() != arrow_data_type->id()) {
            columns_to_change[column] =
                std::make_pair(prop.name, arrow_data_type);
          }
        }
        table = ChangeNameAndDataType(table, columns_to_change);
        edge_tables.emplace_back(table);
      }
      std::unordered_map<
          std::string,
          std::pair<std::string, std::shared_ptr<arrow::DataType>>>
          vertex_columns_to_change;

      std::shared_ptr<arrow::Table> merged_edge_table =
          MergeTables(edge_tables);
      // TODO: check all fields in props

      auto combined_edge_table =
          merged_edge_table->CombineChunks().ValueOrDie();

      auto edge_builder =
          graphar::builder::EdgesBuilder::Make(
              edge_info, save_path_str, adj_list->GetType(), vertex_count,
              StringToValidateLevel(edge.validate_level))
              .value();

      std::vector<std::string> edge_column_names;
      for (const auto& field : combined_edge_table->schema()->fields()) {
        edge_column_names.push_back(field->name());
      }
      const int64_t num_rows = combined_edge_table->num_rows();
      for (int64_t i = 0; i < num_rows; ++i) {
        auto edge_src_column =
            combined_edge_table->GetColumnByName(edge.src_prop);
        auto edge_dst_column =
            combined_edge_table->GetColumnByName(edge.dst_prop);

        graphar::builder::Edge e(
            vertex_prop_index_map
                .at(std::make_pair(edge.src_type, edge.src_prop))
                .at(edge_src_column->GetScalar(i).ValueOrDie()),
            vertex_prop_index_map
                .at(std::make_pair(edge.dst_type, edge.dst_prop))
                .at(edge_dst_column->GetScalar(i).ValueOrDie()));
        for (const auto& column_name : edge_column_names) {
          if (column_name != edge.src_prop && column_name != edge.dst_prop) {
            auto column = combined_edge_table->GetColumnByName(column_name);
            auto column_type = column->type();
            std::any value;
            TryToCastToAny(
                graphar::DataType::ArrowDataTypeToDataType(column_type),
                column->chunk(0), value);
            e.AddProperty(column_name, value);
          }
        }
        edge_builder->AddEdge(e);
      }
      edge_builder->Dump();
    }
  }
  return "Imported successfully!";
}
\ No newline at end of file
diff --git a/cli/src/main.cc b/cli/src/main.cc
new file mode 100644
index 000000000..4a0b03469
--- /dev/null
+++ b/cli/src/main.cc
@@ -0,0 +1,173 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
+ */ + +#include "pybind11/pybind11.h" +#include "pybind11/stl.h" + +#include "graphar/filesystem.h" +#include "graphar/graph_info.h" +#include "graphar/reader_util.h" +#include "importer.h" + +#define STRINGIFY(x) #x +#define MACRO_STRINGIFY(x) STRINGIFY(x) + +std::string ShowGraph(const std::string& path) { + // TODO: check all the result values + auto graph_info = graphar::GraphInfo::Load(path).value(); + return graph_info->Dump().value(); +} + +std::string ShowVertex(const std::string& path, + const std::string& vertex_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto vertex_info = graph_info->GetVertexInfo(vertex_type); + return vertex_info->Dump().value(); +} + +std::string ShowEdge(const std::string& path, const std::string& src_type, + const std::string& edge_type, + const std::string& dst_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); + return edge_info->Dump().value(); +} + +bool CheckGraph(const std::string& path) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + return graph_info->IsValidated(); +} + +bool CheckVertex(const std::string& path, const std::string& vertex_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto vertex_info = graph_info->GetVertexInfo(vertex_type); + return vertex_info->IsValidated(); +} + +bool CheckEdge(const std::string& path, const std::string& src_type, + const std::string& edge_type, const std::string& dst_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); + return edge_info->IsValidated(); +} + +int64_t GetVertexCount(const std::string& path, + const std::string& vertex_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto graph_prefix = graph_info->GetPrefix(); + auto vertex_info = graph_info->GetVertexInfo(vertex_type); + return graphar::util::GetVertexNum(graph_prefix, vertex_info).value(); +} + +// TODO(ljj): Add this to graphar library + +std::vector _GetAdjListTypes( + const std::shared_ptr& edge_info) { + std::vector adj_list_types; + if (edge_info->HasAdjacentListType(graphar::AdjListType::ordered_by_dest)) { + adj_list_types.push_back(graphar::AdjListType::ordered_by_dest); + } + if (edge_info->HasAdjacentListType(graphar::AdjListType::ordered_by_source)) { + adj_list_types.push_back(graphar::AdjListType::ordered_by_source); + } + if (edge_info->HasAdjacentListType(graphar::AdjListType::unordered_by_dest)) { + adj_list_types.push_back(graphar::AdjListType::unordered_by_dest); + } + if (edge_info->HasAdjacentListType( + graphar::AdjListType::unordered_by_source)) { + adj_list_types.push_back(graphar::AdjListType::unordered_by_source); + } + if (adj_list_types.empty()) { + throw std::runtime_error("No valid adj list type found"); + } + return adj_list_types; +} + +int64_t GetEdgeCount(const std::string& path, const std::string& src_type, + const std::string& edge_type, + const std::string& dst_type) { + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto graph_prefix = graph_info->GetPrefix(); + auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); + auto adj_list_types = _GetAdjListTypes(edge_info); + auto adj_list_type = adj_list_types[0]; + auto vertices_num_file_path = + edge_info->GetVerticesNumFilePath(adj_list_type).value(); + std::string base_dir; + auto fs = graphar::FileSystemFromUriOrPath(graph_prefix, &base_dir).value(); + 
+  std::string vertices_num_path = base_dir + vertices_num_file_path;
+  auto vertices_num =
+      fs->ReadFileToValue<graphar::IdType>(vertices_num_path).value();
+  int64_t max_chunk_index = (vertices_num + edge_info->GetSrcChunkSize() - 1) /
+                            edge_info->GetSrcChunkSize();
+  int64_t edge_count = 0;
+  for (int64_t i = 0; i < max_chunk_index; i++) {
+    // TODO: file may not exist
+    edge_count +=
+        graphar::util::GetEdgeNum(graph_prefix, edge_info, adj_list_type, i)
+            .value();
+  }
+  return edge_count;
+}
+
+std::vector<std::string> GetVertexTypes(const std::string& path) {
+  auto graph_info = graphar::GraphInfo::Load(path).value();
+  auto vertex_infos = graph_info->GetVertexInfos();
+  // TODO: change to unordered_set
+  std::vector<std::string> vertex_types;
+  for (const auto& vertex_info : vertex_infos) {
+    vertex_types.push_back(vertex_info->GetType());
+  }
+  return vertex_types;
+}
+
+std::vector<std::vector<std::string>> GetEdgeTypes(const std::string& path) {
+  auto graph_info = graphar::GraphInfo::Load(path).value();
+  auto edge_infos = graph_info->GetEdgeInfos();
+  // TODO: change to unordered_set
+  std::vector<std::vector<std::string>> edge_types;
+  for (const auto& edge_info : edge_infos) {
+    std::vector<std::string> edge_type;
+    edge_type.push_back(edge_info->GetSrcType());
+    edge_type.push_back(edge_info->GetEdgeType());
+    edge_type.push_back(edge_info->GetDstType());
+    edge_types.push_back(edge_type);
+  }
+  return edge_types;
+}
+
+namespace py = pybind11;
+PYBIND11_MODULE(_core, m) {
+  m.doc() = "GraphAr Python bindings";
+  m.def("show_graph", &ShowGraph, "Show the graph info");
+  m.def("show_vertex", &ShowVertex, "Show the vertex info");
+  m.def("show_edge", &ShowEdge, "Show the edge info");
+  m.def("check_graph", &CheckGraph, "Check the graph info");
+  m.def("check_vertex", &CheckVertex, "Check the vertex info");
+  m.def("check_edge", &CheckEdge, "Check the edge info");
+  m.def("get_vertex_types", &GetVertexTypes, "Get the vertex types");
+  m.def("get_edge_types", &GetEdgeTypes, "Get the edge types");
+  m.def("get_vertex_count", &GetVertexCount, "Get the vertex count");
+  m.def("get_edge_count", &GetEdgeCount, "Get the edge count");
+  m.def("do_import", &DoImport, "Do the import");
+#ifdef VERSION_INFO
+  m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
+#else
+  m.attr("__version__") = "dev";
+#endif
+}
diff --git a/cli/src/util.h b/cli/src/util.h
new file mode 100644
index 000000000..600260441
--- /dev/null
+++ b/cli/src/util.h
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef ARROW_ORC
+#include "arrow/adapters/orc/adapter.h"
+#endif
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/csv/api.h"
+#include "arrow/io/api.h"
+#include "arrow/json/api.h"
+#include "graphar/api/arrow_writer.h"
+#include "graphar/api/high_level_writer.h"
+#include "graphar/graph_info.h"
+#include "parquet/arrow/reader.h"
+
+std::string ConcatEdgeTriple(const std::string& src_type,
+                             const std::string& edge_type,
+                             const std::string& dst_type) {
+  return src_type + REGULAR_SEPARATOR + edge_type + REGULAR_SEPARATOR +
+         dst_type;
+}
+
+graphar::ValidateLevel StringToValidateLevel(const std::string& level) {
+  if (level == "no") {
+    return graphar::ValidateLevel::no_validate;
+  } else if (level == "weak") {
+    return graphar::ValidateLevel::weak_validate;
+  } else if (level == "strong") {
+    return graphar::ValidateLevel::strong_validate;
+  } else {
+    throw std::runtime_error("Invalid validate level: " + level);
+  }
+}
+
+// Utility function to filter the columns from a table
+std::shared_ptr<arrow::Table> SelectColumns(
+    const std::shared_ptr<arrow::Table>& table,
+    const std::vector<std::string>& column_names) {
+  if (column_names.empty()) {
+    throw std::runtime_error("No column names provided.");
+  }
+  std::vector<int> indices;
+  for (const auto& name : column_names) {
+    auto column_index = table->schema()->GetFieldIndex(name);
+    if (column_index != -1) {
+      indices.push_back(column_index);
+    }
+  }
+
+  if (indices.empty()) {
+    throw std::runtime_error("None of the column names matched the schema.");
+  }
+
+  return table->SelectColumns(indices).ValueOrDie();
+}
+
+std::shared_ptr<arrow::Table> GetDataFromParquetFile(
+    const std::string& path, const std::vector<std::string>& column_names) {
+  // Open the Parquet file
+  auto infile =
+      arrow::io::ReadableFile::Open(path, arrow::default_memory_pool())
+          .ValueOrDie();
+
+  // Create a Parquet FileReader
+  std::unique_ptr<parquet::arrow::FileReader> parquet_reader;
+  auto status = parquet::arrow::OpenFile(infile, arrow::default_memory_pool(),
+                                         &parquet_reader);
+  if (!status.ok()) {
+    throw std::runtime_error("Failed to create Parquet FileReader: " +
+                             status.ToString());
+  }
+
+  // Retrieve the Arrow schema from the Parquet file
+  std::shared_ptr<arrow::Schema> schema;
+  status = parquet_reader->GetSchema(&schema);
+  if (!status.ok()) {
+    throw std::runtime_error("Failed to retrieve schema from Parquet file: " +
+                             status.ToString());
+  }
+
+  // Map column names to their indices in the schema
+  std::vector<int> column_indices;
+  for (const auto& col_name : column_names) {
+    int index = schema->GetFieldIndex(col_name);
+    if (index == -1) {
+      throw std::runtime_error("Column not found in schema: " + col_name);
+    }
+    column_indices.push_back(index);
+  }
+
+  // Read the table with the selected columns
+  std::shared_ptr<arrow::Table> table;
+  status = parquet_reader->ReadTable(column_indices, &table);
+  if (!status.ok()) {
+    throw std::runtime_error("Failed to read table from Parquet file: " +
+                             status.ToString());
+  }
+
+  return table;
+}
+
+std::shared_ptr<arrow::Table> GetDataFromCsvFile(
+    const std::string& path, const std::vector<std::string>& column_names,
+    const char delimiter) {
+  // Open the CSV file
+  auto input_result =
+      arrow::io::ReadableFile::Open(path, arrow::default_memory_pool());
+  if (!input_result.ok()) {
+    throw std::runtime_error("Failed to open CSV file: " +
+                             input_result.status().ToString());
+  }
+  std::shared_ptr<arrow::io::ReadableFile> input = input_result.ValueOrDie();
+
+  // Define CSV parse options with the specified delimiter
+  arrow::csv::ParseOptions parse_options =
+      arrow::csv::ParseOptions::Defaults();
+  parse_options.delimiter = delimiter;
+
+  // Define CSV convert options to include only the specified columns
+  arrow::csv::ConvertOptions convert_options =
+      arrow::csv::ConvertOptions::Defaults();
+  convert_options.include_columns = column_names;
+
+  // Optional: Define CSV read options (using defaults here)
+  arrow::csv::ReadOptions read_options = arrow::csv::ReadOptions::Defaults();
+
+  // Create a CSV TableReader using IOContext
+  arrow::io::IOContext io_context(arrow::default_memory_pool());
+  arrow::Result<std::shared_ptr<arrow::csv::TableReader>> reader_result =
+      arrow::csv::TableReader::Make(io_context, input, read_options,
+                                    parse_options, convert_options);
+
+  if (!reader_result.ok()) {
+    throw std::runtime_error("Failed to create CSV TableReader: " +
+                             reader_result.status().ToString());
+  }
+  std::shared_ptr<arrow::csv::TableReader> reader = reader_result.ValueOrDie();
+
+  // Read the table
+  arrow::Result<std::shared_ptr<arrow::Table>> table_result = reader->Read();
+  if (!table_result.ok()) {
+    throw std::runtime_error("Failed to read table from CSV file: " +
+                             table_result.status().ToString());
+  }
+  std::shared_ptr<arrow::Table> table = table_result.ValueOrDie();
+
+  // Optional: Validate that all requested columns are present
+  auto schema = table->schema();
+  for (const auto& col_name : column_names) {
+    if (schema->GetFieldByName(col_name) == nullptr) {
+      throw std::runtime_error("Column not found in CSV file: " + col_name);
+    }
+  }
+
+  return table;
+}
+
+#ifdef ARROW_ORC
+std::shared_ptr<arrow::Table> GetDataFromOrcFile(
+    const std::string& path, const std::vector<std::string>& column_names) {
+  // Open the ORC file
+  auto infile =
+      arrow::io::ReadableFile::Open(path, arrow::default_memory_pool())
+          .ValueOrDie();
+
+  // Create an ORC file reader
+  std::unique_ptr<arrow::adapters::orc::ORCFileReader> orc_reader =
+      arrow::adapters::orc::ORCFileReader::Open(infile,
+                                                arrow::default_memory_pool())
+          .ValueOrDie();
+
+  // Read the table with the selected columns
+  arrow::Result<std::shared_ptr<arrow::Table>> table_result =
+      orc_reader->Read(column_names);
+  if (!table_result.ok()) {
+    throw std::runtime_error("Failed to read table from ORC file: " +
+                             table_result.status().ToString());
+  }
+  std::shared_ptr<arrow::Table> table = table_result.ValueOrDie();
+
+  // Optional: Validate that all requested columns are present
+  auto schema = table->schema();
+  for (const auto& col_name : column_names) {
+    if (schema->GetFieldByName(col_name) == nullptr) {
+      throw std::runtime_error("Column not found in ORC file: " + col_name);
+    }
+  }
+
+  return table;
+}
+#endif
+
+std::shared_ptr<arrow::Table> GetDataFromJsonFile(
+    const std::string& path, const std::vector<std::string>& column_names) {
+  // Open the JSON file
+  auto infile =
+      arrow::io::ReadableFile::Open(path, arrow::default_memory_pool())
+          .ValueOrDie();
+
+  // Define JSON read options (using defaults here)
+  arrow::json::ReadOptions read_options = arrow::json::ReadOptions::Defaults();
+
+  // Define JSON parse options (using defaults here)
+  arrow::json::ParseOptions parse_options =
+      arrow::json::ParseOptions::Defaults();
+
+  // Create a JSON TableReader
+  std::shared_ptr<arrow::json::TableReader> json_reader =
+      arrow::json::TableReader::Make(arrow::default_memory_pool(), infile,
+                                     read_options, parse_options)
+          .ValueOrDie();
+
+  // Read the table
+  arrow::Result<std::shared_ptr<arrow::Table>> table_result =
+      json_reader->Read();
+  if (!table_result.ok()) {
+    throw std::runtime_error("Failed to read table from JSON file: " +
+                             table_result.status().ToString());
+  }
+  std::shared_ptr<arrow::Table> table = table_result.ValueOrDie();
+
+  table = SelectColumns(table, column_names);
+
+  // Optional: Validate that all requested columns are present
+  // TODO: must be equal
+  auto schema = table->schema();
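+  // Unlike the CSV reader (ConvertOptions::include_columns), Arrow's JSON
+  // reader has no column projection at read time, so the file is read in
+  // full and trimmed via SelectColumns above; verify nothing was silently
+  // dropped.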
+  for (const auto& col_name : column_names) {
+    if (schema->GetFieldByName(col_name) == nullptr) {
+      throw std::runtime_error("Column not found in JSON file: " + col_name);
+    }
+  }
+
+  return table;
+}
+
+std::shared_ptr<arrow::Table> GetDataFromFile(
+    const std::string& path, const std::vector<std::string>& column_names,
+    const char delimiter, const std::string& file_type) {
+  // TODO: use explicit schema
+  // TODO: use switch case
+  if (file_type == "parquet") {
+    return GetDataFromParquetFile(path, column_names);
+  } else if (file_type == "csv") {
+    return GetDataFromCsvFile(path, column_names, delimiter);
+#ifdef ARROW_ORC
+  } else if (file_type == "orc") {
+    return GetDataFromOrcFile(path, column_names);
+#endif
+  } else if (file_type == "json") {
+    return GetDataFromJsonFile(path, column_names);
+  } else {
+    throw std::runtime_error("Unsupported file type: " + file_type);
+  }
+}
+
+std::shared_ptr<arrow::Table> ChangeNameAndDataType(
+    const std::shared_ptr<arrow::Table>& table,
+    const std::unordered_map<
+        std::string,
+        std::pair<std::string, std::shared_ptr<arrow::DataType>>>&
+        columns_to_change) {
+  // Retrieve original schema and number of columns
+  auto original_schema = table->schema();
+  int64_t num_columns = table->num_columns();
+
+  // Prepare vectors for new schema fields and new column data
+  std::vector<std::shared_ptr<arrow::Field>> new_fields;
+  std::vector<std::shared_ptr<arrow::ChunkedArray>> new_columns;
+
+  for (int64_t i = 0; i < num_columns; ++i) {
+    auto original_field = original_schema->field(i);
+    auto original_column = table->column(i);  // This is a ChunkedArray
+
+    std::string original_name = original_field->name();
+    std::shared_ptr<arrow::DataType> original_type = original_field->type();
+
+    // Check if this column needs to be changed
+    auto it = columns_to_change.find(original_name);
+    if (it != columns_to_change.end()) {
+      std::string new_name = it->second.first;
+      std::shared_ptr<arrow::DataType> new_type = it->second.second;
+
+      bool name_changed = (new_name != original_name);
+      bool type_changed = !original_type->Equals(*new_type);
+
+      std::shared_ptr<arrow::ChunkedArray> new_chunked_array;
+
+      // If data type needs to be changed, cast each chunk
+      if (type_changed) {
+        std::vector<std::shared_ptr<arrow::Array>> casted_chunks;
+        for (const auto& chunk : original_column->chunks()) {
+          // Perform type casting using Compute API
+          arrow::compute::CastOptions cast_options;
+          cast_options.allow_int_overflow = false;  // Set as needed
+
+          auto cast_result =
+              arrow::compute::Cast(*chunk, new_type, cast_options);
+          if (!cast_result.ok()) {
+            throw std::runtime_error("Failed to cast column data.");
+          }
+          casted_chunks.push_back(cast_result.ValueOrDie());
+        }
+        // Create a new ChunkedArray with casted chunks
+        new_chunked_array =
+            std::make_shared<arrow::ChunkedArray>(casted_chunks, new_type);
+      } else {
+        // If type is not changed, keep the original column
+        new_chunked_array = original_column;
+      }
+
+      // Create a new Field with the updated name and type
+      auto new_field =
+          arrow::field(new_name, type_changed ? new_type : original_type,
+                       original_field->nullable());
+      new_fields.push_back(new_field);
+      new_columns.push_back(new_chunked_array);
+    } else {
+      // Columns not in the change map remain unchanged
+      new_fields.push_back(original_field);
+      new_columns.push_back(original_column);
+    }
+  }
+
+  // Create the new schema
+  auto new_schema = arrow::schema(new_fields);
+
+  // Construct the new table with updated schema and columns
+  auto new_table = arrow::Table::Make(new_schema, new_columns);
+
+  return new_table;
+}
+
+std::shared_ptr<arrow::Table> MergeTables(
+    const std::vector<std::shared_ptr<arrow::Table>>& tables) {
+  // Check if tables vector is not empty
+  if (tables.empty()) {
+    throw std::runtime_error("No tables to merge.");
+  }
+
+  // This is a column-wise (horizontal) merge, so all tables must have the
+  // same number of rows
+  int64_t num_rows = tables[0]->num_rows();
+  for (const auto& table : tables) {
+    if (table->num_rows() != num_rows) {
+      throw std::runtime_error(
+          "All tables must have the same number of rows.");
+    }
+  }
+
+  // Collect all the columns from the input tables
+  std::vector<std::shared_ptr<arrow::Field>> fields;
+  std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+
+  for (const auto& table : tables) {
+    for (int i = 0; i < table->num_columns(); ++i) {
+      fields.push_back(table->schema()->field(i));
+      columns.push_back(table->column(i));
+    }
+  }
+
+  // Create a new schema and table with merged columns
+  auto merged_schema = std::make_shared<arrow::Schema>(fields);
+  auto merged_table = arrow::Table::Make(merged_schema, columns, num_rows);
+
+  return merged_table;
+}
+
+std::unordered_map<std::shared_ptr<arrow::Scalar>, graphar::IdType,
+                   arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>
+TableToUnorderedMap(const std::shared_ptr<arrow::Table>& table,
+                    const std::string& key_column_name,
+                    const std::string& value_column_name) {
+  auto combined_table = table->CombineChunks().ValueOrDie();
+  // Get the column indices
+  auto key_column_idx =
+      combined_table->schema()->GetFieldIndex(key_column_name);
+  auto value_column_idx =
+      combined_table->schema()->GetFieldIndex(value_column_name);
+  if (key_column_idx == -1) {
+    throw std::runtime_error("Key column '" + key_column_name +
+                             "' not found in the table.");
+  }
+  if (value_column_idx == -1) {
+    throw std::runtime_error("Value column '" + value_column_name +
+                             "' not found in the table.");
+  }
+
+  // Extract the columns
+  auto key_column = combined_table->column(key_column_idx);
+  auto value_column = combined_table->column(value_column_idx);
+
+  std::unordered_map<std::shared_ptr<arrow::Scalar>, graphar::IdType,
+                     arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>
+      result;
+
+  // Ensure both columns have the same length
+  if (key_column->length() != value_column->length()) {
+    throw std::runtime_error("Key and value columns have different lengths.");
+  }
+
+  // The table was combined above, so each column holds exactly one chunk
+  auto key_column_chunk = key_column->chunk(0);
+  auto value_column_chunk = value_column->chunk(0);
+
+  // Iterate over each row and populate the map
+  for (int64_t i = 0; i < key_column->length(); ++i) {
+    // Check for nulls
+    if (key_column_chunk->IsNull(i)) {
+      throw std::runtime_error("Null key value at index " + std::to_string(i) +
+                               " in " + key_column_name);
+    }
+    if (value_column_chunk->IsNull(i)) {
+      throw std::runtime_error("Null value at index " + std::to_string(i) +
+                               " in " + value_column_name);
+    }
+
+    // Extract the key scalar and the int64 vertex index value
+    auto key = key_column_chunk->GetScalar(i).ValueOrDie();
+    auto value = std::static_pointer_cast<arrow::Int64Scalar>(
+                     value_column_chunk->GetScalar(i).ValueOrDie())
+                     ->value;
+    result.emplace(key, value);
+  }
+
+  return result;
+}
+
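+// CastToAny copies the value at `index` of an Arrow array into a std::any
+// holding the C++ value that corresponds to the GraphAr type parameter.
+// STRING gets an explicit specialization below: the generic GetView() would
+// store a non-owning std::string_view into the Arrow buffer, so the
+// specialization copies the data out with GetString() instead.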
+template <graphar::Type type>
+graphar::Status CastToAny(std::shared_ptr<arrow::Array> array,
+                          std::any& any, int64_t index) {  // NOLINT
+  if (array->IsNull(index)) {
+    any = std::any();
+    return graphar::Status::OK();
+  }
+  using ArrayType = typename graphar::TypeToArrowType<type>::ArrayType;
+  auto column = std::dynamic_pointer_cast<ArrayType>(array);
+  any = column->GetView(index);
+  return graphar::Status::OK();
+}
+
+template <>
+graphar::Status CastToAny<graphar::Type::STRING>(
+    std::shared_ptr<arrow::Array> array, std::any& any,
+    int64_t index) {  // NOLINT
+  auto column = std::dynamic_pointer_cast<arrow::StringArray>(array);
+  any = column->GetString(index);
+  return graphar::Status::OK();
+}
+
+graphar::Status TryToCastToAny(const std::shared_ptr<graphar::DataType>& type,
+                               std::shared_ptr<arrow::Array> array,
+                               std::any& any, int64_t index = 0) {  // NOLINT
+  switch (type->id()) {
+  case graphar::Type::BOOL:
+    return CastToAny<graphar::Type::BOOL>(array, any, index);
+  case graphar::Type::INT32:
+    return CastToAny<graphar::Type::INT32>(array, any, index);
+  case graphar::Type::INT64:
+    return CastToAny<graphar::Type::INT64>(array, any, index);
+  case graphar::Type::FLOAT:
+    return CastToAny<graphar::Type::FLOAT>(array, any, index);
+  case graphar::Type::DOUBLE:
+    return CastToAny<graphar::Type::DOUBLE>(array, any, index);
+  case graphar::Type::STRING:
+    return CastToAny<graphar::Type::STRING>(array, any, index);
+  case graphar::Type::DATE:
+    return CastToAny<graphar::Type::DATE>(array, any, index);
+  case graphar::Type::TIMESTAMP:
+    return CastToAny<graphar::Type::TIMESTAMP>(array, any, index);
+  default:
+    return graphar::Status::TypeError("Unsupported type.");
+  }
+  return graphar::Status::OK();
+}
diff --git a/cli/test/merge.py b/cli/test/merge.py
new file mode 100644
index 000000000..53d7a7049
--- /dev/null
+++ b/cli/test/merge.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
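+# Test helper: merges several source files (parquet, csv, orc, json) into one
+# output file via pandas. Example invocation (assuming Typer's usual behavior
+# of running a single-command app without the subcommand name):
+#   python merge.py -f a.csv -f b.csv -o merged.parquet -t parquet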
+
+from enum import Enum
+from pathlib import Path
+from typing import List, Optional
+
+import pandas as pd
+import typer
+from typing_extensions import Annotated
+
+app = typer.Typer(no_args_is_help=True, context_settings={"help_option_names": ["-h", "--help"]})
+
+
+support_file_types = {"parquet", "orc", "csv", "json"}
+
+
+class FileType(str, Enum):
+    parquet = "parquet"
+    csv = "csv"
+    orc = "orc"
+    json = "json"
+
+
+@app.command(
+    "merge",
+    context_settings={"help_option_names": ["-h", "--help"]},
+    help="Merge source files",
+    no_args_is_help=True,
+)
+def merge_data(
+    files: Annotated[
+        List[str], typer.Option("--file", "-f", help="Files to merge", show_default=False)
+    ],
+    output_file: Annotated[
+        str, typer.Option("--output", "-o", help="Output file", show_default=False)
+    ],
+    type: Annotated[
+        Optional[FileType],
+        typer.Option("--type", "-t", help="Type of data to output", show_default=False),
+    ] = None,
+):
+    if not files:
+        typer.echo("No files to merge")
+        raise typer.Exit(1)
+    if not output_file:
+        typer.echo("No output file")
+        raise typer.Exit(1)
+    data = []
+    for file in files:
+        path = Path(file)
+        if not path.is_file():
+            typer.echo(f"File {file} not found")
+            raise typer.Exit(1)
+        file_type = path.suffix.removeprefix(".")
+        if file_type == "":
+            typer.echo(f"File {file} has no file type suffix")
+            raise typer.Exit(1)
+        if file_type not in support_file_types:
+            typer.echo(f"File type {file_type} not supported")
+            raise typer.Exit(1)
+        if file_type == "parquet":
+            data.append(pd.read_parquet(file))
+        elif file_type == "csv":
+            data.append(pd.read_csv(file))
+        elif file_type == "orc":
+            data.append(pd.read_orc(file))
+        elif file_type == "json":
+            data.append(pd.read_json(file))
+    output_path = Path(output_file)
+    if output_path.is_file():
+        typer.echo(f"Output file {output_file} already exists")
+        if not typer.confirm("Do you want to overwrite it?"):
+            raise typer.Exit(1)
+    if not type:
+        # Infer the output type from the file suffix and validate it;
+        # otherwise an unknown suffix would silently write nothing.
+        suffix = output_path.suffix.removeprefix(".")
+        if suffix not in support_file_types:
+            typer.echo(f"Output file type {suffix} not supported")
+            raise typer.Exit(1)
+        type = FileType(suffix)
+    result = pd.concat(data, ignore_index=True)
+    if type == "parquet":
+        result.to_parquet(output_file)
+    elif type == "csv":
+        # Skip the pandas index so the merged CSV has only the data columns
+        result.to_csv(output_file, index=False)
+    elif type == "orc":
+        result.to_orc(output_file)
+    elif type == "json":
+        result.to_json(output_file, orient="records", lines=True)
+    typer.echo(f"Data merged to {output_file}")
+
+
+if __name__ == "__main__":
+    app()
diff --git a/cli/test/test_basic.py b/cli/test/test_basic.py
new file mode 100644
index 000000000..44afa889c
--- /dev/null
+++ b/cli/test/test_basic.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
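+# Minimal smoke test: verifies the graphar_cli package (and its compiled
+# _core extension) imports and reports the expected version.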
+
+from __future__ import annotations
+
+import graphar_cli as m
+
+
+def test_version():
+    assert m.__version__ == "0.0.1"
diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc
index 5be9d5e2c..964886b06 100644
--- a/cpp/src/graphar/arrow/chunk_writer.cc
+++ b/cpp/src/graphar/arrow/chunk_writer.cc
@@ -278,7 +278,7 @@ Status VertexPropertyWriter::WriteTable(
   if (indice == -1) {  // add index column
     GAR_ASSIGN_OR_RAISE(table_with_index,
-                        addIndexColumn(input_table, start_chunk_index,
+                        AddIndexColumn(input_table, start_chunk_index,
                                        vertex_info_->GetChunkSize()));
   }
   IdType chunk_size = vertex_info_->GetChunkSize();
@@ -298,7 +298,7 @@ Status VertexPropertyWriter::WriteTable(
     ValidateLevel validate_level) const {
   auto property_groups = vertex_info_->GetPropertyGroups();
   GAR_ASSIGN_OR_RAISE(auto table_with_index,
-                      addIndexColumn(input_table, start_chunk_index,
+                      AddIndexColumn(input_table, start_chunk_index,
                                      vertex_info_->GetChunkSize()));
   for (auto& property_group : property_groups) {
     GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
@@ -428,7 +428,7 @@ Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
   return Make(vertex_info, graph_info->GetPrefix(), validate_level);
 }
 
-Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
+Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
     const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
     IdType chunk_size) const {
   arrow::Int64Builder array_builder;
diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h
index 23c7f415f..89d331261 100644
--- a/cpp/src/graphar/arrow/chunk_writer.h
+++ b/cpp/src/graphar/arrow/chunk_writer.h
@@ -231,6 +231,10 @@ class VertexPropertyWriter {
       const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
       const ValidateLevel& validate_level = ValidateLevel::no_validate);
 
+  Result<std::shared_ptr<arrow::Table>> AddIndexColumn(
+      const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
+      IdType chunk_size) const;
+
  private:
   /**
    * @brief Check if the operation of writing vertices number is allowed.
@@ -265,10 +269,6 @@ class VertexPropertyWriter {
       const std::shared_ptr<PropertyGroup>& property_group,
       IdType chunk_index, ValidateLevel validate_level) const;
 
-  Result<std::shared_ptr<arrow::Table>> addIndexColumn(
-      const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
-      IdType chunk_size) const;
-
  private:
   std::shared_ptr<VertexInfo> vertex_info_;
   std::string prefix_;
diff --git a/cpp/src/graphar/graph_info.h b/cpp/src/graphar/graph_info.h
index b2b372b0c..c067810c5 100644
--- a/cpp/src/graphar/graph_info.h
+++ b/cpp/src/graphar/graph_info.h
@@ -38,6 +38,8 @@ class Property {
   bool is_primary;   // primary key tag
   bool is_nullable;  // nullable tag for non-primary key
 
+  Property() = default;
+
   explicit Property(const std::string& name,
                     const std::shared_ptr<DataType>& type = nullptr,
                     bool is_primary = false, bool is_nullable = true)
diff --git a/licenserc.toml b/licenserc.toml
index 5353b95ba..33d89581c 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -61,4 +61,6 @@ excludes = [
   "java/build.xml",
   "**/*.json",
   "pyspark/poetry.lock",
+  "cli/*.yml",
+  "cli/*.toml",
 ]
diff --git a/testing b/testing
index 491fce725..955596c32 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 491fce7259406ff33986229590576589b84230d8
+Subproject commit 955596c325ceba7b607e285738e3dd0ce4ff424e