From 4a2a5ebbb8a94c6d0508ccd97a3e9e05599bfea7 Mon Sep 17 00:00:00 2001 From: Nataly Merezhuk <65251165+natalyjazzviolin@users.noreply.github.com> Date: Tue, 10 Jan 2023 13:40:25 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Destination=20Local=20CSV:=20add?= =?UTF-8?q?=20custom=20delimiter=20(#17998)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adds delimiter to the spec file. Adds function to get delimiter from the config. * New delimiter works, but only checked raw airbyte tables. * Fixes testWriteFailure(); testWriteSuccess() still broken. * Corrects CSVFormat and now all tests pass. * Implements tab separator. * Corrects tooltip on destination settings page. * Creates CSV file with delimiters and prints it as string. * Adds try-catch block for assertion. Deletes file after test run. * Removes separate format for tab delimiter, it is not needed. * Cleans up code. * Adds missing bracket. * Adds files from incorrect rebase. * Corrects imports. * Fixes connectors base build. * Corrects Dockerfile version bump. Adds changelog. * Corrects getProtocolVersion method and makes CSVDataArgumentsProvider class static. 
* auto-bump connector version Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 44 +++++++++++++- .../connectors/destination-csv/Dockerfile | 2 +- .../destination/csv/CsvDestination.java | 41 +++++++++++-- .../src/main/resources/spec.json | 59 ++++++++++++++++++- .../csv/CsvDestinationAcceptanceTest.java | 56 ++++++++++++++++++ .../destination/csv/CsvDestinationTest.java | 10 +++- docs/integrations/destinations/local-csv.md | 25 ++++++++ 8 files changed, 229 insertions(+), 10 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 36152ce7d170..de95948e3457 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -185,7 +185,7 @@ - name: Local CSV destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6 dockerRepository: airbyte/destination-csv - dockerImageTag: 0.2.10 + dockerImageTag: 1.0.0 documentationUrl: https://docs.airbyte.com/integrations/destinations/local-csv icon: file-csv.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index c23a6def9dce..a1d6ebd5f3a4 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3297,7 +3297,7 @@ supportsDBT: false supported_destination_sync_modes: - "append" -- dockerImage: "airbyte/destination-csv:0.2.10" +- dockerImage: "airbyte/destination-csv:1.0.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/local-csv" connectionSpecification: @@ -3306,7 +3306,7 @@ type: "object" required: - "destination_path" - additionalProperties: false + additionalProperties: 
true properties: destination_path: description: "Path to the directory where csv files will be written. The\ @@ -3317,6 +3317,46 @@ type: "string" examples: - "/local" + delimiter_type: + type: "object" + title: "Delimiter" + description: "The character delimiting individual cells in the CSV data." + oneOf: + - title: "Comma" + required: + - "delimiter" + properties: + delimiter: + type: "string" + const: "\\u002c" + - title: "Semicolon" + required: + - "delimiter" + properties: + delimiter: + type: "string" + const: "\\u003b" + - title: "Pipe" + required: + - "delimiter" + properties: + delimiter: + type: "string" + const: "\\u007c" + - title: "Tab" + required: + - "delimiter" + properties: + delimiter: + type: "string" + const: "\\u0009" + - title: "Space" + required: + - "delimiter" + properties: + delimiter: + type: "string" + const: "\\u0020" supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index f9f3456bf150..86684b73ad30 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-csv COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.10 +LABEL io.airbyte.version=1.0.0 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 82f9dcbdea24..7d0155a02edb 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -45,6 
+45,8 @@ public class CsvDestination extends BaseConnector implements Destination { static final String DESTINATION_PATH_FIELD = "destination_path"; + static final String DELIMITER_TYPE = "delimiter_type"; + private final StandardNameTransformer namingResolver; public CsvDestination() { @@ -73,6 +75,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final Consumer outputRecordCollector) throws IOException { final Path destinationDir = getDestinationPath(config); + final Character delimiter = getDelimiter(config); + CSVFormat csvFormat; FileUtils.forceMkdir(destinationDir.toFile()); @@ -83,8 +87,9 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final String tmpTableName = namingResolver.getTmpTableName(streamName); final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv"); final Path finalPath = destinationDir.resolve(tableName + ".csv"); - CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT, - JavaBaseConstants.COLUMN_NAME_DATA); + csvFormat = CSVFormat.DEFAULT.withDelimiter(delimiter); + csvFormat = csvFormat.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + JavaBaseConstants.COLUMN_NAME_DATA); final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); if (syncMode == null) { throw new IllegalStateException("Undefined destination sync mode"); @@ -96,7 +101,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, } final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), Charset.defaultCharset(), isAppendMode); final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat); - writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath)); + writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath, delimiter)); } return new CsvConsumer(writeConfigs, catalog, outputRecordCollector); @@ -122,6 +127,28 @@ 
protected Path getDestinationPath(final JsonNode config) { return destinationPath; } + /** + * Extract provided delimiter from csv config object. + * + * @param config - csv config object + * @return delimiter. + */ + protected Character getDelimiter(final JsonNode config) { + + JsonNode tempConfig = config; + Character delimiter; + + if (tempConfig.has(DELIMITER_TYPE)) { + String delimiter_as_text = tempConfig.get(DELIMITER_TYPE).get("delimiter").asText(); + delimiter = (char) Integer.parseInt(delimiter_as_text.substring(2),16); + return delimiter; + } else { + delimiter = ','; + } + Preconditions.checkNotNull(delimiter); + return delimiter; + } + /** * This consumer writes individual records to temporary files. If all of the messages are written * successfully, it moves the tmp files to files named by their respective stream. If there are any @@ -214,11 +241,13 @@ private static class WriteConfig { private final CSVPrinter writer; private final Path tmpPath; private final Path finalPath; + private final Character delimiter; - public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath) { + public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath, final Character delimiter) { this.writer = writer; this.tmpPath = tmpPath; this.finalPath = finalPath; + this.delimiter = delimiter; } public CSVPrinter getWriter() { @@ -233,6 +262,10 @@ public Path getFinalPath() { return finalPath; } + public Character getDelimiter() { + return delimiter; + } + } public static void main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json index 8236712ee5e3..75a2558f173a 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-csv/src/main/resources/spec.json @@ -9,12 +9,69 @@ "title": "CSV 
Destination Spec", "type": "object", "required": ["destination_path"], - "additionalProperties": false, + "additionalProperties": true, "properties": { "destination_path": { "description": "Path to the directory where csv files will be written. The destination uses the local mount \"/local\" and any data files will be placed inside that local mount. For more information check out our docs", "type": "string", "examples": ["/local"] + }, + "delimiter_type": { + "type": "object", + "title": "Delimiter", + "description": "The character delimiting individual cells in the CSV data.", + "oneOf": [ + { + "title": "Comma", + "required": ["delimiter"], + "properties": { + "delimiter": { + "type": "string", + "const": "\\u002c" + } + } + }, + { + "title": "Semicolon", + "required": ["delimiter"], + "properties": { + "delimiter": { + "type": "string", + "const": "\\u003b" + } + } + }, + { + "title": "Pipe", + "required": ["delimiter"], + "properties": { + "delimiter": { + "type": "string", + "const": "\\u007c" + } + } + }, + { + "title": "Tab", + "required": ["delimiter"], + "properties": { + "delimiter": { + "type": "string", + "const": "\\u0009" + } + } + }, + { + "title": "Space", + "required": ["delimiter"], + "properties": { + "delimiter": { + "type": "string", + "const": "\\u0020" + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationAcceptanceTest.java index a298bca67da6..a703b9c99c74 100644 --- a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationAcceptanceTest.java @@ -9,9 +9,13 @@ 
import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.standardtest.destination.ProtocolVersion; +import io.airbyte.integrations.standardtest.destination.argproviders.DataArgumentsProvider; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; + import java.io.FileReader; import java.nio.charset.Charset; import java.nio.file.Files; @@ -19,13 +23,25 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil; +import io.airbyte.protocol.models.v0.AirbyteCatalog; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsSource; public class CsvDestinationAcceptanceTest extends DestinationAcceptanceTest { private static final Path RELATIVE_PATH = Path.of("integration_test/test"); + private JsonNode config; @Override protected String getImageName() { @@ -37,6 +53,11 @@ protected JsonNode getConfig() { return Jsons.jsonNode(ImmutableMap.of("destination_path", Path.of("/local").resolve(RELATIVE_PATH).toString())); } + protected JsonNode getConfigWithDelimiter(String delimiter) { + config = Jsons.jsonNode(ImmutableMap.of("destination_path", 
Path.of("/local").resolve(RELATIVE_PATH).toString(), "delimiter", delimiter)); + return config; + } + // todo (cgardens) - it would be great if we could find a configuration here that failed. the // commented out one fails in mac but not on the linux box that the github action runs in. instead // we override the test here so it never runs. @@ -54,6 +75,25 @@ protected JsonNode getFailCheckConfig() { @Override public void testCheckConnectionInvalidCredentials() {} + + @ParameterizedTest + @ArgumentsSource(CSVDataArgumentsProvider.class) + public void testSyncWithDelimiter(final String messagesFilename, final String catalogFilename, String delimiter) + throws Exception { + final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), + AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog( + catalog); + final List messages = MoreResources.readResource(messagesFilename).lines() + .map(record -> Jsons.deserialize(record, AirbyteMessage.class)) + .collect(Collectors.toList()); + + final JsonNode config = getConfigWithDelimiter(delimiter); + final String defaultSchema = getDefaultSchema(config); + runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false); + retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema); + } + @Override protected List retrieveRecords(final TestDestinationEnv testEnv, final String streamName, @@ -90,4 +130,20 @@ protected void tearDown(final TestDestinationEnv testEnv) { // no op } + public static class CSVDataArgumentsProvider extends DataArgumentsProvider { + + public CSVDataArgumentsProvider(){}; + @Override + public Stream provideArguments(final ExtensionContext context) throws Exception { + ProtocolVersion protocolVersion = ArgumentProviderUtil.getProtocolVersion(context); + return Stream.of( + Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), 
EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u002c"), + Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u003b"), + Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u007c"), + Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u0009"), + Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u0020") + ); + } + } + } diff --git a/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index 788fe4cc1039..ab1a6f482f91 100644 --- a/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -84,12 +84,14 @@ class CsvDestinationTest { CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, null, Field.of("goal", JsonSchemaType.STRING)))); private Path destinationPath; + private JsonNode delimiter; private JsonNode config; @BeforeEach void setup() throws IOException { destinationPath = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); - config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString())); + delimiter = Jsons.jsonNode(ImmutableMap.of("delimiter", "\\u002c")); + config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString(), CsvDestination.DELIMITER_TYPE, delimiter)); } private CsvDestination 
getDestination() { @@ -98,6 +100,12 @@ private CsvDestination getDestination() { return result; } + private CsvDestination getDelimiter() { + final CsvDestination result = spy(CsvDestination.class); + doReturn(delimiter).when(result).getDelimiter(any()); + return result; + } + @Test void testSpec() throws Exception { final ConnectorSpecification actual = getDestination().spec(); diff --git a/docs/integrations/destinations/local-csv.md b/docs/integrations/destinations/local-csv.md index 86e437ea6140..77ffadec8e70 100644 --- a/docs/integrations/destinations/local-csv.md +++ b/docs/integrations/destinations/local-csv.md @@ -71,3 +71,28 @@ docker cp airbyte-server:/tmp/airbyte_local/{destination_path}/{filename}.csv . Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have to use similar step as above or refer to this [link](../../operator-guides/locating-files-local-destination.md) for an alternative approach. +## Changelog + +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------| +| 1.0.0 | 2022-12-20 | [17998](https://github.com/airbytehq/airbyte/pull/17998) | Breaking changes: non backwards compatible. Adds delimiter dropdown. 
| +| 0.2.10 | 2022-06-20 | [13932](https://github.com/airbytehq/airbyte/pull/13932) | Merging published connector changes | +| 0.2.9 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add ExitOnOutOfMemoryError to java connectors and bump versions | +| 0.2.8 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Checkpointing: Partial Success in BufferedStreamConsumer (Destination) | +| 0.2.7 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | add AIRBYTE_ENTRYPOINT for kubernetes support | +| 0.2.6 | 2021-05-25 | [3290](https://github.com/airbytehq/airbyte/pull/3290) | Checkpointing: Worker use destination (instead of source) for state | +| 0.2.5 | 2021-05-10 | [3327](https://github.com/airbytehq/airbyte/pull/3327) | don't split lines on LSEP unicode characters when reading lines in destinations | +| 0.2.4 | 2021-05-10 | [3289](https://github.com/airbytehq/airbyte/pull/3289) | bump all destination versions to support outputting messages | +| 0.2.3 | 2021-03-31 | [2668](https://github.com/airbytehq/airbyte/pull/2668) | Add SupportedDestinationSyncModes to destination specs objects | +| 0.2.2 | 2021-03-19 | [2460](https://github.com/airbytehq/airbyte/pull/2460) | Destinations supports destination sync mode | +| 0.2.0 | 2021-03-09 | [2238](https://github.com/airbytehq/airbyte/pull/2238) | Upgrade all connectors (0.2.0) so protocol allows future / unknown properties | +| 0.1.8 | 2021-01-29 | [1882](https://github.com/airbytehq/airbyte/pull/1882) | Local File Destinations UX change with destination paths | +| 0.1.7 | 2021-01-20 | [1737](https://github.com/airbytehq/airbyte/pull/1737) | Rename destination tables | +| 0.1.6 | 2021-01-19 | [1708](https://github.com/airbytehq/airbyte/pull/1708) | Add metadata prefix to destination internal columns | +| 0.1.5 | 2020-12-12 | [1294](https://github.com/airbytehq/airbyte/pull/1294) | Incremental CSV destination | +| 0.1.4 | 2020-11-30 | 
[1038](https://github.com/airbytehq/airbyte/pull/1038) | Change jdbc sources to discover more than standard schemas | +| 0.1.3 | 2020-11-20 | [1021](https://github.com/airbytehq/airbyte/pull/1021) | Incremental Docs and Data Model Update | +| 0.1.2 | 2020-11-18 | [998](https://github.com/airbytehq/airbyte/pull/998) | Adding incremental to the data model | +| 0.1.1 | 2020-11-10 | [895](https://github.com/airbytehq/airbyte/pull/895) | bump versions: all destinations and source exchange rate | +| 0.1.0 | 2020-10-21 | [676](https://github.com/airbytehq/airbyte/pull/676) | Integrations Reorganization: Connectors | +