Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination Local CSV: add custom delimiter #17998

Merged
merged 19 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
- name: Local CSV
destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.10
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/local-csv
icon: file.svg
releaseStage: alpha
Expand Down
44 changes: 42 additions & 2 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3297,7 +3297,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-csv:0.2.10"
- dockerImage: "airbyte/destination-csv:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/local-csv"
connectionSpecification:
Expand All @@ -3306,7 +3306,7 @@
type: "object"
required:
- "destination_path"
additionalProperties: false
additionalProperties: true
properties:
destination_path:
description: "Path to the directory where csv files will be written. The\
Expand All @@ -3317,6 +3317,46 @@
type: "string"
examples:
- "/local"
delimiter_type:
type: "object"
title: "Delimiter"
description: "The character delimiting individual cells in the CSV data."
oneOf:
- title: "Comma"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u002c"
- title: "Semicolon"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u003b"
- title: "Pipe"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u007c"
- title: "Tab"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u0009"
- title: "Space"
required:
- "delimiter"
properties:
delimiter:
type: "string"
const: "\\u0020"
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-csv

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class CsvDestination extends BaseConnector implements Destination {

static final String DESTINATION_PATH_FIELD = "destination_path";

static final String DELIMITER_TYPE = "delimiter_type";

private final StandardNameTransformer namingResolver;

public CsvDestination() {
Expand Down Expand Up @@ -73,6 +75,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final Consumer<AirbyteMessage> outputRecordCollector)
throws IOException {
final Path destinationDir = getDestinationPath(config);
final Character delimiter = getDelimiter(config);
CSVFormat csvFormat;

FileUtils.forceMkdir(destinationDir.toFile());

Expand All @@ -83,8 +87,9 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final String tmpTableName = namingResolver.getTmpTableName(streamName);
final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv");
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
csvFormat = CSVFormat.DEFAULT.withDelimiter(delimiter);
csvFormat = csvFormat.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
Expand All @@ -96,7 +101,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
}
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), Charset.defaultCharset(), isAppendMode);
final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath, delimiter));
}

return new CsvConsumer(writeConfigs, catalog, outputRecordCollector);
Expand All @@ -122,6 +127,28 @@ protected Path getDestinationPath(final JsonNode config) {
return destinationPath;
}

/**
 * Extracts the CSV cell delimiter from the connector config.
 *
 * <p>
 * The delimiter is read from the {@code delimiter} entry of the {@code delimiter_type} object. It
 * is expected to be the textual form of a unicode escape: a backslash, the letter {@code u} and the
 * hexadecimal code of the character (the connector spec uses the code {@code 002c} for a comma,
 * {@code 003b} for a semicolon, and so on). When the config has no {@code delimiter_type} entry, or
 * the entry is null, the delimiter defaults to a comma.
 *
 * @param config - csv config object
 * @return the delimiter character; never null.
 * @throws IllegalArgumentException if {@code delimiter_type} is present but its {@code delimiter}
 *         entry is missing or is not a valid escaped character
 */
protected Character getDelimiter(final JsonNode config) {
  if (!config.hasNonNull(DELIMITER_TYPE)) {
    // No delimiter configured: keep the standard CSV comma.
    return ',';
  }

  final JsonNode delimiterNode = config.get(DELIMITER_TYPE).get("delimiter");
  if (delimiterNode == null || delimiterNode.isNull()) {
    throw new IllegalArgumentException(
        "'" + DELIMITER_TYPE + "' is set but does not contain a 'delimiter' value");
  }

  final String escapedDelimiter = delimiterNode.asText();
  // The value must be a backslash-u prefix followed by at least one hexadecimal digit.
  if (!escapedDelimiter.startsWith("\\u") || escapedDelimiter.length() <= 2) {
    throw new IllegalArgumentException(
        "Delimiter must be an escaped unicode character, got: " + escapedDelimiter);
  }

  final int charCode;
  try {
    charCode = Integer.parseInt(escapedDelimiter.substring(2), 16);
  } catch (final NumberFormatException e) {
    throw new IllegalArgumentException(
        "Delimiter must be an escaped unicode character, got: " + escapedDelimiter, e);
  }

  // Reject values a single char cannot hold instead of silently truncating them in the cast.
  if (charCode < Character.MIN_VALUE || charCode > Character.MAX_VALUE) {
    throw new IllegalArgumentException(
        "Delimiter is outside the range of a single character: " + escapedDelimiter);
  }
  return (char) charCode;
}

/**
* This consumer writes individual records to temporary files. If all of the messages are written
* successfully, it moves the tmp files to files named by their respective stream. If there are any
Expand Down Expand Up @@ -214,11 +241,13 @@ private static class WriteConfig {
private final CSVPrinter writer;
private final Path tmpPath;
private final Path finalPath;
private final Character delimiter;

public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath) {
public WriteConfig(final CSVPrinter writer, final Path tmpPath, final Path finalPath, final Character delimiter) {
this.writer = writer;
this.tmpPath = tmpPath;
this.finalPath = finalPath;
this.delimiter = delimiter;
}

public CSVPrinter getWriter() {
Expand All @@ -233,6 +262,10 @@ public Path getFinalPath() {
return finalPath;
}

/**
 * @return the delimiter character this write config was constructed with.
 */
public Character getDelimiter() {
  return this.delimiter;
}

}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,69 @@
"title": "CSV Destination Spec",
"type": "object",
"required": ["destination_path"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"destination_path": {
"description": "Path to the directory where csv files will be written. The destination uses the local mount \"/local\" and any data files will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.com/integrations/destinations/local-csv\">docs</a>",
"type": "string",
"examples": ["/local"]
},
"delimiter_type": {
"type": "object",
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data.",
"oneOf": [
{
"title": "Comma",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u002c"
}
}
},
{
"title": "Semicolon",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u003b"
}
}
},
{
"title": "Pipe",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u007c"
}
}
},
{
"title": "Tab",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u0009"
}
}
},
{
"title": "Space",
"required": ["delimiter"],
"properties": {
"delimiter": {
"type": "string",
"const": "\\u0020"
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,39 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import io.airbyte.integrations.standardtest.destination.argproviders.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;

import java.io.FileReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsSource;

public class CsvDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final Path RELATIVE_PATH = Path.of("integration_test/test");
private JsonNode config;

@Override
protected String getImageName() {
Expand All @@ -37,6 +53,11 @@ protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.of("destination_path", Path.of("/local").resolve(RELATIVE_PATH).toString()));
}

/**
 * Builds a destination config that carries the given delimiter next to the destination path, stores
 * it in the {@code config} field and returns it.
 *
 * @param delimiter delimiter value to put into the config
 * @return the newly built config
 */
protected JsonNode getConfigWithDelimiter(String delimiter) {
  final String destinationPath = Path.of("/local").resolve(RELATIVE_PATH).toString();
  // NOTE(review): the delimiter is stored under a top-level "delimiter" key, whereas
  // CsvDestination#getDelimiter reads it from the "delimiter" entry nested inside "delimiter_type".
  // With this shape the destination presumably falls back to its default comma - confirm whether
  // the key should be nested before relying on this config to exercise other delimiters.
  this.config = Jsons.jsonNode(ImmutableMap.of(
      "destination_path", destinationPath,
      "delimiter", delimiter));
  return this.config;
}

// todo (cgardens) - it would be great if we could find a configuration here that failed. the
// commented out one fails in mac but not on the linux box that the github action runs in. instead
// we override the test here so it never runs.
Expand All @@ -54,6 +75,25 @@ protected JsonNode getFailCheckConfig() {
@Override
public void testCheckConnectionInvalidCredentials() {}


/**
 * Runs a sync for each delimiter supplied by {@link CSVDataArgumentsProvider}: the catalog and the
 * messages are loaded from the given resources, the sync is executed with a config built by
 * {@code getConfigWithDelimiter}, and the result is handed to
 * {@code retrieveRawRecordsAndAssertSameMessages}.
 *
 * @param messagesFilename resource holding one serialized message per line
 * @param catalogFilename resource holding the serialized catalog
 * @param delimiter delimiter value passed to the config
 */
@ParameterizedTest
@ArgumentsSource(CSVDataArgumentsProvider.class)
public void testSyncWithDelimiter(final String messagesFilename, final String catalogFilename, String delimiter)
    throws Exception {
  final String catalogJson = MoreResources.readResource(catalogFilename);
  final AirbyteCatalog catalog = Jsons.deserialize(catalogJson, AirbyteCatalog.class);
  final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

  final String messagesContent = MoreResources.readResource(messagesFilename);
  final List<AirbyteMessage> messages = messagesContent
      .lines()
      .map(line -> Jsons.deserialize(line, AirbyteMessage.class))
      .collect(Collectors.toList());

  final JsonNode delimiterConfig = getConfigWithDelimiter(delimiter);
  // The schema is resolved before the sync starts, matching the original call order.
  final String defaultSchema = getDefaultSchema(delimiterConfig);
  runSyncAndVerifyStateOutput(delimiterConfig, messages, configuredCatalog, false);
  retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final String streamName,
Expand Down Expand Up @@ -90,4 +130,20 @@ protected void tearDown(final TestDestinationEnv testEnv) {
// no op
}

/**
 * Argument provider for {@code testSyncWithDelimiter}: pairs the exchange-rate message and catalog
 * files with each delimiter value, in the order comma, semicolon, pipe, tab, space.
 */
public static class CSVDataArgumentsProvider extends DataArgumentsProvider {

  public CSVDataArgumentsProvider() {}

  /**
   * @param context extension context used to resolve the protocol version
   * @return one argument triple (messages file, catalog file, delimiter) per delimiter
   */
  @Override
  public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
    final ProtocolVersion protocolVersion = ArgumentProviderUtil.getProtocolVersion(context);
    // Delimiters are written as escaped text: comma, semicolon, pipe, tab, space.
    return Stream.of("\\u002c", "\\u003b", "\\u007c", "\\u0009", "\\u0020")
        .map(delimiter -> Arguments.of(
            EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion),
            EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion),
            delimiter));
  }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ class CsvDestinationTest {
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, null, Field.of("goal", JsonSchemaType.STRING))));

private Path destinationPath;
private JsonNode delimiter;
private JsonNode config;

@BeforeEach
void setup() throws IOException {
destinationPath = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");
config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString()));
delimiter = Jsons.jsonNode(ImmutableMap.of("delimiter", "\\u002c"));
config = Jsons.jsonNode(ImmutableMap.of(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString(), CsvDestination.DELIMITER_TYPE, delimiter));
}

private CsvDestination getDestination() {
Expand All @@ -98,6 +100,12 @@ private CsvDestination getDestination() {
return result;
}

/**
 * Creates a {@link CsvDestination} spy whose {@code getDelimiter} is stubbed to return a comma,
 * the character encoded by the {@code delimiter} node built in {@code setup()}.
 *
 * @return the stubbed spy
 */
private CsvDestination getDelimiter() {
  final CsvDestination destinationSpy = spy(CsvDestination.class);
  // The stubbed value has to match the Character return type of CsvDestination#getDelimiter;
  // handing Mockito the JsonNode field here is rejected as a wrong return type.
  doReturn(',').when(destinationSpy).getDelimiter(any());
  return destinationSpy;
}

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = getDestination().spec();
Expand Down
Loading