feat: upgrade DataFusion, Arrow, PyO3, ObjectStore #2594

Merged · 9 commits · Jul 16, 2024
73 changes: 37 additions & 36 deletions Cargo.toml
@@ -20,7 +20,7 @@ exclude = ["python"]
resolver = "2"

[workspace.package]
version = "0.14.2"
version = "0.15.0"
edition = "2021"
authors = ["Lance Devs <dev@lancedb.com>"]
license = "Apache-2.0"
@@ -43,33 +43,33 @@ categories = [
rust-version = "1.78"

[workspace.dependencies]
-lance = { version = "=0.14.2", path = "./rust/lance" }
-lance-arrow = { version = "=0.14.2", path = "./rust/lance-arrow" }
-lance-core = { version = "=0.14.2", path = "./rust/lance-core" }
-lance-datafusion = { version = "=0.14.2", path = "./rust/lance-datafusion" }
-lance-datagen = { version = "=0.14.2", path = "./rust/lance-datagen" }
-lance-encoding = { version = "=0.14.2", path = "./rust/lance-encoding" }
-lance-encoding-datafusion = { version = "=0.14.2", path = "./rust/lance-encoding-datafusion" }
-lance-file = { version = "=0.14.2", path = "./rust/lance-file" }
-lance-index = { version = "=0.14.2", path = "./rust/lance-index" }
-lance-io = { version = "=0.14.2", path = "./rust/lance-io" }
-lance-linalg = { version = "=0.14.2", path = "./rust/lance-linalg" }
-lance-table = { version = "=0.14.2", path = "./rust/lance-table" }
-lance-test-macros = { version = "=0.14.2", path = "./rust/lance-test-macros" }
-lance-testing = { version = "=0.14.2", path = "./rust/lance-testing" }
+lance = { version = "=0.15.0", path = "./rust/lance" }
+lance-arrow = { version = "=0.15.0", path = "./rust/lance-arrow" }
+lance-core = { version = "=0.15.0", path = "./rust/lance-core" }
+lance-datafusion = { version = "=0.15.0", path = "./rust/lance-datafusion" }
+lance-datagen = { version = "=0.15.0", path = "./rust/lance-datagen" }
+lance-encoding = { version = "=0.15.0", path = "./rust/lance-encoding" }
+lance-encoding-datafusion = { version = "=0.15.0", path = "./rust/lance-encoding-datafusion" }
+lance-file = { version = "=0.15.0", path = "./rust/lance-file" }
+lance-index = { version = "=0.15.0", path = "./rust/lance-index" }
+lance-io = { version = "=0.15.0", path = "./rust/lance-io" }
+lance-linalg = { version = "=0.15.0", path = "./rust/lance-linalg" }
+lance-table = { version = "=0.15.0", path = "./rust/lance-table" }
+lance-test-macros = { version = "=0.15.0", path = "./rust/lance-test-macros" }
+lance-testing = { version = "=0.15.0", path = "./rust/lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
-arrow = { version = "51.0.0", optional = false, features = ["prettyprint"] }
-arrow-arith = "51.0"
-arrow-array = "51.0"
-arrow-buffer = "51.0"
-arrow-cast = "51.0"
-arrow-data = "51.0"
-arrow-ipc = { version = "51.0", features = ["zstd"] }
-arrow-ord = "51.0"
-arrow-row = "51.0"
-arrow-schema = "51.0"
-arrow-select = "51.0"
+arrow = { version = "52.1", optional = false, features = ["prettyprint"] }
+arrow-arith = "52.1"
+arrow-array = "52.1"
+arrow-buffer = "52.1"
+arrow-cast = "52.1"
+arrow-data = "52.1"
+arrow-ipc = { version = "52.1", features = ["zstd"] }
+arrow-ord = "52.1"
+arrow-row = "52.1"
+arrow-schema = "52.1"
+arrow-select = "52.1"
async-recursion = "1.0"
async-trait = "0.1"
aws-config = "0.57"
@@ -93,17 +93,18 @@ criterion = { version = "0.5", features = [
"html_reports",
] }
crossbeam-queue = "0.3"
-datafusion = { version = "37.1", default-features = false, features = [
+datafusion = { version = "40.0", default-features = false, features = [
"array_expressions",
"regex_expressions",
"unicode_expressions",
] }
-datafusion-common = "37.1"
-datafusion-functions = { version = "37.1", features = ["regex_expressions"] }
-datafusion-sql = "37.1"
-datafusion-expr = "37.1"
-datafusion-execution = "37.1"
-datafusion-optimizer = "37.1"
-datafusion-physical-expr = { version = "37.1", features = [
+datafusion-common = "40.0"
+datafusion-functions = { version = "40.0", features = ["regex_expressions"] }
+datafusion-sql = "40.0"
+datafusion-expr = "40.0"
+datafusion-execution = "40.0"
+datafusion-optimizer = "40.0"
+datafusion-physical-expr = { version = "40.0", features = [
"regex_expressions",
] }
deepsize = "0.2.0"
@@ -119,8 +120,8 @@ mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num-traits = "0.2"
num_cpus = "1.0"
-object_store = { version = "0.9.0" }
-parquet = "51.0"
+object_store = { version = "0.10.1" }
+parquet = "52.0"
pin-project = "1.0"
path_abs = "0.5"
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
2 changes: 2 additions & 0 deletions ci/check_versions.py
@@ -37,10 +37,12 @@ def parse_version(version: str) -> tuple[int, int, int]:

if __name__ == "__main__":
new_version = parse_version(get_versions())
print(f"New version: {new_version}")

repo = Github().get_repo(os.environ["GITHUB_REPOSITORY"])
latest_release = repo.get_latest_release()
last_version = parse_version(latest_release.tag_name[1:])
print(f"Last version: {last_version}")

# Check for a breaking-change label in the PRs between the last release and the current commit.
commits = repo.compare(latest_release.tag_name, os.environ["GITHUB_SHA"]).commits
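The two `print` calls added above surface the computed versions in CI logs. For orientation, here is a hedged sketch of how the surrounding breaking-change check in `ci/check_versions.py` could be assembled with PyGithub; only `get_latest_release`, `tag_name`, and `compare(...).commits` come from this diff, while the `parse_version` body, the label lookup, and the label name are illustrative assumptions.

```python
# Hypothetical sketch of the version/breaking-change check; not the exact script.
import os

from github import Github  # PyGithub


def parse_version(version: str) -> tuple[int, int, int]:
    # e.g. "0.15.0" -> (0, 15, 0)
    major, minor, patch = version.split(".")[:3]
    return int(major), int(minor), int(patch)


if __name__ == "__main__":
    repo = Github().get_repo(os.environ["GITHUB_REPOSITORY"])
    latest_release = repo.get_latest_release()
    last_version = parse_version(latest_release.tag_name[1:])  # strip leading "v"
    print(f"Last version: {last_version}")

    # Look for a breaking-change label on PRs merged since the last release
    # (label name and PR lookup are assumptions, not taken from the diff).
    commits = repo.compare(latest_release.tag_name, os.environ["GITHUB_SHA"]).commits
    has_breaking = any(
        label.name == "breaking-change"
        for commit in commits
        for pr in commit.get_pulls()
        for label in pr.labels
    )
    print(f"Breaking changes since last release: {has_breaking}")
```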
17 changes: 7 additions & 10 deletions python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pylance"
version = "0.14.2"
version = "0.15.0"
edition = "2021"
authors = ["Lance Devs <dev@lancedb.com>"]
rust-version = "1.65"
@@ -12,11 +12,11 @@ name = "lance"
crate-type = ["cdylib"]

[dependencies]
-arrow = { version = "51.0.0", features = ["pyarrow"] }
-arrow-array = "51.0"
-arrow-data = "51.0"
-arrow-schema = "51.0"
-object_store = "0.9.0"
+arrow = { version = "52.1", features = ["pyarrow"] }
+arrow-array = "52.1"
+arrow-data = "52.1"
+arrow-schema = "52.1"
+object_store = "0.10.1"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10"
@@ -42,7 +42,7 @@ lance-table = { path = "../rust/lance-table" }
lazy_static = "1"
log = "0.4"
prost = "0.12.2"
-pyo3 = { version = "0.20", features = ["extension-module", "abi3-py39"] }
+pyo3 = { version = "0.21", features = ["extension-module", "abi3-py39", "gil-refs"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
@@ -55,9 +55,6 @@ tracing-subscriber = "0.3.17"
tracing = "0.1.37"
url = "2.5.0"

-# Prevent dynamic linking of lzma, which comes from datafusion
-lzma-sys = { version = "*", features = ["static"] }

[features]
datagen = ["lance-datagen"]
fp16kernels = ["lance/fp16kernels"]
20 changes: 0 additions & 20 deletions python/python/lance/cleanup.py

This file was deleted.

76 changes: 5 additions & 71 deletions python/python/lance/progress.py
@@ -10,8 +10,6 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, Optional

-from .lance import _cleanup_partial_writes

if TYPE_CHECKING:
# We don't import directly because of circular import
from .fragment import FragmentMetadata
@@ -25,29 +23,22 @@ class FragmentWriteProgress(ABC):
This tracking class is experimental and may change in the future.
"""

-def _do_begin(
-self, fragment_json: str, multipart_id: Optional[str] = None, **kwargs
-):
+def _do_begin(self, fragment_json: str, **kwargs):
"""Called when a new fragment is created"""
from .fragment import FragmentMetadata

fragment = FragmentMetadata.from_json(fragment_json)
-return self.begin(fragment, multipart_id, **kwargs)
+return self.begin(fragment, **kwargs)

@abstractmethod
-def begin(
-self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kwargs
-) -> None:
+def begin(self, fragment: "FragmentMetadata", **kwargs) -> None:
"""Called when a new fragment is about to be written.

Parameters
----------
fragment : FragmentMetadata
The fragment that is open to write to. The fragment id might not
yet be assigned at this point.
-multipart_id : str, optional
-The multipart id that will be uploaded to cloud storage. This may be
-used later to abort incomplete uploads if this fragment write fails.
kwargs: dict, optional
Extra keyword arguments to pass to the implementation.

@@ -84,9 +75,7 @@ class NoopFragmentWriteProgress(FragmentWriteProgress):
This is the default implementation.
"""

-def begin(
-self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kargs
-):
+def begin(self, fragment: "FragmentMetadata", **kargs):
pass

def complete(self, fragment: "FragmentMetadata", **kwargs):
@@ -135,25 +124,20 @@ def _in_progress_path(self, fragment: "FragmentMetadata") -> str:
def _fragment_file(self, fragment: "FragmentMetadata") -> str:
return os.path.join(self._base_path, f"fragment_{fragment.id}.json")

-def begin(
-self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kwargs
-):
+def begin(self, fragment: "FragmentMetadata", **kwargs):
"""Called when a new fragment is created.

Parameters
----------
fragment : FragmentMetadata
The fragment that is open to write to.
-multipart_id : str, optional
-The multipart id to upload this fragment to cloud storage.
"""

self._fs.create_dir(self._base_path, recursive=True)

with self._fs.open_output_stream(self._in_progress_path(fragment)) as out:
progress_data = {
"fragment_id": fragment.id,
"multipart_id": multipart_id if multipart_id else "",
"metadata": self._metadata,
}
out.write(json.dumps(progress_data).encode("utf-8"))
@@ -164,53 +148,3 @@ def begin(
def complete(self, fragment: "FragmentMetadata", **kwargs):
"""Called when a fragment is completed"""
self._fs.delete_file(self._in_progress_path(fragment))

-def cleanup_partial_writes(self, dataset_uri: str) -> int:
-"""
-Finds all in-progress files and cleans up any partially written data
-files. This is useful for cleaning up after a failed write.
-
-Parameters
-----------
-dataset_uri : str
-The URI of the table to clean up.
-
-Returns
--------
-int
-The number of partial writes cleaned up.
-"""
-from pyarrow.fs import FileSelector
-
-from .fragment import FragmentMetadata
-
-marker_paths = []
-objects = []
-selector = FileSelector(self._base_path)
-for info in self._fs.get_file_info(selector):
-path = info.path
-if path.endswith(self.PROGRESS_EXT):
-marker_paths.append(path)
-with self._fs.open_input_stream(path) as f:
-progress_data = json.loads(f.read().decode("utf-8"))
-
-json_path = path.rstrip(self.PROGRESS_EXT) + ".json"
-with self._fs.open_input_stream(json_path) as f:
-fragment_metadata = FragmentMetadata.from_json(
-f.read().decode("utf-8")
-)
-objects.append(
-(
-fragment_metadata.data_files()[0].path(),
-progress_data["multipart_id"],
-)
-)
-
-_cleanup_partial_writes(dataset_uri, objects)
-
-for path in marker_paths:
-self._fs.delete_file(path)
-json_path = path.rstrip(self.PROGRESS_EXT) + ".json"
-self._fs.delete_file(json_path)
-
-return len(objects)
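With `multipart_id` removed from the callbacks and `cleanup_partial_writes` dropped (in line with the object_store 0.10 upgrade), a custom progress tracker only needs the two callbacks below. This is a minimal sketch of the post-upgrade interface; the logging bodies are illustrative and not part of this PR.

```python
# Minimal example of the updated FragmentWriteProgress interface.
# The print statements are placeholders; a real tracker might persist
# marker files the way the file-based tracker above does.
from lance.fragment import FragmentMetadata
from lance.progress import FragmentWriteProgress


class LoggingWriteProgress(FragmentWriteProgress):
    """Logs fragment lifecycle events instead of writing marker files."""

    def begin(self, fragment: FragmentMetadata, **kwargs) -> None:
        # No multipart_id argument anymore; only the fragment metadata is passed.
        print(f"starting fragment {fragment.id}")

    def complete(self, fragment: FragmentMetadata, **kwargs) -> None:
        print(f"finished fragment {fragment.id}")
```

Such a tracker can then be supplied wherever the fragment-writing APIs accept a `FragmentWriteProgress` instance; the exact keyword depends on the caller.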
5 changes: 1 addition & 4 deletions python/python/tests/helper.py
@@ -1,7 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

-from typing import Optional

from lance.fragment import FragmentMetadata
from lance.progress import FragmentWriteProgress
@@ -13,9 +12,7 @@ def __init__(self):
self.begin_called = 0
self.complete_called = 0

-def begin(
-self, fragment: FragmentMetadata, multipart_id: Optional[str] = None, **kwargs
-):
+def begin(self, fragment: FragmentMetadata, **kwargs):
self.begin_called += 1

def complete(self, fragment: FragmentMetadata):
6 changes: 0 additions & 6 deletions python/python/tests/test_fragment.py
@@ -217,7 +217,6 @@ def test_dataset_progress(tmp_path: Path):
with open(progress_uri / "fragment_1.in_progress") as f:
progress_data = json.load(f)
assert progress_data["fragment_id"] == 1
-assert isinstance(progress_data["multipart_id"], str)
# progress contains custom metadata
assert progress_data["metadata"]["test_key"] == "test_value"

@@ -226,11 +225,6 @@
metadata = json.load(f)
assert metadata["id"] == 1

-progress.cleanup_partial_writes(str(dataset_uri))

-assert not (progress_uri / "fragment_1.json").exists()
-assert not (progress_uri / "fragment_1.in_progress").exists()


def test_fragment_meta():
# Intentionally leaving off column_indices / version fields to make sure
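With the `multipart_id` assertion and the `cleanup_partial_writes` calls removed, the `.in_progress` marker now carries only the fragment id plus any custom metadata. A quick, hypothetical way to inspect a leftover marker, mirroring the assertions above (the path is illustrative):

```python
# Read back an in-progress marker left by an interrupted fragment write.
# "/tmp/dataset_progress" stands in for whatever base path the tracker used.
import json
from pathlib import Path

marker = Path("/tmp/dataset_progress") / "fragment_1.in_progress"
with open(marker) as f:
    progress_data = json.load(f)

# Post-upgrade the marker has no "multipart_id" entry; the cleanup path that
# consumed it was removed in this PR.
print(progress_data["fragment_id"], progress_data.get("metadata", {}))
```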