Skip to content

Commit

Permalink
🎉 Destination Local CSV: add custom delimiter (#17998)
Browse files Browse the repository at this point in the history
* Adds delimeter to the spec file. Adds function to get delimeter from the config.

* New delimiter works, but only checked raw airbyte tables.

* Fixes testWriteFailure(), testWriteSuccess() still broken.

* Corrects CSVFormat and now all tests pass.

* Implements tab separator.

* Corrects tooltip on destination settings page.

* Creates CSV file with delimiters and prints it as stirng.

* Adds try catch block for assertion. Deletes file after test run.

* Removes separate format for tab dleimiter, it is not needed.

* Cleans up code.

* Adds missing bracket.

* Adds files from incorrect rebase.

* Corrects imports.

* Fixes connectors base build.

* Corrects Dockerfile version bump. Adds changelog.

* Corrects getProtocolVersion method and makes CSVDataArgumentsProvider class static.

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
1 parent 58352c9 commit 364973e
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@
- name: Local CSV
destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.10
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/local-csv
icon: file-csv.svg
releaseStage: alpha
Expand Down
44 changes: 42 additions & 2 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3297,7 +3297,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-csv:0.2.10"
- dockerImage: "airbyte/destination-csv:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/local-csv"
connectionSpecification:
Expand All @@ -3306,7 +3306,7 @@
type: "object"
required:
- "destination_path"
additionalProperties: false
additionalProperties: true
properties:
destination_path:
description: "Path to the directory where csv files will be written. The\
Expand All @@ -3317,6 +3317,46 @@
type: "string"
examples:
- "/local"
delimiter_type:
type: "object"
title: "Delimiter"
description: "The character delimiting individual cells in the CSV data."
oneOf:
- title: "Comma"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u002c"
- title: "Semicolon"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u003b"
- title: "Pipe"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u007c"
- title: "Tab"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u0009"
- title: "Space"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u0020"
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-csv

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class CsvDestination extends BaseConnector implements Destination {

static final String DESTINATION_PATH_FIELD = "destination_path";

static final String DELIMITER_TYPE = "delimiter_type";

private final StandardNameTransformer namingResolver;

public CsvDestination() {
Expand Down Expand Up @@ -73,6 +75,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final Consumer<AirbyteMessage> outputRecordCollector)
throws IOException {
final Path destinationDir = getDestinationPath(config);
final Character delimiter = getDelimiter(config);
CSVFormat csvFormat;

FileUtils.forceMkdir(destinationDir.toFile());

Expand All @@ -83,8 +87,9 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final String tmpTableName = namingResolver.getTmpTableName(streamName);
final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv");
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
csvFormat = CSVFormat.DEFAULT.withDelimiter(delimiter);
csvFormat = csvFormat.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
Expand All @@ -96,7 +101,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
}
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), Charset.defaultCharset(), isAppendMode);
final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath, delimiter));
}

return new CsvConsumer(writeConfigs, catalog, outputRecordCollector);
Expand All @@ -122,6 +127,28 @@ protected Path getDestinationPath(final JsonNode config) {
return destinationPath;
}

/**
* Extract provided delimiter from csv config object.
*
* @param config - csv config object
* @return delimiter.
*/
protected Character getDelimiter(final JsonNode config) {

JsonNode tempConfig = config;
Character delimiter;

if (tempConfig.has(DELIMITER_TYPE)) {
String delimiter_as_text = tempConfig.get(DELIMITER_TYPE).get("delimiter").asText();
delimiter = (char) Integer.parseInt(delimiter_as_text.substring(2),16);
return delimiter;
} else {
delimiter = ',';
}
Preconditions.checkNotNull(delimiter);
return delimiter;
}

/**
* This consumer writes individual records to temporary files. If all of the messages are written
* successfully, it moves the tmp files to files named by their respective stream. If there are any
Expand Down Expand Up @@ -214,11 +241,13 @@ private static class WriteConfig {
private final CSVPrinter writer;
private final Path tmpPath;
private final Path finalPath;
private final Character delimiter;

public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath) {
public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath, final Character delimiter) {
this.writer = writer;
this.tmpPath = tmpPath;
this.finalPath = finalPath;
this.delimiter = delimiter;
}

public CSVPrinter getWriter() {
Expand All @@ -233,6 +262,10 @@ public Path getFinalPath() {
return finalPath;
}

public Character getDelimiter() {
return delimiter;
}

}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,69 @@
"title": "CSV Destination Spec",
"type": "object",
"required": ["destination_path"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"destination_path": {
"description": "Path to the directory where csv files will be written. The destination uses the local mount \"/local\" and any data files will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.com/integrations/destinations/local-csv\">docs</a>",
"type": "string",
"examples": ["/local"]
},
"delimiter_type": {
"type": "object",
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data.",
"oneOf": [
{
"title": "Comma",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u002c"
}
}
},
{
"title": "Semicolon",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u003b"
}
}
},
{
"title": "Pipe",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u007c"
}
}
},
{
"title": "Tab",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u0009"
}
}
},
{
"title": "Space",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u0020"
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,39 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.argproviders.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;

import java.io.FileReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsSource;

public class CsvDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final Path RELATIVE_PATH = Path.of("integration_test/test");
private JsonNode config;

@Override
protected String getImageName() {
Expand All @@ -37,6 +53,11 @@ protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.of("destination_path", Path.of("/local").resolve(RELATIVE_PATH).toString()));
}

protected JsonNode getConfigWithDelimiter(String delimiter) {
config = Jsons.jsonNode(ImmutableMap.of("destination_path", Path.of("/local").resolve(RELATIVE_PATH).toString(), "delimiter", delimiter));
return config;
}

// todo (cgardens) - it would be great if we could find a configuration here that failed. the
// commented out one fails in mac but not on the linux box that the github action runs in. instead
// we override the test here so it never runs.
Expand All @@ -54,6 +75,25 @@ protected JsonNode getFailCheckConfig() {
@Override
public void testCheckConnectionInvalidCredentials() {}


@ParameterizedTest
@ArgumentsSource(CSVDataArgumentsProvider.class)
public void testSyncWithDelimiter(final String messagesFilename, final String catalogFilename, String delimiter)
throws Exception {
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());

final JsonNode config = getConfigWithDelimiter(delimiter);
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final String streamName,
Expand Down Expand Up @@ -90,4 +130,20 @@ protected void tearDown(final TestDestinationEnv testEnv) {
// no op
}

public static class CSVDataArgumentsProvider extends DataArgumentsProvider {

public CSVDataArgumentsProvider(){};
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
ProtocolVersion protocolVersion = ArgumentProviderUtil.getProtocolVersion(context);
return Stream.of(
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u002c"),
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u003b"),
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u007c"),
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u0009"),
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion), "\\u0020")
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ class CsvDestinationTest {
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, null, Field.of("goal", JsonSchemaType.STRING))));

private Path destinationPath;
private JsonNode delimiter;
private JsonNode config;

@BeforeEach
void setup() throws IOException {
destinationPath = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");
config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString()));
delimiter = Jsons.jsonNode(ImmutableMap.of("delimiter", "\\u002c"));
config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString(), CsvDestination.DELIMITER_TYPE, delimiter));
}

private CsvDestination getDestination() {
Expand All @@ -98,6 +100,12 @@ private CsvDestination getDestination() {
return result;
}

private CsvDestination getDelimiter() {
final CsvDestination result = spy(CsvDestination.class);
doReturn(delimiter).when(result).getDelimiter(any());
return result;
}

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = getDestination().spec();
Expand Down
Loading

0 comments on commit 364973e

Please sign in to comment.