From 091600bdadb1841beda49a10d91d04dd6e56d243 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Mon, 21 Oct 2024 12:49:35 -0400 Subject: [PATCH] materialize-s3-iceberg: fetch table paths concurrently Adds concurrency to fetching table storage paths, since otherwise this can take a really long time if there are lots of enabled bindings. --- materialize-s3-iceberg/catalog.go | 30 ++++++++++++++++--- materialize-s3-iceberg/driver.go | 12 ++++---- .../iceberg-ctl/iceberg_ctl/__main__.py | 20 +++++++++++-- .../iceberg-ctl/poetry.lock | 15 +++++++++- .../iceberg-ctl/pyproject.toml | 1 + 5 files changed, 64 insertions(+), 14 deletions(-) diff --git a/materialize-s3-iceberg/catalog.go b/materialize-s3-iceberg/catalog.go index b258948e9b..46de413497 100644 --- a/materialize-s3-iceberg/catalog.go +++ b/materialize-s3-iceberg/catalog.go @@ -73,13 +73,35 @@ func (c *catalog) infoSchema() (*boilerplate.InfoSchema, error) { return is, nil } -func (c *catalog) tablePath(resourcePath []string) (string, error) { - b, err := runIcebergctl(c.cfg, "table-path", pathToFQN(resourcePath)) +// Table paths returns the registered storage path for each resource path in a +// list having the order corresponding to the input list of resource paths. +func (c *catalog) tablePaths(resourcePaths [][]string) ([]string, error) { + tableNames := make([]string, 0, len(resourcePaths)) + for _, p := range resourcePaths { + tableNames = append(tableNames, pathToFQN(p)) + } + + tableNamesJson, err := json.Marshal(tableNames) + if err != nil { + return nil, err + } + + b, err := runIcebergctl(c.cfg, "table-paths", string(tableNamesJson)) if err != nil { - return "", err + return nil, err + } + + fqnToPath := make(map[string]string) + if err := json.Unmarshal(b, &fqnToPath); err != nil { + return nil, err + } + + out := make([]string, 0, len(resourcePaths)) + for _, p := range resourcePaths { + out = append(out, fqnToPath[pathToFQN(p)]) } - return string(b), nil + return out, nil } func (c *catalog) listNamespaces() ([]string, error) { diff --git a/materialize-s3-iceberg/driver.go b/materialize-s3-iceberg/driver.go index 0fa1f64fd9..b97364eaff 100644 --- a/materialize-s3-iceberg/driver.go +++ b/materialize-s3-iceberg/driver.go @@ -362,13 +362,13 @@ func (d driver) NewTransactor(ctx context.Context, open pm.Request_Open, _ *boil } catalog := newCatalog(cfg, resourcePaths, open.Materialization) - for idx := range bindings { - catalogTablePath, err := catalog.tablePath(bindings[idx].path) - if err != nil { - return nil, nil, nil, fmt.Errorf("getting catalog table path: %w", err) - } + tablePaths, err := catalog.tablePaths(resourcePaths) + if err != nil { + return nil, nil, nil, fmt.Errorf("looking up table paths: %w", err) + } - bindings[idx].catalogTablePath = catalogTablePath + for idx := range bindings { + bindings[idx].catalogTablePath = tablePaths[idx] } s3store, err := filesink.NewS3Store(ctx, filesink.S3StoreConfig{ diff --git a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py index 099f14fa4a..cb37e509c0 100644 --- a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py +++ b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import Any, Literal @@ -169,17 +170,30 @@ def info_schema( print(TypeAdapter(dict[str, list[IcebergColumn]]).dump_json(tables).decode()) +async def fetch_table_paths(catalog: Catalog, tables: list[str]) -> dict[str, str]: + sem = asyncio.Semaphore(10) + + async def fetch_table_path(table: str) -> tuple[str, str]: + async with sem: + return (table, catalog.load_table(table).location()) + + return dict(await asyncio.gather( + *(fetch_table_path(table) for table in tables) + )) + + @run.command() @click.pass_context @click.argument("table", type=str) -def table_path( +def table_paths( ctx: Context, - table: str, + tables: str, ): catalog = ctx.obj["catalog"] assert isinstance(catalog, Catalog) - print(catalog.load_table(table).location()) + res = asyncio.run(fetch_table_paths(catalog, TypeAdapter(list[str]).validate_json(tables))) + print(json.dumps(res)) @run.command() diff --git a/materialize-s3-iceberg/iceberg-ctl/poetry.lock b/materialize-s3-iceberg/iceberg-ctl/poetry.lock index b2222b63ed..f063933846 100644 --- a/materialize-s3-iceberg/iceberg-ctl/poetry.lock +++ b/materialize-s3-iceberg/iceberg-ctl/poetry.lock @@ -164,6 +164,19 @@ files = [ {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, ] +[[package]] +name = "asyncio" +version = "3.4.3" +description = "reference implementation of PEP 3156" +optional = false +python-versions = "*" +files = [ + {file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"}, + {file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"}, + {file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"}, + {file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"}, +] + [[package]] name = "attrs" version = "24.2.0" @@ -1598,4 +1611,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "bfdeebd1e6fc22361a17a2af53105de9d6afae99e2257a5c27182bbe6c57d39b" +content-hash = "d801a7453d09a3bbbadd1c495e58d1019c9663056ce967e8d8c3e68e77e67174" diff --git a/materialize-s3-iceberg/iceberg-ctl/pyproject.toml b/materialize-s3-iceberg/iceberg-ctl/pyproject.toml index bef254d245..7eb5baa6cc 100644 --- a/materialize-s3-iceberg/iceberg-ctl/pyproject.toml +++ b/materialize-s3-iceberg/iceberg-ctl/pyproject.toml @@ -10,6 +10,7 @@ python = "^3.11" pyiceberg = {extras = ["glue", "pyarrow", "s3fs"], version = "^0.7.0"} click = "^8.1.7" setuptools = "^70.0.0" +asyncio = "^3.4.3" [tool.poetry.group.dev.dependencies] debugpy = "^1.8.0"