Skip to content

Commit

Permalink
New "Cloud Native" mode for ingesting remote files from a cloud envir…
Browse files Browse the repository at this point in the history
…onment (#467)

* Update SEG-Y import for cloud native support and add test

* Add s3fs dependency to test suite for cloud read test

* Enable multiprocessing with spawn context for cloud safety

* Add docs about buffered reads optimization for SEG-Y ingestion
  • Loading branch information
tasansal authored Nov 21, 2024
1 parent f4a648d commit f904855
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 15 deletions.
29 changes: 27 additions & 2 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,32 @@ Some useful examples are:
- File Buffering and random access
- Mount anything with FUSE

````{note}
#### Buffered Reads in Ingestion

MDIO v0.8.2 introduces the `MDIO__IMPORT__CLOUD_NATIVE` environment variable to optimize
SEG-Y header scans by balancing bandwidth usage with read latency through buffered reads.

**When to Use:** This variable is most effective in high-throughput environments like cloud-based ingestion
systems but can also improve performance for mechanical drives or slow connections.

**How to Enable:** Set the variable to `{"True", "1", "true"}`. For example:

```console
$ export MDIO__IMPORT__CLOUD_NATIVE="true"
```

**How It Works:** Buffered reads minimize millions of remote requests during SEG-Y header scans:

- **Cloud Environments:** Ideal for high-throughput connections between cloud ingestion
machines and object stores.
- **Slow Connections:** Bandwidth is the bottleneck, may be faster without it.
- **Local Reads:** May benefit mechanical drives; SSDs typically perform fine without it.

While buffered reads process the file twice, the tradeoff improves ingestion performance and
reduces object-store request costs.

#### Chaining `fsspec` Protocols

When combining advanced protocols like `simplecache` and using a remote store like `s3` the
URL can be chained like `simplecache::s3://bucket/prefix/file.mdio`. When doing this the
`--storage-options` argument must explicitly state parameters for the cloud backend and the
Expand All @@ -181,10 +206,10 @@ extra protocol. For the above example it would look like this:
```

In one line:

```json
{"s3": {"key": "my_super_private_key", "secret": "my_super_private_secret"}, "simplecache": {"cache_storage": "/custom/temp/storage/path"}
```
````

## CLI Reference

Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def mypy(session: Session) -> None:
def tests(session: Session) -> None:
"""Run the test suite."""
session.install(".")
session.install("coverage[toml]", "pytest", "pygments", "pytest-dependency")
session.install("coverage[toml]", "pytest", "pygments", "pytest-dependency", "s3fs")
try:
session.run("coverage", "run", "--parallel", "-m", "pytest", *session.posargs)
finally:
Expand Down
2 changes: 1 addition & 1 deletion src/mdio/commands/segy.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@


@cli.command(name="import")
@argument("segy-path", type=Path(exists=True))
@argument("segy-path", type=STRING)
@argument("mdio-path", type=STRING)
@option(
"-loc",
Expand Down
10 changes: 9 additions & 1 deletion src/mdio/segy/_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import os
from typing import TYPE_CHECKING
from typing import Any

Expand Down Expand Up @@ -37,7 +38,14 @@ def header_scan_worker(
Returns:
HeaderArray parsed from SEG-Y library.
"""
return segy_file.header[slice(*trace_range)]
slice_ = slice(*trace_range)

cloud_native_mode = os.getenv("MDIO__IMPORT__CLOUD_NATIVE", default="False")

if cloud_native_mode.lower() in {"true", "1"}:
return segy_file.trace[slice_].header

return segy_file.header[slice_]


def trace_worker(
Expand Down
10 changes: 7 additions & 3 deletions src/mdio/segy/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
Expand Down Expand Up @@ -46,15 +47,18 @@ def parse_index_headers(
trace_ranges = []
for idx in range(n_blocks):
start, stop = idx * block_size, (idx + 1) * block_size
if stop > trace_count:
stop = trace_count
stop = min(stop, trace_count)

trace_ranges.append((start, stop))

# For Unix async reads with s3fs/fsspec & multiprocessing,
# use 'spawn' instead of default 'fork' to avoid deadlocks
# on cloud stores. Slower but necessary. Default on Windows.
num_workers = min(n_blocks, NUM_CPUS)
context = mp.get_context("spawn")

tqdm_kw = dict(unit="block", dynamic_ncols=True)
with ProcessPoolExecutor(num_workers) as executor:
with ProcessPoolExecutor(num_workers, mp_context=context) as executor:
# pool.imap is lazy
lazy_work = executor.map(
header_scan_worker, # fn
Expand Down
11 changes: 8 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ def fake_segy_tmp(tmp_path_factory):


@pytest.fixture(scope="session")
def segy_input(tmp_path_factory):
def segy_input_uri():
"""Path to dome dataset for cloud testing."""
return "http://s3.amazonaws.com/teapot/filt_mig.sgy"


@pytest.fixture(scope="session")
def segy_input(segy_input_uri, tmp_path_factory):
"""Download teapot dome dataset for testing."""
url = "http://s3.amazonaws.com/teapot/filt_mig.sgy"
tmp_dir = tmp_path_factory.mktemp("segy")
tmp_file = path.join(tmp_dir, "teapot.segy")
urlretrieve(url, tmp_file) # noqa: S310
urlretrieve(segy_input_uri, tmp_file) # noqa: S310

return tmp_file

Expand Down
22 changes: 18 additions & 4 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test cases for the __main__ module."""

import os
from pathlib import Path

import pytest
Expand All @@ -8,18 +9,31 @@
from mdio import __main__


@pytest.fixture()
@pytest.fixture
def runner() -> CliRunner:
"""Fixture for invoking command-line interfaces."""
return CliRunner()


@pytest.mark.dependency()
@pytest.mark.dependency
def test_main_succeeds(runner: CliRunner, segy_input: str, zarr_tmp: Path) -> None:
"""It exits with a status code of zero."""
cli_args = ["segy", "import", segy_input, str(zarr_tmp)]
cli_args.extend(["-loc", "181,185"])
cli_args.extend(["-names", "inline,crossline"])
cli_args.extend(["--header-locations", "181,185"])
cli_args.extend(["--header-names", "inline,crossline"])

result = runner.invoke(__main__.main, args=cli_args)
assert result.exit_code == 0


@pytest.mark.dependency(depends=["test_main_succeeds"])
def test_main_cloud(runner: CliRunner, segy_input_uri: str, zarr_tmp: Path) -> None:
"""It exits with a status code of zero."""
os.environ["MDIO__IMPORT__CLOUD_NATIVE"] = "true"
cli_args = ["segy", "import", str(segy_input_uri), str(zarr_tmp)]
cli_args.extend(["--header-locations", "181,185"])
cli_args.extend(["--header-names", "inline,crossline"])
cli_args.extend(["--overwrite"])

result = runner.invoke(__main__.main, args=cli_args)
assert result.exit_code == 0
Expand Down

0 comments on commit f904855

Please sign in to comment.