diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py
index 1c93636f66f7..1fda1016a00c 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py
@@ -4,9 +4,9 @@
import codecs
from enum import Enum
-from typing import Optional
+from typing import Any, Mapping, Optional, Set
-from pydantic import BaseModel, Field, validator
+from pydantic import BaseModel, Field, root_validator, validator
from typing_extensions import Literal
@@ -17,6 +17,10 @@ class QuotingBehavior(Enum):
QUOTE_NONE = "Quote None"
+DEFAULT_TRUE_VALUES = ["y", "yes", "t", "true", "on", "1"]
+DEFAULT_FALSE_VALUES = ["n", "no", "f", "false", "off", "0"]
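+# These defaults mirror the lowercase values accepted by distutils.util.strtobool, which the CSV
+# parser previously relied on for boolean casting.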
+
+
class CsvFormat(BaseModel):
filetype: Literal["csv"] = "csv"
delimiter: str = Field(
@@ -46,10 +50,34 @@ class CsvFormat(BaseModel):
default=QuotingBehavior.QUOTE_SPECIAL_CHARACTERS,
description="The quoting behavior determines when a value in a row should have quote marks added around it. For example, if Quote Non-numeric is specified, while reading, quotes are expected for row values that do not contain numbers. Or for Quote All, every row value will be expecting quotes.",
)
-
- # Noting that the existing S3 connector had a config option newlines_in_values. This was only supported by pyarrow and not
- # the Python csv package. It has a little adoption, but long term we should ideally phase this out because of the drawbacks
- # of using pyarrow
+ null_values: Set[str] = Field(
+ title="Null Values",
+ default=[],
+ description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
+ )
+ skip_rows_before_header: int = Field(
+ title="Skip Rows Before Header",
+ default=0,
+ description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
+ )
+ skip_rows_after_header: int = Field(
+ title="Skip Rows After Header", default=0, description="The number of rows to skip after the header row."
+ )
+ autogenerate_column_names: bool = Field(
+ title="Autogenerate Column Names",
+ default=False,
+ description="Whether to autogenerate column names if column_names is empty. If true, column names will be of the form “f0”, “f1”… If false, column names will be read from the first CSV row after skip_rows_before_header.",
+ )
+ true_values: Set[str] = Field(
+ title="True Values",
+ default=DEFAULT_TRUE_VALUES,
+ description="A set of case-sensitive strings that should be interpreted as true values.",
+ )
+ false_values: Set[str] = Field(
+ title="False Values",
+ default=DEFAULT_FALSE_VALUES,
+ description="A set of case-sensitive strings that should be interpreted as false values.",
+ )
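+ # Example (illustrative): a format config of
+ #   {"csv": {"null_values": ["NA"], "skip_rows_before_header": 1, "true_values": ["si"], "false_values": ["no"]}}
+ # would treat "NA" cells as null, skip one row before reading the header, and cast "si"/"no" to
+ # booleans for columns declared as boolean in the schema.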
@validator("delimiter")
def validate_delimiter(cls, v: str) -> str:
@@ -78,3 +106,11 @@ def validate_encoding(cls, v: str) -> str:
except LookupError:
raise ValueError(f"invalid encoding format: {v}")
return v
+
+ @root_validator
+ def validate_option_combinations(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
+ skip_rows_before_header = values.get("skip_rows_before_header", 0)
+ auto_generate_column_names = values.get("autogenerate_column_names", False)
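+ # Autogenerated column names are always derived from the very first line of the file (the parser
+ # seeks back to offset 0), so they cannot be combined with skipping leading rows.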
+ if skip_rows_before_header > 0 and auto_generate_column_names:
+ raise ValueError("Cannot skip rows before header and autogenerate column names at the same time.")
+ return values
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py
index 62594e429a3c..479402877272 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py
@@ -5,12 +5,13 @@
import csv
import json
import logging
-from distutils.util import strtobool
-from typing import Any, Dict, Iterable, Mapping, Optional
+from functools import partial
+from io import IOBase
+from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Set
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, QuotingBehavior
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
-from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError
+from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -34,30 +35,25 @@ async def infer_schema(
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Dict[str, Any]:
- config_format = config.format.get(config.file_type) if config.format else None
- if config_format:
- if not isinstance(config_format, CsvFormat):
- raise ValueError(f"Invalid format config: {config_format}")
- dialect_name = config.name + DIALECT_NAME
- csv.register_dialect(
- dialect_name,
- delimiter=config_format.delimiter,
- quotechar=config_format.quote_char,
- escapechar=config_format.escape_char,
- doublequote=config_format.double_quote,
- quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
- )
- with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
- # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
- # sources will likely require one. Rather than modify the interface now we can wait until the real use case
- reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
- schema = {field.strip(): {"type": "string"} for field in next(reader)}
- csv.unregister_dialect(dialect_name)
- return schema
- else:
- with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
- reader = csv.DictReader(fp) # type: ignore
- return {field.strip(): {"type": "string"} for field in next(reader)}
+ config_format = config.format.get(config.file_type) if config.format else CsvFormat()
+ if not isinstance(config_format, CsvFormat):
+ raise ValueError(f"Invalid format config: {config_format}")
+ dialect_name = config.name + DIALECT_NAME
+ csv.register_dialect(
+ dialect_name,
+ delimiter=config_format.delimiter,
+ quotechar=config_format.quote_char,
+ escapechar=config_format.escape_char,
+ doublequote=config_format.double_quote,
+ quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
+ )
+ with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
+ # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
+ # sources will likely require one. Rather than modify the interface now we can wait until the real use case
+ headers = self._get_headers(fp, config_format, dialect_name)
+ schema = {field.strip(): {"type": "string"} for field in headers}
+ csv.unregister_dialect(dialect_name)
+ return schema
def parse_records(
self,
@@ -67,30 +63,28 @@ def parse_records(
logger: logging.Logger,
) -> Iterable[Dict[str, Any]]:
schema: Mapping[str, Any] = config.input_schema # type: ignore
- config_format = config.format.get(config.file_type) if config.format else None
- if config_format:
- if not isinstance(config_format, CsvFormat):
- raise ValueError(f"Invalid format config: {config_format}")
- # Formats are configured individually per-stream so a unique dialect should be registered for each stream.
- # Wwe don't unregister the dialect because we are lazily parsing each csv file to generate records
- dialect_name = config.name + DIALECT_NAME
- csv.register_dialect(
- dialect_name,
- delimiter=config_format.delimiter,
- quotechar=config_format.quote_char,
- escapechar=config_format.escape_char,
- doublequote=config_format.double_quote,
- quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
- )
- with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
- # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
- # sources will likely require one. Rather than modify the interface now we can wait until the real use case
- reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
- yield from self._read_and_cast_types(reader, schema, logger)
- else:
- with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
- reader = csv.DictReader(fp) # type: ignore
- yield from self._read_and_cast_types(reader, schema, logger)
+ config_format = config.format.get(config.file_type) if config.format else CsvFormat()
+ if not isinstance(config_format, CsvFormat):
+ raise ValueError(f"Invalid format config: {config_format}")
+ # Formats are configured individually per-stream so a unique dialect should be registered for each stream.
+ # We don't unregister the dialect because we are lazily parsing each csv file to generate records
+ # This will potentially be a problem if we ever process multiple streams concurrently
+ dialect_name = config.name + DIALECT_NAME
+ csv.register_dialect(
+ dialect_name,
+ delimiter=config_format.delimiter,
+ quotechar=config_format.quote_char,
+ escapechar=config_format.escape_char,
+ doublequote=config_format.double_quote,
+ quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
+ )
+ with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
+ # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
+ # sources will likely require one. Rather than modify the interface now we can wait until the real use case
+ self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
+ field_names = self._auto_generate_headers(fp, config_format) if config_format.autogenerate_column_names else None
+ reader = csv.DictReader(fp, dialect=dialect_name, fieldnames=field_names) # type: ignore
+ yield from self._read_and_cast_types(reader, schema, config_format, logger)
@property
def file_read_mode(self) -> FileReadMode:
@@ -98,7 +92,7 @@ def file_read_mode(self) -> FileReadMode:
@staticmethod
def _read_and_cast_types(
- reader: csv.DictReader, schema: Optional[Mapping[str, Any]], logger: logging.Logger # type: ignore
+ reader: csv.DictReader, schema: Optional[Mapping[str, Any]], config_format: CsvFormat, logger: logging.Logger # type: ignore
) -> Iterable[Dict[str, Any]]:
"""
If the user provided a schema, attempt to cast the record values to the associated type.
@@ -107,16 +101,65 @@ def _read_and_cast_types(
cast it to a string. Downstream, the user's validation policy will determine whether the
record should be emitted.
"""
- if not schema:
- yield from reader
+ cast_fn = CsvParser._get_cast_function(schema, config_format, logger)
+ for i, row in enumerate(reader):
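+ # Rows covered by skip_rows_after_header are consumed from the reader but never emitted.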
+ if i < config_format.skip_rows_after_header:
+ continue
+ # The row was not properly parsed if any of the values are None
+ if any(val is None for val in row.values()):
+ raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD)
+ else:
+ yield CsvParser._to_nullable(cast_fn(row), config_format.null_values)
- else:
+ @staticmethod
+ def _get_cast_function(
+ schema: Optional[Mapping[str, Any]], config_format: CsvFormat, logger: logging.Logger
+ ) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
+ # Only cast values if the schema is provided
+ if schema:
property_types = {col: prop["type"] for col, prop in schema["properties"].items()}
- for row in reader:
- yield cast_types(row, property_types, logger)
+ return partial(_cast_types, property_types=property_types, config_format=config_format, logger=logger)
+ else:
+ # If no schema is provided, yield the rows as they are
+ return _no_cast
+
+ @staticmethod
+ def _to_nullable(row: Mapping[str, str], null_values: Set[str]) -> Dict[str, Optional[str]]:
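+ # Replace any cell whose value appears in the configured null_values set with None; all other
+ # values pass through unchanged.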
+ nullable = row | {k: None if v in null_values else v for k, v in row.items()}
+ return nullable
+
+ @staticmethod
+ def _skip_rows_before_header(fp: IOBase, rows_to_skip: int) -> None:
+ """
+ Skip rows before the header. This has to be done on the file object itself, not the reader
+ """
+ for _ in range(rows_to_skip):
+ fp.readline()
+
+ def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str) -> List[str]:
+ # Note that this method assumes the dialect has already been registered if we're parsing the headers
+ if config_format.autogenerate_column_names:
+ return self._auto_generate_headers(fp, config_format)
+ else:
+ # If we're not autogenerating column names, we need to skip the rows before the header
+ self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
+ # Then read the header
+ reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
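+ # DictReader reads the header lazily, so the first row it yields is the first data row,
+ # keyed by the header field names used to build the schema.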
+ return next(reader) # type: ignore
+ def _auto_generate_headers(self, fp: IOBase, config_format: CsvFormat) -> List[str]:
+ """
+ Generates field names as [f0, f1, ...] in the same way as pyarrow's csv reader with autogenerate_column_names=True.
+ See https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
+ """
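+ # For example, a first line of "val11,val12" with the default "," delimiter yields ["f0", "f1"].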
+ next_line = next(fp).strip()
+ number_of_columns = len(next_line.split(config_format.delimiter)) # type: ignore
+ # Reset the file pointer to the beginning of the file so that the first row is not skipped
+ fp.seek(0)
+ return [f"f{i}" for i in range(number_of_columns)]
-def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logging.Logger) -> Dict[str, Any]:
+
+def _cast_types(row: Dict[str, str], property_types: Dict[str, Any], config_format: CsvFormat, logger: logging.Logger) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
@@ -142,7 +185,7 @@ def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logg
elif python_type == bool:
try:
- cast_value = strtobool(value)
+ cast_value = _value_to_bool(value, config_format.true_values, config_format.false_values)
except ValueError:
warnings.append(_format_warning(key, value, prop_type))
@@ -178,5 +221,17 @@ def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logg
return result
+def _value_to_bool(value: str, true_values: Set[str], false_values: Set[str]) -> bool:
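+ # Membership checks are case-sensitive, matching the documented behavior of true_values and false_values.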
+ if value in true_values:
+ return True
+ if value in false_values:
+ return False
+ raise ValueError(f"Value {value} is not a valid boolean value")
+
+
def _format_warning(key: str, value: str, expected_type: Optional[Any]) -> str:
return f"{key}: value={value},expected_type={expected_type}"
+
+
+def _no_cast(row: Mapping[str, str]) -> Mapping[str, str]:
+ return row
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
index e31d841d6f7a..76093016e2d5 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
@@ -15,6 +15,7 @@
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
+ RecordParseError,
SchemaInferenceError,
StopSyncPerValidationPolicy,
)
@@ -105,6 +106,18 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
)
break
+ except RecordParseError:
+ # Increment line_no because the exception was raised before we could increment it
+ line_no += 1
+ yield AirbyteMessage(
+ type=MessageType.LOG,
+ log=AirbyteLogMessage(
+ level=Level.ERROR,
+ message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
+ stack_trace=traceback.format_exc(),
+ ),
+ )
+
except Exception:
yield AirbyteMessage(
type=MessageType.LOG,
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/config/test_csv_format.py b/airbyte-cdk/python/unit_tests/sources/file_based/config/test_csv_format.py
new file mode 100644
index 000000000000..6903f126af30
--- /dev/null
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/config/test_csv_format.py
@@ -0,0 +1,23 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+import pytest
+from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
+
+
+@pytest.mark.parametrize(
+ "skip_rows_before_header, autogenerate_column_names, expected_error",
+ [
+ pytest.param(1, True, ValueError, id="test_skip_rows_before_header_and_autogenerate_column_names"),
+ pytest.param(1, False, None, id="test_skip_rows_before_header_and_no_autogenerate_column_names"),
+ pytest.param(0, True, None, id="test_no_skip_rows_before_header_and_autogenerate_column_names"),
+ pytest.param(0, False, None, id="test_no_skip_rows_before_header_and_no_autogenerate_column_names"),
+ ]
+)
+def test_csv_format(skip_rows_before_header, autogenerate_column_names, expected_error):
+ if expected_error:
+ with pytest.raises(expected_error):
+ CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)
+ else:
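+ # Valid combinations should construct without raising.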
+ CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_csv_parser.py b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_csv_parser.py
index 746ea7671817..1d2079396383 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_csv_parser.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_csv_parser.py
@@ -3,9 +3,12 @@
#
import logging
+from unittest.mock import MagicMock, Mock
import pytest
-from airbyte_cdk.sources.file_based.file_types.csv_parser import cast_types
+from airbyte_cdk.sources.file_based.config.csv_format import DEFAULT_FALSE_VALUES, DEFAULT_TRUE_VALUES, CsvFormat
+from airbyte_cdk.sources.file_based.exceptions import RecordParseError
+from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser, _cast_types
PROPERTY_TYPES = {
"col1": "null",
@@ -23,7 +26,7 @@
@pytest.mark.parametrize(
- "row,expected_output",
+ "row, true_values, false_values, expected_output",
[
pytest.param(
{
@@ -36,7 +39,10 @@
"col7": '[1, 2]',
"col8": '["1", "2"]',
"col9": '[{"a": "b"}, {"a": "c"}]',
- }, {
+ },
+ DEFAULT_TRUE_VALUES,
+ DEFAULT_FALSE_VALUES,
+ {
"col1": None,
"col2": True,
"col3": 1,
@@ -47,20 +53,46 @@
"col8": ["1", "2"],
"col9": [{"a": "b"}, {"a": "c"}],
}, id="cast-all-cols"),
- pytest.param({"col1": "1"}, {"col1": "1"}, id="cannot-cast-to-null"),
- pytest.param({"col2": "1"}, {"col2": True}, id="cast-1-to-bool"),
- pytest.param({"col2": "0"}, {"col2": False}, id="cast-0-to-bool"),
- pytest.param({"col2": "yes"}, {"col2": True}, id="cast-yes-to-bool"),
- pytest.param({"col2": "no"}, {"col2": False}, id="cast-no-to-bool"),
- pytest.param({"col2": "10"}, {"col2": "10"}, id="cannot-cast-to-bool"),
- pytest.param({"col3": "1.1"}, {"col3": "1.1"}, id="cannot-cast-to-int"),
- pytest.param({"col4": "asdf"}, {"col4": "asdf"}, id="cannot-cast-to-float"),
- pytest.param({"col6": "{'a': 'b'}"}, {"col6": "{'a': 'b'}"}, id="cannot-cast-to-dict"),
- pytest.param({"col7": "['a', 'b']"}, {"col7": "['a', 'b']"}, id="cannot-cast-to-list-of-ints"),
- pytest.param({"col8": "['a', 'b']"}, {"col8": "['a', 'b']"}, id="cannot-cast-to-list-of-strings"),
- pytest.param({"col9": "['a', 'b']"}, {"col9": "['a', 'b']"}, id="cannot-cast-to-list-of-objects"),
- pytest.param({"col10": "x"}, {"col10": "x"}, id="item-not-in-props-doesn't-error"),
+ pytest.param({"col1": "1"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col1": "1"}, id="cannot-cast-to-null"),
+ pytest.param({"col2": "1"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": True}, id="cast-1-to-bool"),
+ pytest.param({"col2": "0"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": False}, id="cast-0-to-bool"),
+ pytest.param({"col2": "yes"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": True}, id="cast-yes-to-bool"),
+ pytest.param({"col2": "this_is_a_true_value"}, ["this_is_a_true_value"], DEFAULT_FALSE_VALUES, {"col2": True}, id="cast-custom-true-value-to-bool"),
+ pytest.param({"col2": "this_is_a_false_value"}, DEFAULT_TRUE_VALUES, ["this_is_a_false_value"], {"col2": False}, id="cast-custom-false-value-to-bool"),
+ pytest.param({"col2": "no"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": False}, id="cast-no-to-bool"),
+ pytest.param({"col2": "10"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": "10"}, id="cannot-cast-to-bool"),
+ pytest.param({"col3": "1.1"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col3": "1.1"}, id="cannot-cast-to-int"),
+ pytest.param({"col4": "asdf"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col4": "asdf"}, id="cannot-cast-to-float"),
+ pytest.param({"col6": "{'a': 'b'}"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col6": "{'a': 'b'}"}, id="cannot-cast-to-dict"),
+ pytest.param({"col7": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col7": "['a', 'b']"}, id="cannot-cast-to-list-of-ints"),
+ pytest.param({"col8": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col8": "['a', 'b']"}, id="cannot-cast-to-list-of-strings"),
+ pytest.param({"col9": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col9": "['a', 'b']"}, id="cannot-cast-to-list-of-objects"),
+ pytest.param({"col10": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col10": "x"}, id="item-not-in-props-doesn't-error"),
+ ]
+)
+def test_cast_to_python_type(row, true_values, false_values, expected_output):
+ csv_format = CsvFormat(true_values=true_values, false_values=false_values)
+ assert _cast_types(row, PROPERTY_TYPES, csv_format, logger) == expected_output
+
+
+@pytest.mark.parametrize(
+ "reader_values, expected_rows", [
+ pytest.param([{"col1": "1", "col2": None}], None, id="raise_exception_if_any_value_is_none"),
+ pytest.param([{"col1": "1", "col2": "2"}], [{"col1": "1", "col2": "2"}], id="read_no_cast"),
]
)
-def test_cast_to_python_type(row, expected_output):
- assert cast_types(row, PROPERTY_TYPES, logger) == expected_output
+def test_read_and_cast_types(reader_values, expected_rows):
+ reader = MagicMock()
+ reader.__iter__.return_value = reader_values
+ schema = {}
+ config_format = CsvFormat()
+ logger = Mock()
+
+ parser = CsvParser()
+
+ if expected_rows is None:
+ with pytest.raises(RecordParseError):
+ list(parser._read_and_cast_types(reader, schema, config_format, logger))
+ else:
+ assert expected_rows == list(parser._read_and_cast_types(reader, schema, config_format, logger))
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py b/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py
index 693b295535f0..463b4bf557ef 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py
@@ -100,7 +100,14 @@ def open_file(self, file: RemoteFile, mode: FileReadMode, logger: logging.Logger
raise NotImplementedError(f"No implementation for file type: {self.file_type}")
def _make_csv_file_contents(self, file_name: str) -> IOBase:
+
+ # Some tests define the csv as an array of strings to make it easier to validate the handling
+ # of quotes, delimiters, and escape chars.
+ if isinstance(self.files[file_name]["contents"][0], str):
+ return io.StringIO("\n".join([s.strip() for s in self.files[file_name]["contents"]]))
+
fh = io.StringIO()
+
if self.file_write_options:
csv.register_dialect("in_memory_dialect", **self.file_write_options)
writer = csv.writer(fh, dialect="in_memory_dialect")
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py
index 093cf81a6e6e..c5714cc570b3 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py
@@ -45,45 +45,68 @@
"properties": {
"streams": {
"title": "The list of streams to sync",
- "description": 'Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
+ "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.",
"order": 10,
"type": "array",
"items": {
"title": "FileBasedStreamConfig",
"type": "object",
"properties": {
- "name": {"title": "Name", "description": "The name of the stream.", "type": "string"},
+ "name": {
+ "title": "Name",
+ "description": "The name of the stream.",
+ "type": "string"
+ },
"file_type": {
"title": "File Type",
"description": "The data file type that is being extracted for a stream.",
- "type": "string",
+ "type": "string"
},
"globs": {
"title": "Globs",
- "description": 'The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.',
+ "description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.",
"type": "array",
- "items": {"type": "string"},
+ "items": {
+ "type": "string"
+ }
},
"validation_policy": {
"title": "Validation Policy",
"description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
- "type": "string",
+ "type": "string"
},
"input_schema": {
"title": "Input Schema",
"description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
- "oneOf": [{"type": "object"}, {"type": "string"}],
+ "oneOf": [
+ {
+ "type": "object"
+ },
+ {
+ "type": "string"
+ }
+ ]
},
"primary_key": {
"title": "Primary Key",
"description": "The column or columns (for a composite key) that serves as the unique identifier of a record.",
- "oneOf": [{"type": "string"}, {"type": "array", "items": {"type": "string"}}],
+ "oneOf": [
+ {
+ "type": "string"
+ },
+ {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ ]
},
"days_to_sync_if_history_is_full": {
"title": "Days To Sync If History Is Full",
"description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
"default": 3,
- "type": "integer",
+ "type": "integer"
},
"format": {
"oneOf": [
@@ -100,16 +123,18 @@
"filetype": {
"title": "Filetype",
"default": "avro",
- "enum": ["avro"],
- "type": "string",
+ "enum": [
+ "avro"
+ ],
+ "type": "string"
},
"decimal_as_float": {
"title": "Convert Decimal Fields to Floats",
"description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.",
"default": False,
- "type": "boolean",
- },
- },
+ "type": "boolean"
+ }
+ }
},
{
"title": "CsvFormat",
@@ -118,37 +143,39 @@
"filetype": {
"title": "Filetype",
"default": "csv",
- "enum": ["csv"],
- "type": "string",
+ "enum": [
+ "csv"
+ ],
+ "type": "string"
},
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
"default": ",",
- "type": "string",
+ "type": "string"
},
"quote_char": {
"title": "Quote Character",
"description": "The character used for quoting CSV values. To disallow quoting, make this field blank.",
- "default": '"',
- "type": "string",
+ "default": "\"",
+ "type": "string"
},
"escape_char": {
"title": "Escape Character",
"description": "The character used for escaping special characters. To disallow escaping, leave this field blank.",
- "type": "string",
+ "type": "string"
},
"encoding": {
"title": "Encoding",
- "description": 'The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.',
+ "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.",
"default": "utf8",
- "type": "string",
+ "type": "string"
},
"double_quote": {
"title": "Double Quote",
"description": "Whether two quotes in a quoted CSV value denote a single quote in the data.",
"default": True,
- "type": "boolean",
+ "type": "boolean"
},
"quoting_behavior": {
"title": "Quoting Behavior",
@@ -158,10 +185,72 @@
"Quote All",
"Quote Special Characters",
"Quote Non-numeric",
- "Quote None",
+ "Quote None"
+ ]
+ },
+ "null_values": {
+ "title": "Null Values",
+ "description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
+ "default": [],
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "uniqueItems": True
+ },
+ "skip_rows_before_header": {
+ "title": "Skip Rows Before Header",
+ "description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
+ "default": 0,
+ "type": "integer"
+ },
+ "skip_rows_after_header": {
+ "title": "Skip Rows After Header",
+ "description": "The number of rows to skip after the header row.",
+ "default": 0,
+ "type": "integer"
+ },
+ "autogenerate_column_names": {
+ "title": "Autogenerate Column Names",
+ "description": "Whether to autogenerate column names if column_names is empty. If true, column names will be of the form \u201cf0\u201d, \u201cf1\u201d\u2026 If false, column names will be read from the first CSV row after skip_rows_before_header.",
+ "default": False,
+ "type": "boolean"
+ },
+ "true_values": {
+ "title": "True Values",
+ "description": "A set of case-sensitive strings that should be interpreted as true values.",
+ "default": [
+ "y",
+ "yes",
+ "t",
+ "true",
+ "on",
+ "1"
],
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "uniqueItems": True
},
- },
+ "false_values": {
+ "title": "False Values",
+ "description": "A set of case-sensitive strings that should be interpreted as false values.",
+ "default": [
+ "n",
+ "no",
+ "f",
+ "false",
+ "off",
+ "0"
+ ],
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "uniqueItems": True
+ }
+ }
},
{
"title": "JsonlFormat",
@@ -170,10 +259,12 @@
"filetype": {
"title": "Filetype",
"default": "jsonl",
- "enum": ["jsonl"],
- "type": "string",
+ "enum": [
+ "jsonl"
+ ],
+ "type": "string"
}
- },
+ }
},
{
"title": "ParquetFormat",
@@ -182,50 +273,67 @@
"filetype": {
"title": "Filetype",
"default": "parquet",
- "enum": ["parquet"],
- "type": "string",
+ "enum": [
+ "parquet"
+ ],
+ "type": "string"
},
"decimal_as_float": {
"title": "Convert Decimal Fields to Floats",
"description": "Whether to convert decimal fields to floats. There is a loss of precision when converting decimals to floats, so this is not recommended.",
"default": False,
- "type": "boolean",
- },
- },
- },
+ "type": "boolean"
+ }
+ }
+ }
]
- },
+ }
},
{
"title": "Legacy Format",
- "required": ["filetype"],
+ "required": [
+ "filetype"
+ ],
"type": "object",
- "properties": {"filetype": {"title": "Filetype", "type": "string"}},
- },
+ "properties": {
+ "filetype": {
+ "title": "Filetype",
+ "type": "string"
+ }
+ }
+ }
]
},
"schemaless": {
"title": "Schemaless",
"description": "When enabled, syncs will not validate or structure records against the stream's schema.",
"default": False,
- "type": "boolean",
- },
+ "type": "boolean"
+ }
},
- "required": ["name", "file_type", "validation_policy"],
- },
+ "required": [
+ "name",
+ "file_type",
+ "validation_policy"
+ ]
+ }
},
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any file modified before this date will not be replicated.",
- "examples": ["2021-01-01T00:00:00Z"],
+ "examples": [
+ "2021-01-01T00:00:00Z"
+ ],
"format": "date-time",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"order": 1,
- "type": "string",
- },
+ "type": "string"
+ }
},
- "required": ["streams"],
- },
+ "required": [
+ "streams"
+ ]
+ }
}
)
.set_expected_catalog(
@@ -1531,3 +1639,1102 @@
.set_expected_discover_error(ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
.set_expected_read_error(ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
).build()
+
+csv_string_can_be_null_with_input_schemas_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_string_can_be_null_with_input_schema")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "input_schema": {"col1": "string", "col2": "string"},
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "null_values": ["null"],
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("2", "null"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": "string"
+ },
+ "col2": {
+ "type": "string"
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "2", "col2": None, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_string_not_null_if_no_null_values_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_string_not_null_if_no_null_values")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("2", "null"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "2", "col2": "null", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_strings_can_be_null_not_quoted_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_strings_can_be_null_no_input_schema")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "null_values": ["null"]
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("2", "null"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "2", "col2": None, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_newline_in_values_quoted_value_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_newline_in_values_quoted_value")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "quoting_behavior": "Quote All"
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''"col1","col2"''',
+ '''"2","val\n2"''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "2", "col2": 'val\n2', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_newline_in_values_not_quoted_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_newline_in_values_not_quoted")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''col1,col2''',
+ '''2,val\n2''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ # Note that the value for col2 is truncated to "val" because the newline is not escaped
+ {"data": {"col1": "2", "col2": 'val', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+ .set_expected_logs({"read": [
+ {
+ "level": "ERROR",
+ "message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=a.csv line_no=2 n_skipped=0",
+ }
+ ]})
+).build()
+
+csv_escape_char_is_set_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_escape_char_is_set")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "double_quotes": False,
+ "quote_char": '"',
+ "delimiter": ",",
+ "escape_char": "\\",
+ "quoting_behavior": "Quote All",
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''col1,col2''',
+ '''val11,"val\\"2"''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": 'val11', "col2": 'val"2', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_double_quote_is_set_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_doublequote_is_set")
+ # This scenario tests that quotes are properly escaped when double_quotes is True
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "double_quotes": True,
+ "quote_char": '"',
+ "delimiter": ",",
+ "quoting_behavior": "Quote All",
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''col1,col2''',
+ '''val11,"val""2"''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": 'val11', "col2": 'val"2', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_custom_delimiter_with_escape_char_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_custom_delimiter_with_escape_char")
+ # This scenario tests that a value can contain the delimiter if it is escaped with the escape_char
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "double_quotes": True,
+ "quote_char": '@',
+ "delimiter": "|",
+ "escape_char": "+"
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''col1|col2''',
+ '''val"1,1|val+|2''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": 'val"1,1', "col2": 'val|2', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_custom_delimiter_in_double_quotes_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_custom_delimiter_in_double_quotes")
+ # This scenario tests that a value can contain the delimiter if it is wrapped in the quote_char
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "double_quotes": True,
+ "quote_char": '@',
+ "delimiter": "|",
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ '''col1|col2''',
+ '''val"1,1|@val|2@''',
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": 'val"1,1', "col2": 'val|2', "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+
+csv_skip_before_header_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_skip_before_header")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "skip_rows_before_header": 2
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("skip_this", "skip_this"),
+ ("skip_this_too", "skip_this_too"),
+ ("col1", "col2"),
+ ("val11", "val12"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_skip_after_header_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_skip_after_header")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "skip_rows_after_header": 2
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("skip_this", "skip_this"),
+ ("skip_this_too", "skip_this_too"),
+ ("val11", "val12"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+
+csv_skip_before_and_after_header_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_skip_before_after_header")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "skip_rows_before_header": 1,
+ "skip_rows_after_header": 1,
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("skip_this", "skip_this"),
+ ("col1", "col2"),
+ ("skip_this_too", "skip_this_too"),
+ ("val11", "val12"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": ["null", "string"]
+ },
+ "col2": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_autogenerate_column_names_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_autogenerate_column_names")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "autogenerate_column_names": True,
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("val11", "val12"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "f0": {
+ "type": ["null", "string"]
+ },
+ "f1": {
+ "type": ["null", "string"]
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"f0": "val11", "f1": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_custom_bool_values_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_custom_bool_values")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "input_schema": {"col1": "boolean", "col2": "boolean"},
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "true_values": ["this_is_true"],
+ "false_values": ["this_is_false"],
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("this_is_true", "this_is_false"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": "boolean"
+ },
+ "col2": {
+ "type": "boolean"
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": True, "col2": False, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
+
+csv_custom_null_values_scenario = (
+ TestScenarioBuilder()
+ .set_name("csv_custom_null_values")
+ .set_config(
+ {
+ "streams": [
+ {
+ "name": "stream1",
+ "file_type": "csv",
+ "globs": ["*"],
+ "validation_policy": "emit_record",
+ "input_schema": {"col1": "boolean", "col2": "string"},
+ "format": {
+ "csv": {
+ "filetype": "csv",
+ "null_values": ["null"],
+ }
+ }
+ }
+ ],
+ "start_date": "2023-06-04T03:54:07Z"
+ }
+ )
+ .set_files(
+ {
+ "a.csv": {
+ "contents": [
+ ("col1", "col2"),
+ ("null", "na"),
+ ],
+ "last_modified": "2023-06-05T03:54:07.000Z",
+ }
+ }
+ )
+ .set_file_type("csv")
+ .set_expected_catalog(
+ {
+ "streams": [
+ {
+ "default_cursor_field": ["_ab_source_file_last_modified"],
+ "json_schema": {
+ "type": "object",
+ "properties": {
+ "col1": {
+ "type": "boolean"
+ },
+ "col2": {
+ "type": "string"
+ },
+ "_ab_source_file_last_modified": {
+ "type": "string"
+ },
+ "_ab_source_file_url": {
+ "type": "string"
+ },
+ },
+ },
+ "name": "stream1",
+ "source_defined_cursor": True,
+ "supported_sync_modes": ["full_refresh", "incremental"],
+ }
+ ]
+ }
+ )
+ .set_expected_records(
+ [
+ {"data": {"col1": None, "col2": "na", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
+ "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
+ ]
+ )
+).build()
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py
index 6c93a8a0053b..f6116e482b3c 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py
@@ -34,10 +34,25 @@
success_user_provided_schema_scenario,
)
from unit_tests.sources.file_based.scenarios.csv_scenarios import (
+ csv_autogenerate_column_names_scenario,
+ csv_custom_bool_values_scenario,
+ csv_custom_delimiter_in_double_quotes_scenario,
+ csv_custom_delimiter_with_escape_char_scenario,
csv_custom_format_scenario,
+ csv_custom_null_values_scenario,
+ csv_double_quote_is_set_scenario,
+ csv_escape_char_is_set_scenario,
csv_legacy_format_scenario,
csv_multi_stream_scenario,
+ csv_newline_in_values_not_quoted_scenario,
+ csv_newline_in_values_quoted_value_scenario,
csv_single_stream_scenario,
+ csv_skip_after_header_scenario,
+ csv_skip_before_and_after_header_scenario,
+ csv_skip_before_header_scenario,
+ csv_string_can_be_null_with_input_schemas_scenario,
+ csv_string_not_null_if_no_null_values_scenario,
+ csv_strings_can_be_null_not_quoted_scenario,
empty_schema_inference_scenario,
invalid_csv_scenario,
multi_csv_scenario,
@@ -162,11 +177,26 @@
jsonl_user_input_schema_scenario,
schemaless_jsonl_scenario,
schemaless_jsonl_multi_stream_scenario,
+ csv_string_can_be_null_with_input_schemas_scenario,
+ csv_string_not_null_if_no_null_values_scenario,
+ csv_strings_can_be_null_not_quoted_scenario,
+ csv_newline_in_values_quoted_value_scenario,
+ csv_escape_char_is_set_scenario,
+ csv_double_quote_is_set_scenario,
+ csv_custom_delimiter_with_escape_char_scenario,
+ csv_custom_delimiter_in_double_quotes_scenario,
+ csv_skip_before_header_scenario,
+ csv_skip_after_header_scenario,
+ csv_skip_before_and_after_header_scenario,
+ csv_custom_bool_values_scenario,
+ csv_custom_null_values_scenario,
single_avro_scenario,
avro_all_types_scenario,
multiple_avro_combine_schema_scenario,
multiple_streams_avro_scenario,
avro_file_with_decimal_as_float_scenario,
+ csv_newline_in_values_not_quoted_scenario,
+ csv_autogenerate_column_names_scenario,
]