Skip to content

Commit

Permalink
materialize-s3-iceberg: fetch table paths concurrently
Browse files Browse the repository at this point in the history
Adds concurrency to fetching table storage paths, since otherwise this can take
a really long time if there are lots of enabled bindings.
  • Loading branch information
williamhbaker committed Oct 21, 2024
1 parent 9aa980f commit 091600b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 14 deletions.
30 changes: 26 additions & 4 deletions materialize-s3-iceberg/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,35 @@ func (c *catalog) infoSchema() (*boilerplate.InfoSchema, error) {
return is, nil
}

func (c *catalog) tablePath(resourcePath []string) (string, error) {
b, err := runIcebergctl(c.cfg, "table-path", pathToFQN(resourcePath))
// Table paths returns the registered storage path for each resource path in a
// list having the order corresponding to the input list of resource paths.
func (c *catalog) tablePaths(resourcePaths [][]string) ([]string, error) {
tableNames := make([]string, 0, len(resourcePaths))
for _, p := range resourcePaths {
tableNames = append(tableNames, pathToFQN(p))
}

tableNamesJson, err := json.Marshal(tableNames)
if err != nil {
return nil, err
}

b, err := runIcebergctl(c.cfg, "table-paths", string(tableNamesJson))
if err != nil {
return "", err
return nil, err
}

fqnToPath := make(map[string]string)
if err := json.Unmarshal(b, &fqnToPath); err != nil {
return nil, err
}

out := make([]string, 0, len(resourcePaths))
for _, p := range resourcePaths {
out = append(out, fqnToPath[pathToFQN(p)])
}

return string(b), nil
return out, nil
}

func (c *catalog) listNamespaces() ([]string, error) {
Expand Down
12 changes: 6 additions & 6 deletions materialize-s3-iceberg/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,13 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil
}

catalog := newCatalog(cfg, resourcePaths, open.Materialization)
for idx := range bindings {
catalogTablePath, err := catalog.tablePath(bindings[idx].path)
if err != nil {
return nil, nil, nil, fmt.Errorf("getting catalog table path: %w", err)
}
tablePaths, err := catalog.tablePaths(resourcePaths)
if err != nil {
return nil, nil, nil, fmt.Errorf("looking up table paths: %w", err)
}

bindings[idx].catalogTablePath = catalogTablePath
for idx := range bindings {
bindings[idx].catalogTablePath = tablePaths[idx]
}

s3store, err := filesink.NewS3Store(ctx, filesink.S3StoreConfig{
Expand Down
20 changes: 17 additions & 3 deletions materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
from typing import Any, Literal

Expand Down Expand Up @@ -169,17 +170,30 @@ def info_schema(
print(TypeAdapter(dict[str, list[IcebergColumn]]).dump_json(tables).decode())


async def fetch_table_paths(catalog: Catalog, tables: list[str]) -> dict[str, str]:
sem = asyncio.Semaphore(10)

async def fetch_table_path(table: str) -> tuple[str, str]:
async with sem:
return (table, catalog.load_table(table).location())

return dict(await asyncio.gather(
*(fetch_table_path(table) for table in tables)
))


@run.command()
@click.pass_context
@click.argument("table", type=str)
def table_path(
def table_paths(
ctx: Context,
table: str,
tables: str,
):
catalog = ctx.obj["catalog"]
assert isinstance(catalog, Catalog)

print(catalog.load_table(table).location())
res = asyncio.run(fetch_table_paths(catalog, TypeAdapter(list[str]).validate_json(tables)))
print(json.dumps(res))


@run.command()
Expand Down
15 changes: 14 additions & 1 deletion materialize-s3-iceberg/iceberg-ctl/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions materialize-s3-iceberg/iceberg-ctl/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ python = "^3.11"
pyiceberg = {extras = ["glue", "pyarrow", "s3fs"], version = "^0.7.0"}
click = "^8.1.7"
setuptools = "^70.0.0"
asyncio = "^3.4.3"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
Expand Down

0 comments on commit 091600b

Please sign in to comment.