Skip to content

Commit

Permalink
pig in lipstick
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Dec 5, 2020
1 parent 76ef9fd commit f70c5af
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -100,10 +99,8 @@ public void run(String[] args) throws Exception {
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
// todo (cgardens) - should we should only send the contents of the state field to the integration,
// not the whole struct. this runner obfuscates everything but the contents.
final Optional<State> stateOptional = parsed.getStatePath().map(path -> parseConfig(path, State.class));
final Stream<AirbyteMessage> messageStream = source.read(config, catalog, stateOptional.map(State::getState).orElse(null));
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final Stream<AirbyteMessage> messageStream = source.read(config, catalog, stateOptional.orElse(null));
messageStream.map(Jsons::serialize).forEach(stdoutConsumer);
messageStream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
Expand Down Expand Up @@ -78,14 +77,13 @@ class IntegrationRunnerTest {

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final State STATE = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945")));
private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"));

private IntegrationCliParser cliParser;
private Consumer<String> stdoutConsumer;
private Destination destination;
private Source source;
private Path configPath;
private Path catalogPath;
private Path configuredCatalogPath;
private Path statePath;

Expand All @@ -99,7 +97,6 @@ void setup() throws IOException {
Path configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");

configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG));
configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE));
}
Expand Down Expand Up @@ -185,11 +182,11 @@ void testRead() throws Exception {
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE.getState())).thenReturn(Stream.of(message1, message2));
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(Stream.of(message1, message2));

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);

verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE.getState());
verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
verify(stdoutConsumer).accept(Jsons.serialize(message1));
verify(stdoutConsumer).accept(Jsons.serialize(message2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@

package io.airbyte.integrations.standardtest.source;

import static io.airbyte.protocol.models.SyncMode.FULL_REFRESH;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.StandardCheckConnectionInput;
Expand Down Expand Up @@ -65,6 +64,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -263,10 +263,10 @@ public void testIncrementalSyncWithState() throws Exception {
return;
}

ConfiguredAirbyteCatalog configuredAirbyteCatalog = withSourceDefinedCursors(getConfiguredCatalog());
List<AirbyteMessage> airbyteMessages = runRead(configuredAirbyteCatalog, getState());
List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
List<AirbyteStateMessage> stateMessages = airbyteMessages
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = withSourceDefinedCursors(getConfiguredCatalog());
final List<AirbyteMessage> airbyteMessages = runRead(configuredAirbyteCatalog, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == Type.STATE)
.map(AirbyteMessage::getState)
Expand Down Expand Up @@ -310,8 +310,18 @@ private List<AirbyteRecordMessage> filterRecords(Collection<AirbyteMessage> mess
private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getSyncMode() == INCREMENTAL && configuredStream.getStream().getSourceDefinedCursor()) {
configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField());
if (configuredStream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.INCREMENTAL)) {
configuredStream.setSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL);
if (Optional.ofNullable(configuredStream.getStream().getSourceDefinedCursor()).orElse(false)) {
configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField());
} else {
// todo (cgardens) - this is really too terrible. there are some column types that aren't supported
// so you just need to order your columns such that we pick a valid one. too much guessing here. got
// to fix.
// see cursor field to an arbitrary field in the stream.
configuredStream
.setCursorField(Lists.newArrayList(new ArrayList<>(Jsons.keys(configuredStream.getStream().getJsonSchema().get("properties"))).get(0)));
}
}
}
return clone;
Expand All @@ -320,8 +330,8 @@ private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatal
private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatalog catalog) {
ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getStream().getSupportedSyncModes().contains(FULL_REFRESH)) {
configuredStream.setSyncMode(FULL_REFRESH);
if (configuredStream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)) {
configuredStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
}
}
return clone;
Expand All @@ -330,7 +340,7 @@ private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatal
private boolean sourceSupportsIncremental() throws Exception {
ConfiguredAirbyteCatalog catalog = getConfiguredCatalog();
for (ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (stream.getStream().getSupportedSyncModes().contains(INCREMENTAL)) {
if (stream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.INCREMENTAL)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.AbstractJooqSource;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSource extends AbstractJdbcSource implements Source {
public class PostgresSource extends AbstractJooqSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
Expand All @@ -35,6 +36,8 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -102,10 +105,15 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = CatalogHelpers.createConfiguredAirbyteCatalog(
STREAM_NAME,
Field.of("id", Field.JsonSchemaPrimitive.NUMBER),
Field.of("name", Field.JsonSchemaPrimitive.STRING));
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING));

configuredAirbyteCatalog.getStreams().forEach(
configuredStream -> configuredStream.getStream().withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));

return configuredAirbyteCatalog;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ public void testCanReadUtf8() throws Exception {

@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testReadFailure() throws Exception {
void testReadFailure() {
final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream));
doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream();
doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream();

final PostgresSource source = new PostgresSource();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.commons.json.Jsons;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -64,6 +65,28 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String strea
return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)));
}

public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream(
String streamName,
SyncMode syncMode,
String cursorFieldName,
Field... fields) {
return createIncrementalConfiguredAirbyteStream(streamName, syncMode, cursorFieldName, Arrays.asList(fields));
}

public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream(
String streamName,
SyncMode syncMode,
String cursorFieldName,
List<Field> fields) {
return new ConfiguredAirbyteStream()
.withStream(new AirbyteStream()
.withName(streamName)
.withSupportedSyncModes(Collections.singletonList(syncMode))
.withJsonSchema(fieldsToJsonSchema(fields)))
.withSyncMode(syncMode)
.withCursorField(Collections.singletonList(cursorFieldName));
}

/**
* Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make
* it a valid ConfiguredCatalog.
Expand Down

0 comments on commit f70c5af

Please sign in to comment.