Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding incremental to the data model #998

Merged
merged 29 commits into from
Nov 18, 2020
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
worker builds
  • Loading branch information
cgardens committed Nov 18, 2020

Verified

This commit was signed with the committer’s verified signature.
mrT23 Tal
commit 41a18ffce47170723770a9ca6f0fc75c0d6dadc5
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -51,6 +53,17 @@ public static AirbyteCatalog toCatalog(Schema schema) {
return new AirbyteCatalog().withStreams(airbyteStreams);
}

/**
 * Converts a {@link Schema} into a {@link ConfiguredAirbyteCatalog}.
 *
 * <p>
 * Each stream of the schema is turned into a {@link ConfiguredAirbyteStream} carrying the stream's
 * name and the JSON schema produced by {@code toJson} for its fields. Streams whose JSON schema
 * ends up with an empty {@code "properties"} node are left out of the result.
 *
 * @param schema schema whose streams are converted
 * @return catalog containing only the streams with at least one property
 */
public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
  final List<ConfiguredAirbyteStream> selectedStreams = new ArrayList<>();

  schema.getStreams().forEach(stream -> {
    final ConfiguredAirbyteStream configured = new ConfiguredAirbyteStream()
        .withName(stream.getName())
        .withJsonSchema(toJson(stream.getFields()));

    // selection is driven by the output of toJson, which keeps properties if selected=true
    final boolean hasNoProperties = configured.getJsonSchema().get("properties").isEmpty();
    if (!hasNoProperties) {
      selectedStreams.add(configured);
    }
  });

  return new ConfiguredAirbyteCatalog().withStreams(selectedStreams);
}

// todo (cgardens) - this will only work with table / column schemas. it's a hack to get us through
// migration.
public static JsonNode toJson(List<Field> fields) {
Original file line number Diff line number Diff line change
@@ -55,6 +55,18 @@ public static AirbyteStream createAirbyteStream(String streamName, List<Field> f
return new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields));
}

/**
 * Builds a {@link ConfiguredAirbyteCatalog} that contains exactly one stream.
 *
 * @param streamName name given to the single stream
 * @param fields fields used to build the stream's JSON schema
 * @return catalog wrapping the one configured stream
 */
public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(String streamName, Field... fields) {
  final ConfiguredAirbyteStream onlyStream = createConfiguredAirbyteStream(streamName, fields);
  final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog();
  return catalog.withStreams(Lists.newArrayList(onlyStream));
}

/**
 * Varargs convenience overload: wraps {@code fields} in a list and delegates to the list-based
 * overload.
 *
 * @param streamName name given to the stream
 * @param fields fields used to build the stream's JSON schema
 * @return the configured stream
 */
public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, Field... fields) {
  final List<Field> fieldList = Arrays.asList(fields);
  return createConfiguredAirbyteStream(streamName, fieldList);
}

/**
 * Builds a {@link ConfiguredAirbyteStream} with the given name and a JSON schema derived from
 * {@code fields} via {@code fieldsToJsonSchema}.
 *
 * @param streamName name given to the stream
 * @param fields fields used to build the stream's JSON schema
 * @return the configured stream
 */
public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, List<Field> fields) {
  final JsonNode jsonSchema = fieldsToJsonSchema(fields);
  final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream();
  return stream.withName(streamName).withJsonSchema(jsonSchema);
}

/**
 * Varargs convenience overload: wraps {@code fields} in a list and delegates to the list-based
 * {@code fieldsToJsonSchema}.
 *
 * @param fields fields to describe in the JSON schema
 * @return the JSON schema produced by the list-based overload
 */
public static JsonNode fieldsToJsonSchema(Field... fields) {
  final List<Field> fieldList = Arrays.asList(fields);
  return fieldsToJsonSchema(fieldList);
}
Original file line number Diff line number Diff line change
@@ -84,7 +84,6 @@ public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path
final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput);

try (destination; source) {

destination.start(targetConfig, jobRoot);
source.start(tapConfig, jobRoot);

Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
@@ -62,7 +63,7 @@ public DefaultNormalizationRunner(final DestinationType destinationType, final P
}

@Override
public boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) throws Exception {
public boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception {
IOs.writeFile(jobRoot, WorkerConstants.TARGET_CONFIG_JSON_FILENAME, Jsons.serialize(config));
IOs.writeFile(jobRoot, WorkerConstants.CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,14 +55,14 @@ default void start() throws Exception {
* @throws Exception - any exception thrown from normalization will be handled gracefully by the
* caller.
*/
boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) throws Exception;
boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception;

class NoOpNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(NoOpNormalizationRunner.class);

@Override
public boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) {
public boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) {
LOGGER.info("Running no op logger");
return true;
}
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ public static ImmutablePair<StandardSync, StandardSyncInput> createSyncConfig()
StandardSyncInput syncInput = new StandardSyncInput()
.withDestinationConnection(destinationConnectionConfig)
.withSyncMode(standardSync.getSyncMode())
.withCatalog(AirbyteProtocolConverters.toCatalog(standardSync.getSchema()))
.withCatalog(AirbyteProtocolConverters.toConfiguredCatalog(standardSync.getSchema()))
.withConnectionId(standardSync.getConnectionId())
.withSourceConnection(sourceConnectionConfig)
.withState(state);
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
@@ -49,7 +50,7 @@ class DefaultNormalizationRunnerTest {
private ProcessBuilderFactory pbf;
private Process process;
private JsonNode config;
private AirbyteCatalog catalog;
private ConfiguredAirbyteCatalog catalog;

@BeforeEach
void setup() throws IOException, WorkerException {
@@ -59,7 +60,7 @@ void setup() throws IOException, WorkerException {
process = mock(Process.class);

config = mock(JsonNode.class);
catalog = mock(AirbyteCatalog.class);
catalog = mock(ConfiguredAirbyteCatalog.class);

when(pbf.create(jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, "run",
"--integration-type", "bigquery",
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.workers.WorkerConstants;
@@ -69,9 +71,9 @@ class DefaultAirbyteSourceTest {
private static final String STREAM_NAME = "user_preferences";
private static final String FIELD_NAME = "favorite_color";

private static final AirbyteCatalog CATALOG = new AirbyteCatalog()
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog()
.withStreams(Collections.singletonList(
new AirbyteStream()
new ConfiguredAirbyteStream()
.withName("hudi:latest")
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING)))));

@@ -80,7 +82,7 @@ class DefaultAirbyteSourceTest {
.withSourceConnectionConfiguration(Jsons.jsonNode(Map.of(
"apiKey", "123",
"region", "us-east")))
.withCatalog(CatalogHelpers.createAirbyteCatalog("hudi:latest", Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING)));
.withCatalog(CatalogHelpers.createConfiguredAirbyteCatalog("hudi:latest", Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING)));

private static final List<AirbyteMessage> MESSAGES = Lists.newArrayList(
AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"),