Skip to content

Commit

Permalink
Logging recordWriter and onStreamFlush completion (#23360)
Browse files Browse the repository at this point in the history
* Adds additional logging when flushing buffer and writing records

* Removes logging for writeRecord since this will explode log lines

* Added logging when uploading records to stage/bucket

* Fixes log lines to properly capture when records have been uploaded

* Bumps version and fixes logging message to more accurately reflect logic

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii authored Feb 27, 2023
1 parent 90884d0 commit 32ae1b0
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.48
dockerImageTag: 0.4.49
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6050,7 +6050,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.48"
- dockerImage: "airbyte/destination-snowflake:0.4.49"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,16 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData,
}

try {
return loadDataIntoBucket(objectPath, recordsData);
final String fileName = loadDataIntoBucket(objectPath, recordsData);
LOGGER.info("Successfully loaded records to stage {} with {} re-attempt(s)", objectPath, exceptionsThrown.size());
return fileName;
} catch (final Exception e) {
LOGGER.error("Failed to upload records into storage {}", objectPath, e);
exceptionsThrown.add(e);
}
}
// Verifying that ALL exceptions are authentication related before assuming this is a configuration
// issue
// reduces risk of misidentifying errors or reporting a transient error.
// issue reduces risk of misidentifying errors or reporting a transient error.
final boolean areAllExceptionsAuthExceptions = exceptionsThrown.stream().filter(e -> e instanceof AmazonS3Exception)
.map(s3e -> ((AmazonS3Exception) s3e).getStatusCode())
.filter(ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES::contains)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public Optional<BufferFlushType> addRecord(final AirbyteStreamNameNamespacePair

@Override
public void flushWriter(final AirbyteStreamNameNamespacePair stream, final SerializableBuffer writer) throws Exception {
LOGGER.info("Flushing single stream {}: {} records", stream, streamBuffer.get(stream).size());
LOGGER.info("Flushing single stream {}: {} records", stream.getName(), streamBuffer.get(stream).size());
recordWriter.accept(stream, streamBuffer.get(stream));
LOGGER.info("Flushing completed for {}", stream.getName());
}

@Override
Expand All @@ -88,6 +89,7 @@ public void flushAll() throws Exception {
if (checkAndRemoveRecordWriter != null) {
fileName = checkAndRemoveRecordWriter.apply(entry.getKey(), fileName);
}
LOGGER.info("Flushing completed for {}", entry.getKey().getName());
}
close();
clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void flushWriter(final AirbyteStreamNameNamespacePair stream, final Seria
onStreamFlush.accept(stream, writer);
totalBufferSizeInBytes -= writer.getByteCount();
allBuffers.remove(stream);
LOGGER.info("Flushing completed for {}", stream.getName());
}

@Override
Expand All @@ -130,10 +131,10 @@ public void flushAll() throws Exception {
for (final Entry<AirbyteStreamNameNamespacePair, SerializableBuffer> entry : allBuffers.entrySet()) {
LOGGER.info("Flushing buffer of stream {} ({})", entry.getKey().getName(), FileUtils.byteCountToDisplaySize(entry.getValue().getByteCount()));
onStreamFlush.accept(entry.getKey(), entry.getValue());
LOGGER.info("Flushing completed for {}", entry.getKey().getName());
}
close();
clear();

totalBufferSizeInBytes = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.48
LABEL io.airbyte.version=0.4.49
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public String uploadRecordsToStage(final JdbcDatabase database,
final List<Exception> exceptionsThrown = new ArrayList<>();
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {
try {
return loadDataIntoBucket(stagingPath, recordsData);
final String fileName = loadDataIntoBucket(stagingPath, recordsData);
LOGGER.info("Successfully loaded records to stage {} with {} re-attempt(s)", stagingPath, exceptionsThrown.size());
return fileName;
} catch (final Exception e) {
LOGGER.error("Failed to upload records into storage {}", stagingPath, e);
exceptionsThrown.add(e);
Expand All @@ -113,6 +115,17 @@ public String uploadRecordsToStage(final JdbcDatabase database,
throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
}

/**
* Upload the file from {@code recordsData} to S3 and simplify the filename as {@code <partId>.<extension>}.
*
* <p>
* Method mirrors similarly named method within {@link io.airbyte.integrations.destination.s3.S3StorageOperations}
* </p>
*
* @param objectPath filepath to the object
* @param recordsData serialized {@link io.airbyte.protocol.models.AirbyteRecordMessage}s
* @return the uploaded filename, which is different from the serialized buffer filename
*/
private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {

final String fullObjectKey = objectPath + recordsData.getFilename();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public String uploadRecordsToStage(final JdbcDatabase database,
boolean succeeded = false;
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT && !succeeded) {
try {
loadDataIntoStage(database, stageName, stagingPath, recordsData);
uploadRecordsToBucket(database, stageName, stagingPath, recordsData);
succeeded = true;
} catch (final Exception e) {
LOGGER.error("Failed to upload records into stage {}", stagingPath, e);
Expand All @@ -84,10 +84,11 @@ public String uploadRecordsToStage(final JdbcDatabase database,
throw new RuntimeException(
String.format("Exceptions thrown while uploading records into stage: %s", Strings.join(exceptionsThrown, "\n")));
}
LOGGER.info("Successfully loaded records to stage {} with {} re-attempt(s)", stagingPath, exceptionsThrown.size());
return recordsData.getFilename();
}

private void loadDataIntoStage(final JdbcDatabase database, final String stageName, final String stagingPath, final SerializableBuffer recordsData)
private void uploadRecordsToBucket(final JdbcDatabase database, final String stageName, final String stagingPath, final SerializableBuffer recordsData)
throws Exception {
final String query = getPutQuery(stageName, stagingPath, recordsData.getFile().getAbsolutePath());
LOGGER.debug("Executing query: {}", query);
Expand Down Expand Up @@ -132,7 +133,7 @@ public void createStageIfNotExists(final JdbcDatabase database, final String sta
LOGGER.debug("Executing query: {}", query);
try {
database.execute(query);
} catch (Exception e) {
} catch (final Exception e) {
throw checkForKnownConfigExceptions(e).orElseThrow(() -> e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@
| **S3 Glue** | <img alt="S3 Glue icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/s3-glue.svg" height="30" height="30"/> | Destination | airbyte/destination-s3-glue:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/destinations/s3-glue) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-s3-glue) | <small>`471e5cab-8ed1-49f3-ba11-79c687784737`</small> |
| **SFTP-JSON** | <img alt="SFTP-JSON icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Destination | airbyte/destination-sftp-json:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/sftp-json) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-sftp-json) | <small>`e9810f61-4bab-46d2-bb22-edfc902e0644`</small> |
| **Scylla** | <img alt="Scylla icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/scylla.svg" height="30" height="30"/> | Destination | airbyte/destination-scylla:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/destinations/scylla) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-scylla) | <small>`3dc6f384-cd6b-4be3-ad16-a41450899bf0`</small> |
| **Snowflake** | <img alt="Snowflake icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/snowflake.svg" height="30" height="30"/> | Destination | airbyte/destination-snowflake:0.4.48 | generally_available | [link](https://docs.airbyte.com/integrations/destinations/snowflake) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-snowflake) | <small>`424892c4-daac-4491-b35d-c6688ba547ba`</small> |
| **Snowflake** | <img alt="Snowflake icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/snowflake.svg" height="30" height="30"/> | Destination | airbyte/destination-snowflake:0.4.49 | generally_available | [link](https://docs.airbyte.com/integrations/destinations/snowflake) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-snowflake) | <small>`424892c4-daac-4491-b35d-c6688ba547ba`</small> |
| **Streamr** | <img alt="Streamr icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/streamr.svg" height="30" height="30"/> | Destination | ghcr.io/devmate-cloud/streamr-airbyte-connectors:0.0.1 | alpha | [link](https://docs.airbyte.com/integrations/destinations/streamr) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/devmate-cloud) | <small>`eebd85cf-60b2-4af6-9ba0-edeca01437b0`</small> |
| **Teradata Vantage** | <img alt="Teradata Vantage icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/teradata.svg" height="30" height="30"/> | Destination | airbyte/destination-teradata:0.1.0 | alpha | [link](https://docs.airbyte.io/integrations/destinations/teradata) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-teradata) | <small>`58e6f9da-904e-11ed-a1eb-0242ac120002`</small> |
| **TiDB** | <img alt="TiDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/tidb.svg" height="30" height="30"/> | Destination | airbyte/destination-tidb:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/tidb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-tidb) | <small>`06ec60c7-7468-45c0-91ac-174f6e1a788b`</small> |
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ Otherwise, make sure to grant the role the required permissions in the desired n
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.49 | 2023-02-27 | [\#23360](https://github.com/airbytehq/airbyte/pull/23360) | Added logging for flushing and writing data to destination storage |
| 0.4.48 | 2023-02-23 | [\#22877](https://github.com/airbytehq/airbyte/pull/22877) | Add handler for IP not in whitelist error and more handlers for insufficient permission error |
| 0.4.47 | 2023-01-30 | [\#21912](https://github.com/airbytehq/airbyte/pull/21912) | Catch "Create" Table and Stage Known Permissions and rethrow as ConfigExceptions |
| 0.4.46 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
| 0.4.45 | 2023-01-25 | [#21087](https://github.com/airbytehq/airbyte/pull/21764) | Catch Known Permissions and rethrow as ConfigExceptions |
| 0.4.44 | 2023-01-20 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.4.45 | 2023-01-25 | [\#21764](https://github.com/airbytehq/airbyte/pull/21764) | Catch Known Permissions and rethrow as ConfigExceptions |
| 0.4.44 | 2023-01-20 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.4.43 | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated Check methods to handle more possible s3 and gcs stagings issues |
| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
Expand Down

0 comments on commit 32ae1b0

Please sign in to comment.