
Commit

wip
edgao committed Dec 8, 2021
1 parent d7bbaad commit 1ce7b90
Showing 3 changed files with 97 additions and 13 deletions.
@@ -16,7 +16,11 @@
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.destination.s3.csv.S3CsvWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
import java.io.PrintWriter;
@@ -58,45 +62,66 @@ public abstract class S3StreamCopier implements StreamCopier {
protected final JdbcDatabase db;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final ConfiguredAirbyteStream configuredAirbyteStream;
private final Timestamp uploadTime;
protected final Set<String> s3StagingFiles = new HashSet<>();
private final Map<String, StreamTransferManager> multipartUploadManagers = new HashMap<>();
private final Map<String, MultiPartOutputStream> outputStreams = new HashMap<>();
private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
protected final String stagingFolder;
private final StagingFilenameGenerator filenameGenerator;
private final Map<String, S3CsvWriter> stagingWriters = new HashMap<>();

public S3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
final SqlOperations sqlOperations,
final ConfiguredAirbyteStream configuredAirbyteStream,
final Timestamp uploadTime) {
this.destSyncMode = configuredAirbyteStream.getDestinationSyncMode();
this.schemaName = schema;
this.streamName = streamName;
this.streamName = configuredAirbyteStream.getStream().getName();
this.stagingFolder = stagingFolder;
this.db = db;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.configuredAirbyteStream = configuredAirbyteStream;
this.uploadTime = uploadTime;
this.tmpTableName = nameTransformer.getTmpTableName(this.streamName);
this.s3Client = client;
this.s3Config = s3Config;
this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
this.filenameGenerator = new StagingFilenameGenerator(this.streamName, MAX_PARTS_PER_FILE);
}

private String prepareS3StagingFile() {
return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
}

/*
* old behavior: create s3://bucket/randomUuid/(namespace|schemaName)/generatedFilename
* S3CsvWriter: create s3://bucket/bucketPath(/namespace)?/streamName/time.csv
*/
@Override
public String prepareStagingFile() {
final var name = prepareS3StagingFile();
if (!s3StagingFiles.contains(name)) {
s3StagingFiles.add(name);
if (!stagingWriters.containsKey(name)) {
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());

try {
final S3CsvWriter writer = new S3CsvWriter(
s3Config.cloneWithFormatConfig(new S3CsvFormatConfig(Flattening.ROOT_LEVEL, (long) s3Config.getPartSize())),
s3Client,
configuredAirbyteStream,
uploadTime
);
stagingWriters.put(name, writer);
} catch (final IOException e) {
throw new RuntimeException(e);
}

// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
// have support for streaming multipart uploads;
// The alternative is first writing the entire output to disk before loading into S3. This is not
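To make the path comment in prepareStagingFile concrete, here is a minimal, runnable sketch of the two key layouts. The bucket, bucket path, schema, and stream names are hypothetical placeholders, not values from this commit:

import java.util.UUID;

// Illustrative sketch only: reproduces the two staging-key layouts described in
// the "old behavior" vs. "S3CsvWriter" comment above. "my-bucket", "data_lake",
// "public", and "users" are hypothetical placeholders.
public class StagingPathSketch {

  public static void main(String[] args) {
    // old behavior: s3://bucket/randomUuid/(namespace|schemaName)/generatedFilename
    final String oldKey = String.join("/", UUID.randomUUID().toString(), "public", "users_00000");
    System.out.println("s3://my-bucket/" + oldKey);

    // S3CsvWriter: s3://bucket/bucketPath(/namespace)?/streamName/time.csv
    final String newKey = String.join("/", "data_lake", "public", "users", System.currentTimeMillis() + ".csv");
    System.out.println("s3://my-bucket/" + newKey);
  }
}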
@@ -0,0 +1,52 @@
package io.airbyte.integrations.destination.jdbc.copy.s3;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {

/**
* Used by the copy consumer.
*/
@Override
public StreamCopier create(final String configuredSchema,
final S3DestinationConfig s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
try {
final AirbyteStream stream = configuredStream.getStream();
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
final AmazonS3 s3Client = s3Config.getS3Client();

return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

/**
* For specific copier suppliers to implement.
*/
public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
throws Exception;
}
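For illustration, a destination-specific factory would implement the abstract create hook above along these lines. FakeStreamCopier and its constructor are hypothetical placeholders, not code from this commit; the sketch assumes the same imports as S3StreamCopierFactory:

// Hypothetical sketch of a concrete copier factory. FakeStreamCopier stands in
// for a destination-specific S3StreamCopier subclass.
public class FakeStreamCopierFactory extends S3StreamCopierFactory {

  @Override
  public StreamCopier create(final String stagingFolder,
                             final DestinationSyncMode syncMode,
                             final String schema,
                             final String streamName,
                             final AmazonS3 s3Client,
                             final JdbcDatabase db,
                             final S3DestinationConfig s3Config,
                             final ExtendedNameTransformer nameTransformer,
                             final SqlOperations sqlOperations) {
    // Delegate straight to the destination-specific copier's constructor.
    return new FakeStreamCopier(stagingFolder, syncMode, schema, streamName,
        s3Client, db, s3Config, nameTransformer, sqlOperations);
  }
}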
@@ -19,7 +19,11 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,9 +48,7 @@ public void setup() {

copier = new S3StreamCopier(
"fake-staging-folder",
DestinationSyncMode.OVERWRITE,
"fake-schema",
"fake-stream",
s3Client,
db,
new S3DestinationConfig(
@@ -59,7 +61,11 @@
null
),
new ExtendedNameTransformer(),
sqlOperations
sqlOperations,
new ConfiguredAirbyteStream()
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(new AirbyteStream().withName("fake-stream")),
Timestamp.from(Instant.now())
) {
@Override
public void copyS3CsvFileIntoTable(
@@ -108,7 +114,8 @@ public void closesS3Upload_when_stagingUploaderClosedFailingly() throws Exception
final RuntimeException exception = assertThrows(RuntimeException.class, () -> copier.closeStagingUploader(true));

// the wrapping chain is RuntimeException -> ExecutionException -> RuntimeException -> InterruptedException
assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(), "Original exception: " + ExceptionUtils.readStackTrace(exception));
assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(),
"Original exception: " + ExceptionUtils.readStackTrace(exception));
}

@Test
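The four-level cause chain asserted above can also be unwrapped generically; a minimal sketch (not part of this commit) of walking a wrapped exception down to its root cause:

// Minimal sketch, not from this commit: walks a cause chain such as
// RuntimeException -> ExecutionException -> RuntimeException -> InterruptedException
// down to its innermost cause.
final class Exceptions {

  static Throwable rootCause(Throwable t) {
    while (t.getCause() != null) {
      t = t.getCause();
    }
    return t;
  }
}

With such a helper, the assertion could read assertEquals(InterruptedException.class, rootCause(exception).getClass(), ...) instead of chaining getCause() calls by hand.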
