🎉 Destination Redshift (copy): accept bucket path for staging data #8607

Merged: 52 commits, Dec 17, 2021

Commits (all by edgao):
4f8eb4f  rename to Legacy (Dec 7, 2021)
0c4159d  add legacy test (Dec 7, 2021)
a62ead7  create S3StreamCopier (Dec 7, 2021)
daa9352  fix param deletion (Dec 7, 2021)
48d25d6  wip (Dec 8, 2021)
babcc77  wip (Dec 8, 2021)
098ce39  make tests work. mocks are awful. (Dec 9, 2021)
b949984  WIP replace old code; nothing works yet (Dec 9, 2021)
533880e  add getObjectKey; add S3CsvWriterTest (Dec 9, 2021)
36bdea8  write to multiple files correctly (Dec 9, 2021)
b03896a  correct deleteStagingFiles test (Dec 9, 2021)
dc8a2c4  completed things (Dec 9, 2021)
94029e4  fix test (Dec 9, 2021)
3019ef0  unit capitalization (Dec 11, 2021)
50a3a29  formatting (Dec 11, 2021)
7dfd452  wip (Dec 11, 2021)
15b9f27  remove mistaken dep (Dec 13, 2021)
c54baf3  use UUID suffix (Dec 13, 2021)
5c367df  various improvements (Dec 13, 2021)
2b27a4e  optional header; unit test file contents (Dec 14, 2021)
2bcf9a5  fix field name (Dec 14, 2021)
24c6a19  remove comment (Dec 14, 2021)
d860fd6  RECORD CLASS RECORD CLASS (Dec 14, 2021)
ff400c7  warning (Dec 14, 2021)
0fe979b  text block (Dec 14, 2021)
685f366  add more csv options (Dec 14, 2021)
d327bcc  update comment (Dec 14, 2021)
0f7b2bd  assert copy operation (Dec 14, 2021)
866db40  add test (Dec 14, 2021)
1b4f41f  cutover to non-legacy stream copier (Dec 14, 2021)
89f53fe  update param name (Dec 14, 2021)
5cd8b56  minor comments about sheet generator + Flattening (Dec 14, 2021)
7a1de37  timezones :( (Dec 15, 2021)
d4fcff7  add dup code comment (Dec 15, 2021)
9d3c6c1  delete redundant tests (Dec 15, 2021)
e975bb8  manifest also exists within bucketPath (Dec 15, 2021)
0ef601e  add comment (Dec 15, 2021)
734141d  better comment (Dec 15, 2021)
8cd2e51  rename getObjectKey + add javadoc (Dec 15, 2021)
842e1d9  explain default behavior (Dec 15, 2021)
485fe5d  remove from abstract classes (Dec 15, 2021)
1bdfab4  Merge branch 'master' into edgao/s3_based_stream_copier (Dec 15, 2021)
ed07183  reformat (Dec 16, 2021)
5d0427a  add implementation for getObjectPath (Dec 16, 2021)
fed6c36  prepare for publish (Dec 16, 2021)
f58bb90  follow doc conventions (Dec 16, 2021)
87b8543  follow doc conventions (Dec 16, 2021)
028c3b6  rename to getOutputPath (Dec 16, 2021)
192d6a3  add comment (Dec 16, 2021)
257dcb7  Merge branch 'master' into edgao/s3_based_stream_copier (Dec 16, 2021)
372098d  Merge branch 'master' into edgao/s3_based_stream_copier (Dec 17, 2021)
7625bee  regenerate seed specs (Dec 17, 2021)
Files changed:
@@ -10,6 +10,7 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
 import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
@@ -23,9 +24,8 @@
 import org.slf4j.LoggerFactory;

 /**
- * This implementation is similar to
- * {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that
- * this implementation creates Parquet staging files, instead of CSV ones.
+ * This implementation is similar to {@link LegacyS3StreamCopier}. The difference is that this
+ * implementation creates Parquet staging files, instead of CSV ones.
  * <p>
  * </p>
  * It does the following operations:
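(Per the commit history above, the old S3StreamCopier was renamed to LegacyS3StreamCopier and a new S3StreamCopier took its place; javadoc references like this one were repointed at the legacy class until the cutover in 1b4f41f.)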
GcsAvroWriter.java:

@@ -36,6 +36,7 @@ public class GcsAvroWriter extends BaseGcsWriter implements S3Writer {
   private final StreamTransferManager uploadManager;
   private final MultiPartOutputStream outputStream;
   private final DataFileWriter<GenericData.Record> dataFileWriter;
+  private final String objectKey;

   public GcsAvroWriter(final GcsDestinationConfig config,
                        final AmazonS3 s3Client,
@@ -47,7 +48,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
     super(config, s3Client, configuredStream);

     final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);

     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
         objectKey);
@@ -85,4 +86,9 @@ protected void closeWhenFail() throws IOException {
     uploadManager.abort();
   }

+  @Override
+  public String getObjectPath() {
+    return objectKey;
+  }
+
 }
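The CSV, JSONL, and Parquet writers below receive the same three-part change: a new objectKey field, an assignment in the constructor in place of a local variable, and a getObjectPath() override. Distilled into a minimal sketch (the class name here is a stand-in, not part of the PR):

// Minimal sketch of the recurring pattern; only objectKey and
// getObjectPath() mirror the actual diff, the rest is scaffolding.
public class ExampleWriter {

  // Promoted from a constructor-local variable to a field so the key
  // survives past construction.
  private final String objectKey;

  public ExampleWriter(final String outputPrefix, final String outputFilename) {
    this.objectKey = String.join("/", outputPrefix, outputFilename);
  }

  // New accessor: callers (e.g. the stream copier) can ask the writer
  // for the exact object it staged instead of re-deriving the path.
  public String getObjectPath() {
    return objectKey;
  }

}

This is what lets the copier record exactly which objects were staged, which manifest and cleanup logic can then reuse.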
GcsCsvWriter.java:

@@ -36,6 +36,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
   private final MultiPartOutputStream outputStream;
   private final CSVPrinter csvPrinter;
   private final String gcsCsvFileLocation; // this used in destination-bigquery (GCS upload type)
+  private final String objectKey;

   public GcsCsvWriter(final GcsDestinationConfig config,
                       final AmazonS3 s3Client,
@@ -48,7 +49,7 @@ public GcsCsvWriter(final GcsDestinationConfig config,
     this.csvSheetGenerator = CsvSheetGenerator.Factory.create(configuredStream.getStream().getJsonSchema(), formatConfig);

     final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);
     gcsCsvFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);

     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
@@ -90,4 +91,9 @@ public CSVPrinter getCsvPrinter() {
     return csvPrinter;
   }

+  @Override
+  public String getObjectPath() {
+    return objectKey;
+  }
+
 }
GcsJsonlWriter.java:

@@ -35,6 +35,7 @@ public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer {
   private final StreamTransferManager uploadManager;
   private final MultiPartOutputStream outputStream;
   private final PrintWriter printWriter;
+  private final String objectKey;

   public GcsJsonlWriter(final GcsDestinationConfig config,
                         final AmazonS3 s3Client,
@@ -43,7 +44,7 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
     super(config, s3Client, configuredStream);

     final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);

     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

@@ -78,4 +79,9 @@ protected void closeWhenFail() {
     uploadManager.abort();
   }

+  @Override
+  public String getObjectPath() {
+    return objectKey;
+  }
+
 }
GcsParquetWriter.java:

@@ -6,7 +6,6 @@

 import com.amazonaws.services.s3.AmazonS3;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
 import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
 import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
 import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
@@ -37,10 +36,10 @@ public class GcsParquetWriter extends BaseGcsWriter implements S3Writer {

   private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final ObjectWriter WRITER = MAPPER.writer();

   private final ParquetWriter<Record> parquetWriter;
   private final AvroRecordFactory avroRecordFactory;
+  private final String objectKey;

   public GcsParquetWriter(final GcsDestinationConfig config,
                           final AmazonS3 s3Client,
@@ -52,7 +51,7 @@ public GcsParquetWriter(final GcsDestinationConfig config,
     super(config, s3Client, configuredStream);

     final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET);
-    final String objectKey = String.join("/", outputPrefix, outputFilename);
+    objectKey = String.join("/", outputPrefix, outputFilename);
     LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

     final URI uri = new URI(String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename));
@@ -109,4 +108,9 @@ public void close(final boolean hasFailed) throws IOException {
     }
   }

+  @Override
+  public String getObjectPath() {
+    return objectKey;
+  }
+
 }
GcsAvroWriterTest.java (new file):

@@ -0,0 +1,46 @@
/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.gcs.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

class GcsAvroWriterTest {

  @Test
  public void generatesCorrectObjectPath() throws IOException {
    final GcsAvroWriter writer = new GcsAvroWriter(
        new GcsDestinationConfig(
            "fake-bucket",
            "fake-bucketPath",
            "fake-bucketRegion",
            null,
            new S3AvroFormatConfig(new ObjectMapper().createObjectNode())),
        mock(AmazonS3.class, RETURNS_DEEP_STUBS),
        new ConfiguredAirbyteStream()
            .withStream(new AirbyteStream()
                .withNamespace("fake-namespace")
                .withName("fake-stream")),
        Timestamp.from(Instant.ofEpochMilli(1234)),
        mock(Schema.class),
        null);

    assertEquals("fake-bucketPath/fake_namespace/fake_stream/1970_01_01_1234_0.avro", writer.getObjectPath());
  }

}
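The expected key in the assertion is assembled from the configured bucket path ("fake-bucketPath"), the normalized namespace and stream names (dashes become underscores), and a filename derived from the upload timestamp (epoch millisecond 1234). Commit e975bb8 notes that the COPY manifest also lives under the configured bucketPath; below is a minimal sketch of how staged paths returned by getObjectPath() could be assembled into a Redshift COPY manifest. The class and method names are illustrative, not the PR's actual helper.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;

// Illustrative only: builds the JSON body of a Redshift COPY manifest
// from staged object paths; the PR's actual manifest code may differ.
public class ManifestSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static String createManifest(final String bucketName, final List<String> objectPaths) {
    final ObjectNode manifest = MAPPER.createObjectNode();
    final ArrayNode entries = manifest.putArray("entries");
    for (final String path : objectPaths) {
      // Each entry points at one staged file; "mandatory" makes COPY fail
      // if the file is missing rather than silently skipping it.
      entries.addObject()
          .put("url", String.format("s3://%s/%s", bucketName, path))
          .put("mandatory", true);
    }
    return manifest.toString();
  }

}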
build.gradle:

@@ -19,6 +19,7 @@ dependencies {
     implementation 'com.fasterxml.jackson.core:jackson-databind'

     testImplementation "org.testcontainers:postgresql:1.15.3"
+    testImplementation "org.mockito:mockito-inline:4.1.0"

     integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
     integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"
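(mockito-inline, unlike plain mockito-core, can mock final classes and constructors; it is presumably pulled in for the new writer and stream-copier tests, which the commit message "make tests work. mocks are awful." alludes to.)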
CopyConsumerFactory.java:

@@ -29,7 +29,7 @@ public class CopyConsumerFactory {

   private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

-  private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 mib
+  private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB

   public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
                                                   final JdbcDatabase database,
StreamCopier.java:

@@ -61,7 +61,8 @@ public interface StreamCopier {
   /**
    * Creates the staging file and all the necessary items to write data to this file.
    *
-   * @return the name of the staging file
+   * @return A string that uniquely identifies the file. E.g. the filename, or a unique suffix that is
+   *         appended to a shared filename prefix
    */
   String prepareStagingFile();
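The reworded contract reflects commit c54baf3 ("use UUID suffix"): an implementation may return only the unique part of a staging file's name. A minimal sketch of what such an implementation could look like, assuming a copier that tracks its staging files internally (the class and field names here are illustrative):

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Illustrative StreamCopier fragment: each staging file shares a common
// prefix and is distinguished by a random UUID suffix, which is also the
// string handed back to the caller.
public class UuidSuffixExample {

  private final String stagingFolder = "staging/my-stream";
  private final Set<String> activeStagingFiles = new HashSet<>();

  public String prepareStagingFile() {
    final String suffix = UUID.randomUUID().toString();
    // Track the full object key internally...
    activeStagingFiles.add(stagingFolder + "/" + suffix + ".csv");
    // ...but return only the unique suffix, per the updated javadoc.
    return suffix;
  }

}

Returning just the suffix keeps the shared-prefix logic in one place while still letting callers address individual staged files.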