Skip to content

Commit

Permalink
Source S3: speed up discovery (#22500)
Browse files Browse the repository at this point in the history
* #1470 source S3: speed up discovery

* #1470 source s3: upd changelog

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Feb 9, 2023
1 parent d82e01a commit 3dc79f5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.31
dockerImageTag: 0.1.32
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12775,7 +12775,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.31"
- dockerImage: "airbyte/source-s3:0.1.32"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.31
LABEL io.airbyte.version=0.1.32
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#


import concurrent.futures
import json
import threading
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime, timedelta
Expand All @@ -29,6 +31,7 @@
JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]

LOGGER = AirbyteLogger()
LOCK = threading.Lock()


class ConfigurationError(Exception):
Expand All @@ -52,6 +55,7 @@ def fileformatparser_map(self) -> Mapping[str, type]:
ab_file_name_col = "_ab_source_file_url"
airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col]
datetime_format_string = "%Y-%m-%dT%H:%M:%S%z"
parallel_tasks_size = 256

def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None):
"""
Expand Down Expand Up @@ -202,6 +206,17 @@ def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
if types == {"number", "string"}:
return "string"

@staticmethod
def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas):
try:
with storage_file.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
with LOCK:
schemas[file_info] = this_schema
processed_files.append(file_info)
except OSError:
pass

def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
"""
In order to auto-infer a schema across many files and/or allow for additional properties (columns),
Expand All @@ -224,19 +239,25 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
file_reader = self.fileformatparser_class(self._format)

processed_files = []
for file_info in self.get_time_ordered_file_infos():
# skip this file if it's earlier than min_datetime
if (min_datetime is not None) and (file_info.last_modified < min_datetime):
continue

storagefile = self.storagefile_class(file_info, self._provider)
try:
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
processed_files.append(file_info)
except OSError:
continue

schemas = {}

file_infos = list(self.get_time_ordered_file_infos())
if min_datetime is not None:
file_infos = [info for info in file_infos if info.last_modified >= min_datetime]

for i in range(0, len(file_infos), self.parallel_tasks_size):
chunk_infos = file_infos[i : i + self.parallel_tasks_size]
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
lambda args: self.guess_file_schema(*args),
[
(self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas)
for file_info in chunk_infos
],
)

for file_info in file_infos:
this_schema = schemas[file_info]
if this_schema == master_schema:
continue # exact schema match so go to next file

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------|
| 0.1.32 | 2023-02-07 | [22500](https://github.com/airbytehq/airbyte/pull/22500) | Speed up discovery |
| 0.1.31 | 2023-02-08 | [22550](https://github.com/airbytehq/airbyte/pull/22550) | Validate CSV read options and convert options |
| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI |
| 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer |
Expand Down

0 comments on commit 3dc79f5

Please sign in to comment.