
File-based CDK: handle legacy path_prefix + globs #29389

Merged Aug 15, 2023 (1 commit)

Conversation

@clnoll (Contributor) commented on Aug 13, 2023

When doing some testing I noticed that we weren't always matching the same files in v3 and v4 of the S3 connector.

This is because, when handling v3 config in v4 code, we weren't quite bridging the difference in semantics between v3, which has separate path_prefix and path_pattern fields, and v4, which simply uses globs (and extracts a path prefix from them for more efficient S3 querying).

The simplest way I found to honor the legacy config was to keep path_prefix as-is in the v4 stream reader: if it's present we use it; otherwise we infer the prefix from the glob, as v4 already does.
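To illustrate the fallback, a prefix can be inferred from a glob by taking the literal text before the first wildcard. This is a minimal sketch of the idea, not the actual CDK implementation; the function names and the wildcard set are assumptions:

```python
# Minimal sketch of inferring an S3 listing prefix from a glob pattern.
# Not the actual CDK code; function names and wildcard set are illustrative.
from typing import Optional

WILDCARDS = {"*", "?", "[", "{"}

def infer_prefix(glob: str) -> str:
    """Return the longest literal directory prefix of a glob pattern."""
    for i, char in enumerate(glob):
        if char in WILDCARDS:
            # Cut back to the last "/" before the first wildcard.
            return glob[:i].rpartition("/")[0]
    # No wildcard: everything up to the last "/" is a usable prefix.
    return glob.rpartition("/")[0]

def choose_prefix(glob: str, legacy_prefix: Optional[str] = None) -> str:
    # Honor a v3-style path_prefix when present; otherwise infer from the glob.
    return legacy_prefix if legacy_prefix else infer_prefix(glob)
```

Under this sketch, choose_prefix("path/nested/*.csv") yields "path/nested", while choose_prefix("*.csv", legacy_prefix="path/") keeps the legacy value.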

I'm mapping path_prefix to a hidden legacy_prefix field because I don't think this is something we should support in v4. It's much cleaner, and resolves some ambiguities that users have found confusing, to have a single place (globs) where we determine which files are matched.

Testing

I used the script below to run the tests. Before this I ran a separate script to create and populate all the files in the FILEPATHS list in the clnoll-file-based-cdk-test-files bucket (the filepaths, path_prefixes, and path_patterns were derived from patterns identified in user configs). Please let me know if there are cases you feel aren't sufficiently covered and I'll add the files and prefixes/patterns.

import json
import os
import tempfile
from logging import Logger
from typing import Any, Dict, List, Optional

import pytest
from source_s3.source import SourceS3 as v3SourceS3
from source_s3.v4.source import SourceS3 as v4SourceS3
from source_s3.v4.stream_reader import SourceS3StreamReader
from source_s3.v4.config import Config

logger = Logger("")

FILEPATHS = [
    "test",
    "/test",
    "/test.csv",
    "/test.json",
    "/test.parquet",
    "/test.txt",
    "/path.csv",
    "/2022-09-12.csv",
    "/test.xlsx",
    "/test.json.gz",
    "/path/test.csv",
    "/path/test.json",
    "/path/2022-09-12.csv",
    "/path/test.xlsx",
    "/path/test.json.gz",
    "/path/nested/test.csv",
    "/path/nested/test.json",
    "/path/nested/2022-09-12.csv",
    "/path/nested/with.dot.csv",
    "/path/nested/test.xlsx",
    "/path/nested/test.json.gz",
    "/path/nested/with spaces/test.csv",
    "/path/nested/special=01/test.csv",
    "/path/nested/special=01/and.dot/test.csv",
    "/path/nested/special=01/and.dot/test.parquet",
    "/path/nested/special=01/and.dot/other/test.parquet",
    "/path/nested/doubleneested/test.csv",
    "/path/nested/doubleneested/test.json",
    "/path/nested/doubleneested/test.xlsx",
    "/path/nested/doubleneested/test.json.gz",
    "/path/nested/doubleneested/with.dot/test.csv",
    "/path/nested/doubleneested/with.dot/test.json",
    "/path/nested/doubleneested/with.dot/test.xlsx",
    "/path/nested/doubleneested/with.dot/test.json.gz",
    "/path/nested/doubleneested/with.dot/other/test.parquet",
    "/path with spaces/test.csv",
    "/path with spaces/test.json",
    "/path with spaces/nested with space/test.csv",
    "/path with spaces/nested with space/test.json",
    "/path with spaces/nested with space/other/test.csv",
    "/path with spaces/nested with space/other/test.json",
    "/path=special/nested=test/internal/test.parquet",
    "/path=special/nested=test/internal/other/test.parquet",
    "/test.(mds).csv",
    "/path/test.(mds).csv",
    "/path/nested/test.(mds).csv",
    "/path/nested/doubleneested/test.(mds).csv",
    "/path with spaces/nested with space/test.(mds).csv",
    "/test.xlsx.md",
    "/test.xlsx.gz",
    "/test.xlsx.parquet",
    "/path/test.xlsx.md",
    "/path/test.xlsx.gz",
    "/path/test.xlsx.parquet",
    "/path/nested/test.xlsx.md",
    "/path/nested/test.xlsx.gz",
    "/path/nested/test.xlsx.parquet",
    "/path/nested/doubleneested/test.xlsx.md",
    "/path/nested/doubleneested/test.xlsx.gz",
    "/path/nested/doubleneested/test.xlsx.parquet",
    "/path with spaces/nested with space/test.xlsx.md",
    "/path with spaces/nested with space/test.xlsx.gz",
    "/path with spaces/nested with space/test.xlsx.parquet",
    "test.csv",
    "test.json",
    "test.parquet",
    "test.txt",
    "path.csv",
    "2022-09-12.csv",
    "test.xlsx",
    "test.json.gz",
    "'path in quotes'/test.csv",
    '"path in quotes"/test2.csv',
    "path/test.csv",
    "path/test.json",
    "path/2022-09-12.csv",
    "path/test.xlsx",
    "path/test.json.gz",
    "path/nested/test.csv",
    "path/nested/test.json",
    "path/nested/2022-09-12.csv",
    "path/nested/with.dot.csv",
    "path/nested/test.xlsx",
    "path/nested/test.json.gz",
    "path/nested/with spaces/test.csv",
    "path/nested/special=01/test.csv",
    "path/nested/special=01/and.dot/test.csv",
    "path/nested/special=01/and.dot/test.parquet",
    "path/nested/special=01/and.dot/other/test.parquet",
    "path/nested/doubleneested/test.csv",
    "path/nested/doubleneested/test.json",
    "path/nested/doubleneested/test.xlsx",
    "path/nested/doubleneested/test.json.gz",
    "path/nested/doubleneested/with.dot/test.csv",
    "path/nested/doubleneested/with.dot/test.json",
    "path/nested/doubleneested/with.dot/test.xlsx",
    "path/nested/doubleneested/with.dot/test.json.gz",
    "path/nested/doubleneested/with.dot/other/test.parquet",
    "path with spaces/test.csv",
    "path with spaces/test.json",
    "path with spaces/nested with space/test.csv",
    "path with spaces/nested with space/test.json",
    "path with spaces/nested with space/other/test.csv",
    "path with spaces/nested with space/other/test.json",
    "path=special/nested=test/internal/test.parquet",
    "path=special/nested=test/internal/other/test.parquet",
    "test.(mds).csv",
    "path/test.(mds).csv",
    "path/nested/test.(mds).csv",
    "path/nested/doubleneested/test.(mds).csv",
    "path with spaces/nested with space/test.(mds).csv",
    "test.xlsx.md",
    "test.xlsx.gz",
    "test.xlsx.parquet",
    "path/test.xlsx.md",
    "path/test.xlsx.gz",
    "path/test.xlsx.parquet",
    "path/nested/test.xlsx.md",
    "path/nested/test.xlsx.gz",
    "path/nested/test.xlsx.parquet",
    "path/nested/doubleneested/test.xlsx.md",
    "path/nested/doubleneested/test.xlsx.gz",
    "path/nested/doubleneested/test.xlsx.parquet",
    "path with space/nested with space/test.xlsx.md",
    "path with space/nested with space/test.xlsx.gz",
    "path with space/nested with space/test.xlsx.parquet",
    "/mytestbucket/test.csv",
    "/mytestbucket/test.json",
    "/mytestbucket/test.parquet",
    "/mytestbucket/test.txt",
    "/mytestbucket/path.csv",
    "/mytestbucket/2022-09-12.csv",
    "/mytestbucket/test.xlsx",
    "/mytestbucket/test.json.gz",
    "/mytestbucket/path/test.csv",
    "/mytestbucket/path/test.json",
    "/mytestbucket/path/2022-09-12.csv",
    "/mytestbucket/path/test.xlsx",
    "/mytestbucket/path/test.json.gz",
    "/mytestbucket/path/nested/test.csv",
    "/mytestbucket/path/nested/test.json",
    "/mytestbucket/path/nested/2022-09-12.csv",
    "/mytestbucket/path/nested/with.dot.csv",
    "/mytestbucket/path/nested/test.xlsx",
    "/mytestbucket/path/nested/test.json.gz",
    "/mytestbucket/path/nested/with spaces/test.csv",
    "/mytestbucket/path/nested/special=01/test.csv",
    "/mytestbucket/path/nested/special=01/and.dot/test.csv",
    "/mytestbucket/path/nested/special=01/and.dot/test.parquet",
    "/mytestbucket/path/nested/special=01/and.dot/other/test.parquet",
    "/mytestbucket/path/nested/doubleneested/test.csv",
    "/mytestbucket/path/nested/doubleneested/test.json",
    "/mytestbucket/path/nested/doubleneested/test.xlsx",
    "/mytestbucket/path/nested/doubleneested/test.json.gz",
    "/mytestbucket/path/nested/doubleneested/with.dot/test.csv",
    "/mytestbucket/path/nested/doubleneested/with.dot/test.json",
    "/mytestbucket/path/nested/doubleneested/with.dot/test.xlsx",
    "/mytestbucket/path/nested/doubleneested/with.dot/test.json.gz",
    "/mytestbucket/path/nested/doubleneested/with.dot/other/test.parquet",
    "/mytestbucket/path with space/test.csv",
    "/mytestbucket/path with space/test.json",
    "/mytestbucket/path with space/nested with space/test.csv",
    "/mytestbucket/path with space/nested with space/test.json",
    "/mytestbucket/path with space/nested with space/other/test.csv",
    "/mytestbucket/path with space/nested with space/other/test.json",
    "/mytestbucket/path=special/nested=test/internal/test.parquet",
    "/mytestbucket/path=special/nested=test/internal/other/test.parquet",
    "/mytestbucket/test.(mds).csv",
    "/mytestbucket/path/test.(mds).csv",
    "/mytestbucket/path/nested/test.(mds).csv",
    "/mytestbucket/path/nested/doubleneested/test.(mds).csv",
    "/mytestbucket/path with space/nested with space/test.(mds).csv",
    "/mytestbucket/test.xlsx.md",
    "/mytestbucket/test.xlsx.gz",
    "/mytestbucket/test.xlsx.parquet",
    "/mytestbucket/path/test.xlsx.md",
    "/mytestbucket/path/test.xlsx.gz",
    "/mytestbucket/path/test.xlsx.parquet",
    "/mytestbucket/path/nested/test.xlsx.md",
    "/mytestbucket/path/nested/test.xlsx.gz",
    "/mytestbucket/path/nested/test.xlsx.parquet",
    "/mytestbucket/path/nested/doubleneested/test.xlsx.md",
    "/mytestbucket/path/nested/doubleneested/test.xlsx.gz",
    "/mytestbucket/path/nested/doubleneested/test.xlsx.parquet",
    "/mytestbucket/path with space/nested with space/test.xlsx.md",
    "/mytestbucket/path with space/nested with space/test.xlsx.gz",
    "/mytestbucket/path with space/nested with space/test.xlsx.parquet",
]


def get_v3_matched_files(config_path: str) -> List[str]:
    source = v3SourceS3()
    [stream] = source.streams(source.read_config(config_path))
    return [f.key for f in stream.get_time_ordered_file_infos()]


def get_v4_matched_files(config_path: str) -> List[str]:
    stream_reader = SourceS3StreamReader()
    source = v4SourceS3(stream_reader, Config)
    [stream] = source.streams(source.read_config(config_path))
    uris = [f.uri for f in stream_reader.get_matching_files(stream.config.globs, stream.config.legacy_prefix, logger)]
    return uris


def write_file(config: Dict[str, Any]) -> str:
    with tempfile.NamedTemporaryFile(dir="/tmp", delete=False, mode="w") as file:
        json.dump(config, file)
    return file.name


path_prefixes = [
    "",
    "\\",
    "/",
    "path",
    "path.csv",
    "path/filepath.csv",
    "path/filepath with space.csv",
    "/path",
    "/path/",
    "/path/nested/",
    "path/nested",
    "path/",
    "path/nested/doubleneested",
    "path/nested/doubleneested/",
    "path/nested/*/*/*/",
    "**/*.csv.gz",
    "path/nestedpath/*.csv",
    "'path in quotes'/",
    "path with space/nested with space/",
    "path with space/nested with space",
    "path/nested/double.nested/path=1/",
    "path/nested/double.nested/path=1",
    "/path+specialchar/file.csv",
    "/path/nested/with.dot",
    "path.with.dot/nested/with.dot",
    "s3://clnoll-test-files/path.csv",
]

patterns = [
    "",
    "*",
    "**/**",
    "mytestbucket*",
    "*.csv",
    "**/*.csv",
    "**/path with spaces*.csv",
    "'path in quotes'",
    '"path in quotes"',
    "path/*.csv|path/*.csv.gz",
    "path/nested/**/test*|!path/nested/doubleneested/**",
    "s3://clnoll-test-files/*.csv",
]


params = []
for prefix in path_prefixes:
    for pattern in patterns:
        params.append(pytest.param(prefix, pattern, id=f"prefix='{prefix}' pattern='{pattern}'"))


@pytest.mark.parametrize("path_prefix,path_pattern", params)
def test(path_prefix: Optional[str], path_pattern: str):
    config = {
        "dataset": "test",
        "provider": {
            "storage": "S3",
            "bucket": "clnoll-test-files",
            "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
            "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
            "path_prefix": path_prefix,
        },
        "format": {
            "filetype": "csv",
            "advanced_options": "{}",
            "encoding": "utf8"
        },
        "path_pattern": path_pattern,
        "schema": "{}",
    }
    config_path = write_file(config)
    v3 = get_v3_matched_files(config_path)
    v4 = get_v4_matched_files(config_path)
    assert sorted(v3) == sorted(v4)

@clnoll clnoll requested a review from a team as a code owner August 13, 2023 02:48
@octavia-squidington-iii octavia-squidington-iii added the CDK (Connector Development Kit) label Aug 13, 2023
@girarda (Contributor) left a comment

The test cases and the code change look good!

Questions:

  1. are we losing any expressivity going from two fields to one?
  2. do we need to document a migration path for users to re-write their config in the v4 format? For instance, if a user wants to update the config of an existing connector configured with the legacy spec.

@clnoll (Contributor, Author) commented on Aug 15, 2023

@girarda to answer your questions: I don't think we're losing expressivity by collapsing two fields into one. In v3, the path prefix is passed straight to S3, and the globs are matched against the result. So some configs have redundancy between the prefix and the globs, because if you want the glob to match a URI with a particular prefix, the glob may have to repeat that prefix. Users can also only specify one prefix in v3, whereas now we allow multiple globs.

As far as a migration path goes, I think we could script that using the config conversion tools.
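A migration script along those lines could look like the sketch below. It assumes the v4 shape uses per-stream globs plus the hidden legacy_prefix field referenced in the test script above; all other v4 field names here are illustrative, and the real conversion would go through the config conversion tools:

```python
# Hypothetical v3 -> v4 config conversion. The "globs" and "legacy_prefix"
# fields come from this PR; the rest of the v4 shape is assumed.
from typing import Any, Dict

def migrate_v3_config(v3_config: Dict[str, Any]) -> Dict[str, Any]:
    provider = v3_config["provider"]
    return {
        "bucket": provider["bucket"],
        "aws_access_key_id": provider.get("aws_access_key_id"),
        "aws_secret_access_key": provider.get("aws_secret_access_key"),
        "streams": [
            {
                "name": v3_config["dataset"],
                # v3's "|"-separated path_pattern maps onto a list of globs.
                "globs": v3_config.get("path_pattern", "**").split("|"),
                # Keep the old prefix verbatim so file matching is unchanged.
                "legacy_prefix": provider.get("path_prefix", ""),
            }
        ],
    }
```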

@clnoll clnoll force-pushed the file-cdk-v3-v4-file-match-comparison branch from d7dad51 to 7b95e0e on August 15, 2023 13:18
@octavia-squidington-iii octavia-squidington-iii removed the area/connectors and area/documentation labels Aug 15, 2023
@clnoll clnoll requested review from girarda and removed request for a team August 15, 2023 13:28
@girarda (Contributor) left a comment

:shipit:

> As far as a migration path goes, I think we could script that using the config conversion tools
@clnoll can you write a follow up issue to migrate existing configs to v4?

@clnoll (Contributor, Author) commented on Aug 15, 2023

Done @girarda - #29435

@clnoll clnoll merged commit 7c1d608 into master Aug 15, 2023
@clnoll clnoll deleted the file-cdk-v3-v4-file-match-comparison branch August 15, 2023 16:18
harrytou pushed a commit to KYVENetwork/airbyte that referenced this pull request Sep 1, 2023