Skip to content

Commit

Permalink
✨Add common async methods (#29003)
Browse files Browse the repository at this point in the history
* Add common async methods

* Automated Commit - Format and Process Resources Changes

---------

Co-authored-by: benmoriceau <benmoriceau@users.noreply.github.com>
  • Loading branch information
benmoriceau and benmoriceau authored Aug 2, 2023
1 parent 516e89c commit 792a2e5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.base;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.protocol.models.v0.AirbyteMessage;

Expand Down Expand Up @@ -53,4 +54,29 @@ public interface SerializedAirbyteMessageConsumer extends CheckedBiConsumer<Stri
@Override
void close() throws Exception;

/**
* Append a function to be called on {@link SerializedAirbyteMessageConsumer#close}.
*/
static SerializedAirbyteMessageConsumer appendOnClose(final SerializedAirbyteMessageConsumer consumer, final VoidCallable voidCallable) {
return new SerializedAirbyteMessageConsumer() {

@Override
public void start() throws Exception {
consumer.start();
}

@Override
public void accept(final String message, final Integer sizeInBytes) throws Exception {
consumer.accept(message, sizeInBytes);
}

@Override
public void close() throws Exception {
consumer.close();
voidCallable.call();
}

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -79,7 +80,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final SshTunnel tunnel = (endPointKey != null) ? SshTunnel.getInstance(config, endPointKey) : SshTunnel.getInstance(config, hostKey, portKey);
final SshTunnel tunnel = getTunnelInstance(config);

final AirbyteMessageConsumer delegateConsumer;
try {
Expand All @@ -92,4 +93,27 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return AirbyteMessageConsumer.appendOnClose(delegateConsumer, tunnel::close);
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final SshTunnel tunnel = getTunnelInstance(config);
final SerializedAirbyteMessageConsumer delegateConsumer;
try {
delegateConsumer = delegate.getSerializedMessageConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector);
} catch (final Exception e) {
LOGGER.error("Exception occurred while getting the delegate consumer, closing SSH tunnel", e);
tunnel.close();
throw e;
}
return SerializedAirbyteMessageConsumer.appendOnClose(delegateConsumer, tunnel::close);
}

protected SshTunnel getTunnelInstance(final JsonNode config) throws Exception {
return (endPointKey != null)
? SshTunnel.getInstance(config, endPointKey)
: SshTunnel.getInstance(config, hostKey, portKey);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -66,4 +67,14 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return typeToDestination.get(destinationType).getConsumer(config, catalog, outputRecordCollector);
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final T destinationType = configToType.apply(config);
LOGGER.info("Using destination type: " + destinationType.name());
return typeToDestination.get(destinationType).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

}

0 comments on commit 792a2e5

Please sign in to comment.